From 3d2b2fc68de86dc17b0be2e49561b6687cdde6da Mon Sep 17 00:00:00 2001 From: Levi Neuwirth Date: Mon, 13 Apr 2026 12:16:53 -0400 Subject: [PATCH] i/o scaffold --- .github/workflows/ci.yml | 15 +-- pyproject.toml | 11 ++ src/neuropose/config.py | 162 ++++++++++++++++++++++++++++++ src/neuropose/io.py | 199 ++++++++++++++++++++++++++++++++++++ tests/conftest.py | 38 +++++++ tests/unit/test_config.py | 178 ++++++++++++++++++++++++++++++++ tests/unit/test_io.py | 206 ++++++++++++++++++++++++++++++++++++++ 7 files changed, 795 insertions(+), 14 deletions(-) create mode 100644 src/neuropose/config.py create mode 100644 src/neuropose/io.py create mode 100644 tests/conftest.py create mode 100644 tests/unit/test_config.py create mode 100644 tests/unit/test_io.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4868487..32c7d3f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -93,18 +93,5 @@ jobs: - name: Install project + dev dependencies run: uv sync --group dev - # Tolerates exit code 5 ("no tests collected") because the tests - # directory does not exist yet at commit 3. Commit 4 lands the first - # tests and this tolerance block should be removed at that point. - name: Pytest - shell: bash - run: | - set +e - uv run pytest --tb=short - ec=$? - set -e - if [ "$ec" -eq 5 ]; then - echo "::warning::pytest collected no tests. Remove this tolerance after commit 4 lands real tests." - exit 0 - fi - exit "$ec" + run: uv run pytest --tb=short diff --git a/pyproject.toml b/pyproject.toml index b64e736..adbddec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -132,6 +132,17 @@ known-first-party = ["neuropose"] [tool.ruff.format] docstring-code-format = true +# --------------------------------------------------------------------------- +# Pytest. +# --------------------------------------------------------------------------- +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["src"] +addopts = ["--strict-markers", "--strict-config", "-ra"] +markers = [ + "slow: tests that require model download or are otherwise slow (deselect with '-m \"not slow\"')", +] + # --------------------------------------------------------------------------- # Pyright. "standard" rather than "strict" because the TensorFlow / OpenCV / # scikit-learn stubs would generate thousands of unknown-type warnings under diff --git a/src/neuropose/config.py b/src/neuropose/config.py new file mode 100644 index 0000000..f4829c9 --- /dev/null +++ b/src/neuropose/config.py @@ -0,0 +1,162 @@ +"""Runtime configuration for NeuroPose. + +Central settings model built on pydantic-settings. Configuration can be +supplied by, in order of decreasing precedence: + +1. Keyword arguments passed directly to ``Settings(...)``. +2. Environment variables prefixed with ``NEUROPOSE_`` (e.g. + ``NEUROPOSE_DEVICE="/GPU:0"``). +3. A YAML file loaded explicitly via :meth:`Settings.from_yaml`. +4. Field defaults. + +There is intentionally no implicit config-file discovery. The daemon must be +pointed at a config file explicitly via the CLI ``--config`` flag. This +avoids the relative-path footgun from the previous prototype, where the +daemon only worked when launched from a specific working directory. +""" + +from __future__ import annotations + +import os +import re +from pathlib import Path +from typing import Any + +import yaml +from pydantic import Field, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict + +_DEVICE_PATTERN = re.compile(r"^/(CPU|GPU):\d+$") + + +def _xdg_data_home() -> Path: + """Return ``$XDG_DATA_HOME``, falling back to ``~/.local/share``.""" + env = os.environ.get("XDG_DATA_HOME") + if env: + return Path(env) + return Path.home() / ".local" / "share" + + +def _default_data_dir() -> Path: + """Return the default runtime data directory (under XDG).""" + return _xdg_data_home() / "neuropose" / "jobs" + + +def _default_model_cache_dir() -> Path: + """Return the default MeTRAbs model cache directory (under XDG).""" + return _xdg_data_home() / "neuropose" / "models" + + +class Settings(BaseSettings): + """NeuroPose runtime configuration. + + Parameters + ---------- + data_dir + Base directory that holds ``in/``, ``out/``, and ``failed/`` + subdirectories for job processing. Defaults to a location under + ``$XDG_DATA_HOME`` so runtime data never lives inside the repository. + model_cache_dir + Directory where the MeTRAbs model is downloaded and cached. + poll_interval_seconds + Interval between filesystem scans performed by the interfacer daemon. + device + TensorFlow device string, e.g. ``"/CPU:0"`` or ``"/GPU:0"``. + default_fov_degrees + Default horizontal field of view passed to MeTRAbs when a video does + not supply camera intrinsics. The MeTRAbs upstream default is 55°. + """ + + model_config = SettingsConfigDict( + env_prefix="NEUROPOSE_", + env_nested_delimiter="__", + extra="forbid", + ) + + data_dir: Path = Field(default_factory=_default_data_dir) + model_cache_dir: Path = Field(default_factory=_default_model_cache_dir) + poll_interval_seconds: int = Field(default=10, ge=1) + device: str = Field(default="/CPU:0") + default_fov_degrees: float = Field(default=55.0, gt=0.0, lt=180.0) + + @field_validator("device") + @classmethod + def _validate_device(cls, value: str) -> str: + if not _DEVICE_PATTERN.match(value): + raise ValueError( + f"device must match '/(CPU|GPU):' " + f"(e.g. '/CPU:0', '/GPU:0'); got {value!r}" + ) + return value + + @property + def input_dir(self) -> Path: + """Return the directory containing job subdirectories to be processed.""" + return self.data_dir / "in" + + @property + def output_dir(self) -> Path: + """Return the directory where completed job results are written.""" + return self.data_dir / "out" + + @property + def failed_dir(self) -> Path: + """Return the directory where inputs are quarantined after catastrophic failure.""" + return self.data_dir / "failed" + + @property + def status_file(self) -> Path: + """Return the path of the persistent job-status JSON file.""" + return self.output_dir / "status.json" + + @classmethod + def from_yaml(cls, path: Path) -> Settings: + """Load settings from a YAML file. + + Parameters + ---------- + path + Path to a YAML configuration file. The file must be a mapping of + field names to values; unknown fields are rejected. + + Returns + ------- + Settings + A validated settings instance. + + Raises + ------ + FileNotFoundError + If ``path`` does not exist. + ValueError + If the file is not a YAML mapping. + pydantic.ValidationError + If field validation fails. + """ + if not path.exists(): + raise FileNotFoundError(f"config file not found: {path}") + with path.open("r", encoding="utf-8") as f: + data: Any = yaml.safe_load(f) + if data is None: + data = {} + if not isinstance(data, dict): + raise ValueError( + f"config file must contain a YAML mapping; got {type(data).__name__}" + ) + return cls(**data) + + def ensure_dirs(self) -> None: + """Create all runtime directories if they do not already exist. + + Called by the interfacer daemon on startup. Kept as an explicit method + rather than a side effect of construction so that ``Settings()`` is + safe to call in tests without touching the filesystem. + """ + for path in ( + self.data_dir, + self.input_dir, + self.output_dir, + self.failed_dir, + self.model_cache_dir, + ): + path.mkdir(parents=True, exist_ok=True) diff --git a/src/neuropose/io.py b/src/neuropose/io.py new file mode 100644 index 0000000..51c5e33 --- /dev/null +++ b/src/neuropose/io.py @@ -0,0 +1,199 @@ +"""I/O helpers and schema definitions for NeuroPose prediction data. + +Defines pydantic models for per-frame predictions, per-video aggregated +predictions, job-level aggregated results, and the persistent status file. +All models are validated on load, so malformed files are caught at the +boundary rather than at some downstream call site. + +Atomicity: :func:`save_status` and :func:`save_job_results` write to a sibling +temp file and then atomically rename, so a crash mid-write will not leave a +partially-written file behind. This matches the crash-resilience guarantee +the interfacer daemon makes to callers. +""" + +from __future__ import annotations + +import json +from collections.abc import Iterator +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field, RootModel + + +class JobStatus(str, Enum): + """Lifecycle state of a single processing job.""" + + PROCESSING = "processing" + COMPLETED = "completed" + FAILED = "failed" + + +class FramePrediction(BaseModel): + """Pose estimation output for a single video frame. + + Each inner list corresponds to one detected person. Coordinate units + follow MeTRAbs conventions: ``poses3d`` in millimetres, ``poses2d`` in + pixels, ``boxes`` as ``[x, y, width, height, confidence]`` in pixels. + + Frozen (immutable) to prevent the in-place coordinate-swap aliasing bug + that affected the previous prototype's visualizer. + """ + + model_config = ConfigDict(extra="forbid", frozen=True) + + boxes: list[list[float]] = Field( + description="Per-detection bounding boxes as [x, y, width, height, confidence]." + ) + poses3d: list[list[list[float]]] = Field( + description="Per-detection 3D joint positions in millimetres." + ) + poses2d: list[list[list[float]]] = Field( + description="Per-detection 2D joint positions in pixels." + ) + + +class VideoPredictions(RootModel[dict[str, FramePrediction]]): + """Per-frame predictions for a single video, keyed by frame filename. + + Frame names are expected to follow the ``frame_.png`` convention + written by the estimator, but no constraint is enforced at the schema + level so downstream consumers can key by any naming scheme. + """ + + def frames(self) -> list[str]: + """Return the frame names in insertion order.""" + return list(self.root.keys()) + + def __len__(self) -> int: + return len(self.root) + + def __iter__(self) -> Iterator[str]: # type: ignore[override] + return iter(self.root) + + def __getitem__(self, key: str) -> FramePrediction: + return self.root[key] + + +class JobResults(RootModel[dict[str, VideoPredictions]]): + """Aggregated predictions for an entire job, keyed by video filename. + + This is the shape of the top-level ``results.json`` written by the + interfacer daemon: one entry per video in the job directory. + """ + + def videos(self) -> list[str]: + """Return the video names in insertion order.""" + return list(self.root.keys()) + + def __len__(self) -> int: + return len(self.root) + + def __iter__(self) -> Iterator[str]: # type: ignore[override] + return iter(self.root) + + def __getitem__(self, key: str) -> VideoPredictions: + return self.root[key] + + +class JobStatusEntry(BaseModel): + """Status entry for a single job in the persistent status file.""" + + model_config = ConfigDict(extra="forbid") + + status: JobStatus + started_at: datetime + completed_at: datetime | None = None + results_path: Path | None = None + error: str | None = Field( + default=None, + description=( + "Short human-readable reason if status == failed. " + "Populated by the interfacer on failure paths." + ), + ) + + +class StatusFile(RootModel[dict[str, JobStatusEntry]]): + """Mapping of job name to its status entry.""" + + def is_empty(self) -> bool: + """Return ``True`` if the status file contains no entries.""" + return len(self.root) == 0 + + def __len__(self) -> int: + return len(self.root) + + def __iter__(self) -> Iterator[str]: # type: ignore[override] + return iter(self.root) + + +# --------------------------------------------------------------------------- +# Load / save helpers +# --------------------------------------------------------------------------- + + +def load_video_predictions(path: Path) -> VideoPredictions: + """Load and validate a per-video predictions JSON file.""" + with path.open("r", encoding="utf-8") as f: + data: Any = json.load(f) + return VideoPredictions.model_validate(data) + + +def save_video_predictions(path: Path, predictions: VideoPredictions) -> None: + """Serialize per-video predictions to a JSON file.""" + path.parent.mkdir(parents=True, exist_ok=True) + _write_json_atomic(path, predictions.model_dump(mode="json")) + + +def load_job_results(path: Path) -> JobResults: + """Load and validate an aggregated per-job results JSON file.""" + with path.open("r", encoding="utf-8") as f: + data: Any = json.load(f) + return JobResults.model_validate(data) + + +def save_job_results(path: Path, results: JobResults) -> None: + """Serialize aggregated job results to a JSON file atomically.""" + path.parent.mkdir(parents=True, exist_ok=True) + _write_json_atomic(path, results.model_dump(mode="json")) + + +def load_status(path: Path) -> StatusFile: + """Load the persistent job status file. + + Returns an empty :class:`StatusFile` if the file is missing, is not valid + JSON, or does not contain a JSON mapping. This preserves the + crash-resilient behaviour the daemon relies on: a missing or corrupted + status file is treated as a clean slate rather than a fatal error. + """ + if not path.exists(): + return StatusFile(root={}) + try: + with path.open("r", encoding="utf-8") as f: + data: Any = json.load(f) + except json.JSONDecodeError: + return StatusFile(root={}) + if not isinstance(data, dict): + return StatusFile(root={}) + return StatusFile.model_validate(data) + + +def save_status(path: Path, status: StatusFile) -> None: + """Persist the job status file atomically.""" + path.parent.mkdir(parents=True, exist_ok=True) + _write_json_atomic(path, status.model_dump(mode="json")) + + +def _write_json_atomic(path: Path, payload: Any) -> None: + """Write ``payload`` to ``path`` as JSON, atomically. + + Writes to a sibling ``.tmp`` first, then atomically renames over + ``path`` so a crash mid-write cannot leave behind a truncated file. + """ + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as f: + json.dump(payload, f, indent=2) + tmp.replace(path) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..6023d16 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,38 @@ +"""Shared pytest configuration and fixtures for the NeuroPose test suite.""" + +from __future__ import annotations + +import os +from collections.abc import Iterator +from pathlib import Path + +import pytest + + +@pytest.fixture(autouse=True) +def _isolate_environment( + monkeypatch: pytest.MonkeyPatch, + tmp_path_factory: pytest.TempPathFactory, +) -> Iterator[None]: + """Isolate every test from the developer's real home directory. + + Points ``$HOME`` and ``$XDG_DATA_HOME`` at per-test temp directories so + that any code path that uses the default ``Settings()`` (which reaches + into ``~/.local/share/neuropose``) cannot accidentally write to the real + machine. Also clears any ``NEUROPOSE_*`` environment variables that may + be set in the developer's shell, so test behaviour does not depend on + who is running the test suite. + """ + isolated = tmp_path_factory.mktemp("neuropose_env_isolation") + monkeypatch.setenv("HOME", str(isolated)) + monkeypatch.setenv("XDG_DATA_HOME", str(isolated / "xdg")) + for key in list(os.environ): + if key.startswith("NEUROPOSE_"): + monkeypatch.delenv(key, raising=False) + yield + + +@pytest.fixture +def xdg_home() -> Path: + """Return the isolated ``$XDG_DATA_HOME`` set up by ``_isolate_environment``.""" + return Path(os.environ["XDG_DATA_HOME"]) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py new file mode 100644 index 0000000..0cdd7d5 --- /dev/null +++ b/tests/unit/test_config.py @@ -0,0 +1,178 @@ +"""Tests for :mod:`neuropose.config`.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest +import yaml +from pydantic import ValidationError + +from neuropose.config import Settings + + +class TestDefaults: + """Default values wire through XDG correctly.""" + + def test_data_dir_uses_xdg_data_home(self, xdg_home: Path) -> None: + settings = Settings() + assert settings.data_dir == xdg_home / "neuropose" / "jobs" + + def test_model_cache_dir_uses_xdg_data_home(self, xdg_home: Path) -> None: + settings = Settings() + assert settings.model_cache_dir == xdg_home / "neuropose" / "models" + + def test_derived_directories(self, xdg_home: Path) -> None: + settings = Settings() + assert settings.input_dir == settings.data_dir / "in" + assert settings.output_dir == settings.data_dir / "out" + assert settings.failed_dir == settings.data_dir / "failed" + assert settings.status_file == settings.output_dir / "status.json" + + def test_fallback_when_xdg_unset( + self, + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, + ) -> None: + monkeypatch.delenv("XDG_DATA_HOME", raising=False) + monkeypatch.setenv("HOME", str(tmp_path)) + settings = Settings() + assert settings.data_dir == tmp_path / ".local" / "share" / "neuropose" / "jobs" + + def test_default_scalars(self, xdg_home: Path) -> None: + settings = Settings() + assert settings.poll_interval_seconds == 10 + assert settings.device == "/CPU:0" + assert settings.default_fov_degrees == pytest.approx(55.0) + + +class TestValidation: + """Field validators reject malformed input.""" + + @pytest.mark.parametrize("device", ["/CPU:0", "/GPU:0", "/CPU:1", "/GPU:3"]) + def test_device_accepts_valid(self, device: str, xdg_home: Path) -> None: + settings = Settings(device=device) + assert settings.device == device + + @pytest.mark.parametrize("device", ["cpu", "/cpu:0", "GPU:0", "/TPU:0", "", "/GPU"]) + def test_device_rejects_invalid(self, device: str, xdg_home: Path) -> None: + with pytest.raises(ValidationError): + Settings(device=device) + + def test_poll_interval_rejects_zero(self, xdg_home: Path) -> None: + with pytest.raises(ValidationError): + Settings(poll_interval_seconds=0) + + def test_poll_interval_rejects_negative(self, xdg_home: Path) -> None: + with pytest.raises(ValidationError): + Settings(poll_interval_seconds=-5) + + def test_fov_rejects_zero(self, xdg_home: Path) -> None: + with pytest.raises(ValidationError): + Settings(default_fov_degrees=0) + + def test_fov_rejects_at_limit(self, xdg_home: Path) -> None: + with pytest.raises(ValidationError): + Settings(default_fov_degrees=180) + + def test_extra_fields_rejected(self, xdg_home: Path) -> None: + with pytest.raises(ValidationError): + Settings(nonexistent_field=True) + + +class TestYamlLoad: + """``Settings.from_yaml`` behaves correctly for valid and malformed files.""" + + def test_valid(self, tmp_path: Path, xdg_home: Path) -> None: + config_path = tmp_path / "config.yaml" + config_path.write_text( + yaml.safe_dump({"device": "/GPU:0", "poll_interval_seconds": 30}) + ) + settings = Settings.from_yaml(config_path) + assert settings.device == "/GPU:0" + assert settings.poll_interval_seconds == 30 + + def test_missing_file_raises(self, tmp_path: Path) -> None: + with pytest.raises(FileNotFoundError): + Settings.from_yaml(tmp_path / "nope.yaml") + + def test_empty_file_uses_defaults(self, tmp_path: Path, xdg_home: Path) -> None: + config_path = tmp_path / "config.yaml" + config_path.write_text("") + settings = Settings.from_yaml(config_path) + assert settings.poll_interval_seconds == 10 + + def test_non_mapping_rejected(self, tmp_path: Path) -> None: + config_path = tmp_path / "config.yaml" + config_path.write_text("- item1\n- item2\n") + with pytest.raises(ValueError, match="YAML mapping"): + Settings.from_yaml(config_path) + + def test_invalid_field_rejected(self, tmp_path: Path, xdg_home: Path) -> None: + config_path = tmp_path / "config.yaml" + config_path.write_text(yaml.safe_dump({"device": "cpu"})) + with pytest.raises(ValidationError): + Settings.from_yaml(config_path) + + +class TestEnvironmentOverrides: + """``NEUROPOSE_*`` env vars override field defaults.""" + + def test_device_override( + self, + xdg_home: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + monkeypatch.setenv("NEUROPOSE_DEVICE", "/GPU:0") + settings = Settings() + assert settings.device == "/GPU:0" + + def test_poll_interval_override( + self, + xdg_home: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + monkeypatch.setenv("NEUROPOSE_POLL_INTERVAL_SECONDS", "60") + settings = Settings() + assert settings.poll_interval_seconds == 60 + + def test_kwargs_beat_env_vars( + self, + xdg_home: Path, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + monkeypatch.setenv("NEUROPOSE_DEVICE", "/GPU:0") + settings = Settings(device="/CPU:0") + assert settings.device == "/CPU:0" + + +class TestEnsureDirs: + """``ensure_dirs`` creates all directories idempotently.""" + + def test_creates_all_directories(self, tmp_path: Path) -> None: + settings = Settings( + data_dir=tmp_path / "jobs", + model_cache_dir=tmp_path / "models", + ) + settings.ensure_dirs() + assert settings.data_dir.is_dir() + assert settings.input_dir.is_dir() + assert settings.output_dir.is_dir() + assert settings.failed_dir.is_dir() + assert settings.model_cache_dir.is_dir() + + def test_idempotent(self, tmp_path: Path) -> None: + settings = Settings( + data_dir=tmp_path / "jobs", + model_cache_dir=tmp_path / "models", + ) + settings.ensure_dirs() + settings.ensure_dirs() + assert settings.data_dir.is_dir() + + def test_construction_has_no_filesystem_side_effects(self, tmp_path: Path) -> None: + # Creating Settings() must NOT touch the filesystem. + target = tmp_path / "jobs" + assert not target.exists() + _ = Settings(data_dir=target, model_cache_dir=tmp_path / "models") + assert not target.exists() diff --git a/tests/unit/test_io.py b/tests/unit/test_io.py new file mode 100644 index 0000000..70f5765 --- /dev/null +++ b/tests/unit/test_io.py @@ -0,0 +1,206 @@ +"""Tests for :mod:`neuropose.io` schema and helpers.""" + +from __future__ import annotations + +import json +from datetime import UTC, datetime +from pathlib import Path + +import pytest +from pydantic import ValidationError + +from neuropose.io import ( + FramePrediction, + JobResults, + JobStatus, + StatusFile, + VideoPredictions, + load_job_results, + load_status, + load_video_predictions, + save_job_results, + save_status, + save_video_predictions, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def one_frame() -> dict: + """A minimal valid FramePrediction payload (one person, two joints).""" + return { + "boxes": [[10.0, 20.0, 100.0, 200.0, 0.95]], + "poses3d": [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]], + "poses2d": [[[10.0, 20.0], [30.0, 40.0]]], + } + + +@pytest.fixture +def video_payload(one_frame: dict) -> dict: + return { + "frame_0000.png": one_frame, + "frame_0001.png": one_frame, + } + + +# --------------------------------------------------------------------------- +# FramePrediction +# --------------------------------------------------------------------------- + + +class TestFramePrediction: + def test_roundtrip(self, one_frame: dict) -> None: + frame = FramePrediction.model_validate(one_frame) + assert frame.boxes == one_frame["boxes"] + assert frame.poses3d == one_frame["poses3d"] + assert frame.poses2d == one_frame["poses2d"] + + def test_rejects_extra_fields(self, one_frame: dict) -> None: + one_frame["extra"] = "bogus" + with pytest.raises(ValidationError): + FramePrediction.model_validate(one_frame) + + def test_is_frozen(self, one_frame: dict) -> None: + frame = FramePrediction.model_validate(one_frame) + with pytest.raises(ValidationError): + frame.boxes = [] + + +# --------------------------------------------------------------------------- +# VideoPredictions +# --------------------------------------------------------------------------- + + +class TestVideoPredictions: + def test_from_dict(self, video_payload: dict) -> None: + vp = VideoPredictions.model_validate(video_payload) + assert len(vp) == 2 + assert vp.frames() == ["frame_0000.png", "frame_0001.png"] + assert vp["frame_0000.png"].boxes[0][4] == pytest.approx(0.95) + + def test_iteration(self, video_payload: dict) -> None: + vp = VideoPredictions.model_validate(video_payload) + assert list(vp) == ["frame_0000.png", "frame_0001.png"] + + def test_save_and_load_roundtrip(self, tmp_path: Path, video_payload: dict) -> None: + vp = VideoPredictions.model_validate(video_payload) + path = tmp_path / "preds" / "video.json" + save_video_predictions(path, vp) + assert path.exists() + loaded = load_video_predictions(path) + assert loaded.frames() == vp.frames() + assert loaded["frame_0000.png"].poses3d == vp["frame_0000.png"].poses3d + + +# --------------------------------------------------------------------------- +# JobResults +# --------------------------------------------------------------------------- + + +class TestJobResults: + def test_save_and_load_roundtrip(self, tmp_path: Path, video_payload: dict) -> None: + jr = JobResults.model_validate( + {"video_a.mp4": video_payload, "video_b.mp4": video_payload} + ) + path = tmp_path / "results.json" + save_job_results(path, jr) + loaded = load_job_results(path) + assert loaded.videos() == ["video_a.mp4", "video_b.mp4"] + assert len(loaded["video_a.mp4"]) == 2 + + +# --------------------------------------------------------------------------- +# Status file +# --------------------------------------------------------------------------- + + +class TestStatusFile: + def test_load_missing_returns_empty(self, tmp_path: Path) -> None: + status = load_status(tmp_path / "nope.json") + assert status.is_empty() + + def test_load_corrupt_json_returns_empty(self, tmp_path: Path) -> None: + path = tmp_path / "bad.json" + path.write_text("{ not valid json") + status = load_status(path) + assert status.is_empty() + + def test_load_non_mapping_returns_empty(self, tmp_path: Path) -> None: + path = tmp_path / "list.json" + path.write_text(json.dumps([1, 2, 3])) + status = load_status(path) + assert status.is_empty() + + def test_save_and_load_completed_entry(self, tmp_path: Path) -> None: + started = datetime(2026, 4, 13, 12, 0, 0, tzinfo=UTC) + completed = datetime(2026, 4, 13, 12, 5, 0, tzinfo=UTC) + status = StatusFile.model_validate( + { + "job_001": { + "status": "completed", + "started_at": started.isoformat(), + "completed_at": completed.isoformat(), + "results_path": "/tmp/results.json", + "error": None, + } + } + ) + path = tmp_path / "status.json" + save_status(path, status) + loaded = load_status(path) + entry = loaded.root["job_001"] + assert entry.status == JobStatus.COMPLETED + assert entry.started_at == started + assert entry.completed_at == completed + assert entry.error is None + + def test_save_is_atomic(self, tmp_path: Path) -> None: + """``save_status`` leaves no orphan ``.tmp`` file on success.""" + started = datetime(2026, 4, 13, tzinfo=UTC) + status = StatusFile.model_validate( + { + "job_001": { + "status": "processing", + "started_at": started.isoformat(), + } + } + ) + path = tmp_path / "status.json" + save_status(path, status) + assert path.exists() + tmps = list(tmp_path.glob("status.json.tmp")) + assert tmps == [] + + def test_failed_entry_carries_error_message(self, tmp_path: Path) -> None: + started = datetime(2026, 4, 13, tzinfo=UTC) + status = StatusFile.model_validate( + { + "job_001": { + "status": "failed", + "started_at": started.isoformat(), + "error": "ffmpeg decode failed: codec not supported", + } + } + ) + path = tmp_path / "status.json" + save_status(path, status) + loaded = load_status(path) + entry = loaded.root["job_001"] + assert entry.status == JobStatus.FAILED + assert entry.error is not None + assert "ffmpeg" in entry.error + + def test_rejects_unknown_status(self, tmp_path: Path) -> None: + with pytest.raises(ValidationError): + StatusFile.model_validate( + { + "job_001": { + "status": "some-unknown-state", + "started_at": datetime(2026, 4, 13, tzinfo=UTC).isoformat(), + } + } + )