i/o scaffold

This commit is contained in:
Levi Neuwirth 2026-04-13 12:16:53 -04:00
parent 5b722d3c6a
commit 3d2b2fc68d
7 changed files with 795 additions and 14 deletions

View File

@ -93,18 +93,5 @@ jobs:
- name: Install project + dev dependencies
run: uv sync --group dev
# Tolerates exit code 5 ("no tests collected") because the tests
# directory does not exist yet at commit 3. Commit 4 lands the first
# tests and this tolerance block should be removed at that point.
- name: Pytest
shell: bash
run: |
set +e
uv run pytest --tb=short
ec=$?
set -e
if [ "$ec" -eq 5 ]; then
echo "::warning::pytest collected no tests. Remove this tolerance after commit 4 lands real tests."
exit 0
fi
exit "$ec"
run: uv run pytest --tb=short

View File

@ -132,6 +132,17 @@ known-first-party = ["neuropose"]
[tool.ruff.format]
docstring-code-format = true
# ---------------------------------------------------------------------------
# Pytest.
# ---------------------------------------------------------------------------
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]
addopts = ["--strict-markers", "--strict-config", "-ra"]
markers = [
"slow: tests that require model download or are otherwise slow (deselect with '-m \"not slow\"')",
]
# ---------------------------------------------------------------------------
# Pyright. "standard" rather than "strict" because the TensorFlow / OpenCV /
# scikit-learn stubs would generate thousands of unknown-type warnings under

162
src/neuropose/config.py Normal file
View File

@ -0,0 +1,162 @@
"""Runtime configuration for NeuroPose.
Central settings model built on pydantic-settings. Configuration can be
supplied by, in order of decreasing precedence:
1. Keyword arguments passed directly to ``Settings(...)``.
2. Environment variables prefixed with ``NEUROPOSE_`` (e.g.
``NEUROPOSE_DEVICE="/GPU:0"``).
3. A YAML file loaded explicitly via :meth:`Settings.from_yaml`.
4. Field defaults.
There is intentionally no implicit config-file discovery. The daemon must be
pointed at a config file explicitly via the CLI ``--config`` flag. This
avoids the relative-path footgun from the previous prototype, where the
daemon only worked when launched from a specific working directory.
"""
from __future__ import annotations
import os
import re
from pathlib import Path
from typing import Any
import yaml
from pydantic import Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
# Accepted TensorFlow device strings: "/CPU:<index>" or "/GPU:<index>"
# (uppercase device type, non-negative integer index). Used by the
# Settings.device field validator.
_DEVICE_PATTERN = re.compile(r"^/(CPU|GPU):\d+$")
def _xdg_data_home() -> Path:
    """Return the base XDG data directory.

    Honours ``$XDG_DATA_HOME`` when set to a non-empty value; otherwise
    falls back to the XDG default of ``~/.local/share``.
    """
    configured = os.environ.get("XDG_DATA_HOME")
    return Path(configured) if configured else Path.home() / ".local" / "share"
def _default_data_dir() -> Path:
    """Return the default job-data directory, rooted under the XDG data home."""
    return _xdg_data_home().joinpath("neuropose", "jobs")
def _default_model_cache_dir() -> Path:
    """Return the default MeTRAbs model cache directory, rooted under the XDG data home."""
    return _xdg_data_home().joinpath("neuropose", "models")
class Settings(BaseSettings):
    """NeuroPose runtime configuration.

    Parameters
    ----------
    data_dir
        Base directory that holds ``in/``, ``out/``, and ``failed/``
        subdirectories for job processing. Defaults to a location under
        ``$XDG_DATA_HOME`` so runtime data never lives inside the repository.
    model_cache_dir
        Directory where the MeTRAbs model is downloaded and cached.
    poll_interval_seconds
        Interval between filesystem scans performed by the interfacer daemon.
    device
        TensorFlow device string, e.g. ``"/CPU:0"`` or ``"/GPU:0"``.
    default_fov_degrees
        Default horizontal field of view passed to MeTRAbs when a video does
        not supply camera intrinsics. The MeTRAbs upstream default is 55°.
    """

    # extra="forbid" rejects unknown kwargs / env keys so typos surface
    # immediately instead of being silently ignored.
    model_config = SettingsConfigDict(
        env_prefix="NEUROPOSE_",
        env_nested_delimiter="__",
        extra="forbid",
    )

    data_dir: Path = Field(default_factory=_default_data_dir)
    model_cache_dir: Path = Field(default_factory=_default_model_cache_dir)
    poll_interval_seconds: int = Field(default=10, ge=1)
    device: str = Field(default="/CPU:0")
    default_fov_degrees: float = Field(default=55.0, gt=0.0, lt=180.0)

    @field_validator("device")
    @classmethod
    def _validate_device(cls, value: str) -> str:
        """Reject device strings that do not match ``/CPU:<n>`` or ``/GPU:<n>``."""
        if not _DEVICE_PATTERN.match(value):
            raise ValueError(
                f"device must match '/(CPU|GPU):<index>' "
                f"(e.g. '/CPU:0', '/GPU:0'); got {value!r}"
            )
        return value

    @property
    def input_dir(self) -> Path:
        """Return the directory containing job subdirectories to be processed."""
        return self.data_dir / "in"

    @property
    def output_dir(self) -> Path:
        """Return the directory where completed job results are written."""
        return self.data_dir / "out"

    @property
    def failed_dir(self) -> Path:
        """Return the directory where inputs are quarantined after catastrophic failure."""
        return self.data_dir / "failed"

    @property
    def status_file(self) -> Path:
        """Return the path of the persistent job-status JSON file."""
        return self.output_dir / "status.json"

    @classmethod
    def from_yaml(cls, path: Path) -> Settings:
        """Load settings from a YAML file.

        Parameters
        ----------
        path
            Path to a YAML configuration file. The file must be a mapping of
            field names to values; unknown fields are rejected.

        Returns
        -------
        Settings
            A validated settings instance.

        Raises
        ------
        FileNotFoundError
            If ``path`` does not exist.
        ValueError
            If the file is not a YAML mapping.
        pydantic.ValidationError
            If field validation fails.
        """
        # EAFP: open directly instead of pre-checking path.exists(). The
        # previous exists()-then-open sequence was racy (TOCTOU): the file
        # could vanish between the check and the open. The friendly error
        # message is preserved by re-raising with context.
        try:
            with path.open("r", encoding="utf-8") as f:
                data: Any = yaml.safe_load(f)
        except FileNotFoundError as err:
            raise FileNotFoundError(f"config file not found: {path}") from err
        if data is None:
            # An empty file parses to None; treat it as "all defaults".
            data = {}
        if not isinstance(data, dict):
            raise ValueError(
                f"config file must contain a YAML mapping; got {type(data).__name__}"
            )
        return cls(**data)

    def ensure_dirs(self) -> None:
        """Create all runtime directories if they do not already exist.

        Called by the interfacer daemon on startup. Kept as an explicit method
        rather than a side effect of construction so that ``Settings()`` is
        safe to call in tests without touching the filesystem.
        """
        for path in (
            self.data_dir,
            self.input_dir,
            self.output_dir,
            self.failed_dir,
            self.model_cache_dir,
        ):
            path.mkdir(parents=True, exist_ok=True)

199
src/neuropose/io.py Normal file
View File

@ -0,0 +1,199 @@
"""I/O helpers and schema definitions for NeuroPose prediction data.
Defines pydantic models for per-frame predictions, per-video aggregated
predictions, job-level aggregated results, and the persistent status file.
All models are validated on load, so malformed files are caught at the
boundary rather than at some downstream call site.
Atomicity: :func:`save_status` and :func:`save_job_results` write to a sibling
temp file and then atomically rename, so a crash mid-write will not leave a
partially-written file behind. This matches the crash-resilience guarantee
the interfacer daemon makes to callers.
"""
from __future__ import annotations

import json
import os
from collections.abc import Iterator
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any

from pydantic import BaseModel, ConfigDict, Field, RootModel, ValidationError
class JobStatus(str, Enum):
    """Lifecycle state of a single processing job.

    Inherits from ``str`` so members serialize as plain strings in JSON
    payloads and validate directly from raw string values.
    """

    PROCESSING = "processing"  # job picked up, work in progress
    COMPLETED = "completed"  # terminal: results written successfully
    FAILED = "failed"  # terminal: see JobStatusEntry.error for the reason
class FramePrediction(BaseModel):
    """Pose estimation output for a single video frame.

    Each inner list corresponds to one detected person. Coordinate units
    follow MeTRAbs conventions: ``poses3d`` in millimetres, ``poses2d`` in
    pixels, ``boxes`` as ``[x, y, width, height, confidence]`` in pixels.

    Frozen (immutable) to prevent the in-place coordinate-swap aliasing bug
    that affected the previous prototype's visualizer.
    """

    # extra="forbid" rejects unknown keys at validation time; frozen=True
    # makes attribute assignment raise after construction.
    model_config = ConfigDict(extra="forbid", frozen=True)

    # One [x, y, width, height, confidence] entry per detected person.
    boxes: list[list[float]] = Field(
        description="Per-detection bounding boxes as [x, y, width, height, confidence]."
    )
    # One [joint][xyz] array per detected person, millimetres.
    poses3d: list[list[list[float]]] = Field(
        description="Per-detection 3D joint positions in millimetres."
    )
    # One [joint][xy] array per detected person, pixels.
    poses2d: list[list[list[float]]] = Field(
        description="Per-detection 2D joint positions in pixels."
    )
class VideoPredictions(RootModel[dict[str, FramePrediction]]):
    """Per-frame predictions for a single video, keyed by frame filename.

    Frame names are expected to follow the ``frame_<index>.png`` convention
    written by the estimator, but no constraint is enforced at the schema
    level so downstream consumers can key by any naming scheme.
    """

    def frames(self) -> list[str]:
        """Return the frame names in insertion order."""
        # dict iteration order is insertion order, so listing the keys
        # directly preserves the order frames were recorded in.
        return [name for name in self.root]

    def __len__(self) -> int:
        # Number of frames in the video.
        return len(self.root)

    def __iter__(self) -> Iterator[str]:  # type: ignore[override]
        # Iterate frame names, mirroring dict semantics.
        return iter(self.root)

    def __getitem__(self, key: str) -> FramePrediction:
        # Look up one frame's prediction; raises KeyError if absent.
        return self.root[key]
class JobResults(RootModel[dict[str, VideoPredictions]]):
    """Aggregated predictions for an entire job, keyed by video filename.

    This is the shape of the top-level ``results.json`` written by the
    interfacer daemon: one entry per video in the job directory.
    """

    def videos(self) -> list[str]:
        """Return the video names in insertion order."""
        # Keys iterate in insertion order, so a plain list() suffices.
        return list(self.root)

    def __len__(self) -> int:
        # Number of videos in the job.
        return len(self.root)

    def __iter__(self) -> Iterator[str]:  # type: ignore[override]
        # Iterate video names, mirroring dict semantics.
        return iter(self.root)

    def __getitem__(self, key: str) -> VideoPredictions:
        # Look up one video's predictions; raises KeyError if absent.
        return self.root[key]
class JobStatusEntry(BaseModel):
    """Status entry for a single job in the persistent status file."""

    # Unknown keys are rejected so schema drift in status.json is caught
    # at load time rather than silently ignored.
    model_config = ConfigDict(extra="forbid")

    # Current lifecycle state of the job.
    status: JobStatus
    # When the daemon began processing the job.
    started_at: datetime
    # Completion timestamp; defaults to None (e.g. while still processing).
    completed_at: datetime | None = None
    # Path to the job's results file; defaults to None.
    results_path: Path | None = None
    error: str | None = Field(
        default=None,
        description=(
            "Short human-readable reason if status == failed. "
            "Populated by the interfacer on failure paths."
        ),
    )
class StatusFile(RootModel[dict[str, JobStatusEntry]]):
    """Mapping of job name to its status entry."""

    def is_empty(self) -> bool:
        """Return ``True`` if the status file contains no entries."""
        # An empty dict is falsy, so truthiness is equivalent to len() == 0.
        return not self.root

    def __len__(self) -> int:
        # Number of tracked jobs.
        return len(self.root)

    def __iter__(self) -> Iterator[str]:  # type: ignore[override]
        # Iterate job names, mirroring dict semantics.
        return iter(self.root)
# ---------------------------------------------------------------------------
# Load / save helpers
# ---------------------------------------------------------------------------
def load_video_predictions(path: Path) -> VideoPredictions:
    """Load and validate a per-video predictions JSON file."""
    # Read the whole document, then validate the parsed payload at the
    # boundary so malformed files fail here rather than downstream.
    raw: Any = json.loads(path.read_text(encoding="utf-8"))
    return VideoPredictions.model_validate(raw)
def save_video_predictions(path: Path, predictions: VideoPredictions) -> None:
    """Serialize per-video predictions to a JSON file (written atomically)."""
    # Create missing parent directories first so the atomic write can land.
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = predictions.model_dump(mode="json")
    _write_json_atomic(path, payload)
def load_job_results(path: Path) -> JobResults:
    """Load and validate an aggregated per-job results JSON file."""
    # Parse then validate at the boundary; schema errors surface here.
    raw: Any = json.loads(path.read_text(encoding="utf-8"))
    return JobResults.model_validate(raw)
def save_job_results(path: Path, results: JobResults) -> None:
    """Serialize aggregated job results to a JSON file atomically."""
    # Ensure the destination directory exists before the atomic write.
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = results.model_dump(mode="json")
    _write_json_atomic(path, payload)
def load_status(path: Path) -> StatusFile:
    """Load the persistent job status file.

    Returns an empty :class:`StatusFile` if the file is missing, unreadable,
    is not valid JSON, does not contain a JSON mapping, or fails schema
    validation. This preserves the crash-resilient behaviour the daemon
    relies on: a missing or corrupted status file is treated as a clean
    slate rather than a fatal error.

    Parameters
    ----------
    path
        Location of the status JSON file.
    """
    if not path.exists():
        return StatusFile(root={})
    try:
        with path.open("r", encoding="utf-8") as f:
            data: Any = json.load(f)
    except (OSError, json.JSONDecodeError):
        # Unreadable or truncated file: treat as a clean slate.
        return StatusFile(root={})
    if not isinstance(data, dict):
        return StatusFile(root={})
    try:
        return StatusFile.model_validate(data)
    except ValidationError:
        # A hand-edited or partially-written file with schema-invalid
        # entries is "corrupted" too. The previous code let this raise,
        # contradicting the clean-slate contract documented above.
        return StatusFile(root={})
def save_status(path: Path, status: StatusFile) -> None:
    """Persist the job status file atomically."""
    # Ensure the destination directory exists before the atomic write.
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = status.model_dump(mode="json")
    _write_json_atomic(path, payload)
def _write_json_atomic(path: Path, payload: Any) -> None:
    """Write ``payload`` to ``path`` as JSON, atomically and durably.

    Writes to a sibling ``<path>.tmp`` first, flushes and fsyncs it, then
    atomically renames over ``path`` so a crash mid-write cannot leave
    behind a truncated file.

    Parameters
    ----------
    path
        Destination file; its parent directory must already exist.
    payload
        Any ``json.dump``-serializable object.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
        # rename() is atomic on POSIX, but without an fsync the *content*
        # may not be on disk yet: a power loss after the rename could leave
        # an empty-but-renamed file, defeating the atomicity guarantee this
        # module documents. Flush Python's buffer, then sync the fd.
        f.flush()
        os.fsync(f.fileno())
    tmp.replace(path)

38
tests/conftest.py Normal file
View File

@ -0,0 +1,38 @@
"""Shared pytest configuration and fixtures for the NeuroPose test suite."""
from __future__ import annotations
import os
from collections.abc import Iterator
from pathlib import Path
import pytest
@pytest.fixture(autouse=True)
def _isolate_environment(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path_factory: pytest.TempPathFactory,
) -> Iterator[None]:
    """Isolate every test from the developer's real home directory.

    Points ``$HOME`` and ``$XDG_DATA_HOME`` at per-test temp directories so
    that any code path that uses the default ``Settings()`` (which reaches
    into ``~/.local/share/neuropose``) cannot accidentally write to the real
    machine. Also clears any ``NEUROPOSE_*`` environment variables that may
    be set in the developer's shell, so test behaviour does not depend on
    who is running the test suite.
    """
    sandbox = tmp_path_factory.mktemp("neuropose_env_isolation")
    monkeypatch.setenv("HOME", str(sandbox))
    monkeypatch.setenv("XDG_DATA_HOME", str(sandbox / "xdg"))
    # Snapshot the keys first so we never mutate os.environ mid-iteration.
    stale = [key for key in os.environ if key.startswith("NEUROPOSE_")]
    for key in stale:
        monkeypatch.delenv(key, raising=False)
    yield
@pytest.fixture
def xdg_home() -> Path:
    """Return the isolated ``$XDG_DATA_HOME`` set up by ``_isolate_environment``."""
    # The autouse isolation fixture guarantees this variable is set.
    value = os.environ["XDG_DATA_HOME"]
    return Path(value)

178
tests/unit/test_config.py Normal file
View File

@ -0,0 +1,178 @@
"""Tests for :mod:`neuropose.config`."""
from __future__ import annotations
from pathlib import Path
import pytest
import yaml
from pydantic import ValidationError
from neuropose.config import Settings
class TestDefaults:
    """Default values wire through XDG correctly."""

    def test_data_dir_uses_xdg_data_home(self, xdg_home: Path) -> None:
        cfg = Settings()
        assert cfg.data_dir == xdg_home / "neuropose" / "jobs"

    def test_model_cache_dir_uses_xdg_data_home(self, xdg_home: Path) -> None:
        cfg = Settings()
        assert cfg.model_cache_dir == xdg_home / "neuropose" / "models"

    def test_derived_directories(self, xdg_home: Path) -> None:
        cfg = Settings()
        assert cfg.input_dir == cfg.data_dir / "in"
        assert cfg.output_dir == cfg.data_dir / "out"
        assert cfg.failed_dir == cfg.data_dir / "failed"
        assert cfg.status_file == cfg.output_dir / "status.json"

    def test_fallback_when_xdg_unset(
        self,
        monkeypatch: pytest.MonkeyPatch,
        tmp_path: Path,
    ) -> None:
        monkeypatch.delenv("XDG_DATA_HOME", raising=False)
        monkeypatch.setenv("HOME", str(tmp_path))
        expected = tmp_path / ".local" / "share" / "neuropose" / "jobs"
        assert Settings().data_dir == expected

    def test_default_scalars(self, xdg_home: Path) -> None:
        cfg = Settings()
        assert cfg.poll_interval_seconds == 10
        assert cfg.device == "/CPU:0"
        assert cfg.default_fov_degrees == pytest.approx(55.0)
class TestValidation:
    """Field validators reject malformed input."""

    @pytest.mark.parametrize("device", ["/CPU:0", "/GPU:0", "/CPU:1", "/GPU:3"])
    def test_device_accepts_valid(self, device: str, xdg_home: Path) -> None:
        # Well-formed device strings pass through the validator untouched.
        assert Settings(device=device).device == device

    @pytest.mark.parametrize("device", ["cpu", "/cpu:0", "GPU:0", "/TPU:0", "", "/GPU"])
    def test_device_rejects_invalid(self, device: str, xdg_home: Path) -> None:
        with pytest.raises(ValidationError):
            Settings(device=device)

    def test_poll_interval_rejects_zero(self, xdg_home: Path) -> None:
        # Bound is ge=1, so zero must fail validation.
        with pytest.raises(ValidationError):
            Settings(poll_interval_seconds=0)

    def test_poll_interval_rejects_negative(self, xdg_home: Path) -> None:
        with pytest.raises(ValidationError):
            Settings(poll_interval_seconds=-5)

    def test_fov_rejects_zero(self, xdg_home: Path) -> None:
        # Bound is gt=0, so zero must fail validation.
        with pytest.raises(ValidationError):
            Settings(default_fov_degrees=0)

    def test_fov_rejects_at_limit(self, xdg_home: Path) -> None:
        # Bound is lt=180 (exclusive), so exactly 180 must fail.
        with pytest.raises(ValidationError):
            Settings(default_fov_degrees=180)

    def test_extra_fields_rejected(self, xdg_home: Path) -> None:
        # extra="forbid" surfaces typo'd settings as hard errors.
        with pytest.raises(ValidationError):
            Settings(nonexistent_field=True)
class TestYamlLoad:
    """``Settings.from_yaml`` behaves correctly for valid and malformed files."""

    @staticmethod
    def _write(tmp_path: Path, text: str) -> Path:
        # Helper: drop the given text into a config.yaml and return its path.
        cfg_file = tmp_path / "config.yaml"
        cfg_file.write_text(text)
        return cfg_file

    def test_valid(self, tmp_path: Path, xdg_home: Path) -> None:
        cfg_file = self._write(
            tmp_path, yaml.safe_dump({"device": "/GPU:0", "poll_interval_seconds": 30})
        )
        loaded = Settings.from_yaml(cfg_file)
        assert loaded.device == "/GPU:0"
        assert loaded.poll_interval_seconds == 30

    def test_missing_file_raises(self, tmp_path: Path) -> None:
        with pytest.raises(FileNotFoundError):
            Settings.from_yaml(tmp_path / "nope.yaml")

    def test_empty_file_uses_defaults(self, tmp_path: Path, xdg_home: Path) -> None:
        # An empty YAML document parses to None and must mean "all defaults".
        loaded = Settings.from_yaml(self._write(tmp_path, ""))
        assert loaded.poll_interval_seconds == 10

    def test_non_mapping_rejected(self, tmp_path: Path) -> None:
        # A YAML list is valid YAML but not a valid settings mapping.
        with pytest.raises(ValueError, match="YAML mapping"):
            Settings.from_yaml(self._write(tmp_path, "- item1\n- item2\n"))

    def test_invalid_field_rejected(self, tmp_path: Path, xdg_home: Path) -> None:
        with pytest.raises(ValidationError):
            Settings.from_yaml(self._write(tmp_path, yaml.safe_dump({"device": "cpu"})))
class TestEnvironmentOverrides:
    """``NEUROPOSE_*`` env vars override field defaults."""

    def test_device_override(
        self, xdg_home: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        monkeypatch.setenv("NEUROPOSE_DEVICE", "/GPU:0")
        assert Settings().device == "/GPU:0"

    def test_poll_interval_override(
        self, xdg_home: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        # pydantic-settings coerces the string env value to int.
        monkeypatch.setenv("NEUROPOSE_POLL_INTERVAL_SECONDS", "60")
        assert Settings().poll_interval_seconds == 60

    def test_kwargs_beat_env_vars(
        self, xdg_home: Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        # Direct keyword arguments take precedence over the environment.
        monkeypatch.setenv("NEUROPOSE_DEVICE", "/GPU:0")
        assert Settings(device="/CPU:0").device == "/CPU:0"
class TestEnsureDirs:
    """``ensure_dirs`` creates all directories idempotently."""

    @staticmethod
    def _make(tmp_path: Path) -> Settings:
        # Helper: settings rooted entirely inside the test's tmp dir.
        return Settings(
            data_dir=tmp_path / "jobs",
            model_cache_dir=tmp_path / "models",
        )

    def test_creates_all_directories(self, tmp_path: Path) -> None:
        cfg = self._make(tmp_path)
        cfg.ensure_dirs()
        for directory in (
            cfg.data_dir,
            cfg.input_dir,
            cfg.output_dir,
            cfg.failed_dir,
            cfg.model_cache_dir,
        ):
            assert directory.is_dir()

    def test_idempotent(self, tmp_path: Path) -> None:
        cfg = self._make(tmp_path)
        cfg.ensure_dirs()
        cfg.ensure_dirs()
        assert cfg.data_dir.is_dir()

    def test_construction_has_no_filesystem_side_effects(self, tmp_path: Path) -> None:
        # Creating Settings() must NOT touch the filesystem.
        target = tmp_path / "jobs"
        assert not target.exists()
        _ = self._make(tmp_path)
        assert not target.exists()

206
tests/unit/test_io.py Normal file
View File

@ -0,0 +1,206 @@
"""Tests for :mod:`neuropose.io` schema and helpers."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
import pytest
from pydantic import ValidationError
from neuropose.io import (
FramePrediction,
JobResults,
JobStatus,
StatusFile,
VideoPredictions,
load_job_results,
load_status,
load_video_predictions,
save_job_results,
save_status,
save_video_predictions,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def one_frame() -> dict:
    """A minimal valid FramePrediction payload (one person, two joints)."""
    boxes = [[10.0, 20.0, 100.0, 200.0, 0.95]]
    poses3d = [[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]]
    poses2d = [[[10.0, 20.0], [30.0, 40.0]]]
    return {"boxes": boxes, "poses3d": poses3d, "poses2d": poses2d}
@pytest.fixture
def video_payload(one_frame: dict) -> dict:
    """A two-frame video payload reusing the same frame prediction."""
    return {name: one_frame for name in ("frame_0000.png", "frame_0001.png")}
# ---------------------------------------------------------------------------
# FramePrediction
# ---------------------------------------------------------------------------
class TestFramePrediction:
    def test_roundtrip(self, one_frame: dict) -> None:
        parsed = FramePrediction.model_validate(one_frame)
        assert parsed.boxes == one_frame["boxes"]
        assert parsed.poses3d == one_frame["poses3d"]
        assert parsed.poses2d == one_frame["poses2d"]

    def test_rejects_extra_fields(self, one_frame: dict) -> None:
        # extra="forbid" must refuse any unrecognised key.
        payload = {**one_frame, "extra": "bogus"}
        with pytest.raises(ValidationError):
            FramePrediction.model_validate(payload)

    def test_is_frozen(self, one_frame: dict) -> None:
        # frozen=True turns attribute assignment into a validation error.
        parsed = FramePrediction.model_validate(one_frame)
        with pytest.raises(ValidationError):
            parsed.boxes = []
# ---------------------------------------------------------------------------
# VideoPredictions
# ---------------------------------------------------------------------------
class TestVideoPredictions:
    def test_from_dict(self, video_payload: dict) -> None:
        parsed = VideoPredictions.model_validate(video_payload)
        assert len(parsed) == 2
        assert parsed.frames() == ["frame_0000.png", "frame_0001.png"]
        assert parsed["frame_0000.png"].boxes[0][4] == pytest.approx(0.95)

    def test_iteration(self, video_payload: dict) -> None:
        # __iter__ yields frame names in insertion order.
        parsed = VideoPredictions.model_validate(video_payload)
        assert [name for name in parsed] == ["frame_0000.png", "frame_0001.png"]

    def test_save_and_load_roundtrip(self, tmp_path: Path, video_payload: dict) -> None:
        parsed = VideoPredictions.model_validate(video_payload)
        target = tmp_path / "preds" / "video.json"
        save_video_predictions(target, parsed)
        assert target.exists()
        reloaded = load_video_predictions(target)
        assert reloaded.frames() == parsed.frames()
        assert reloaded["frame_0000.png"].poses3d == parsed["frame_0000.png"].poses3d
# ---------------------------------------------------------------------------
# JobResults
# ---------------------------------------------------------------------------
class TestJobResults:
    def test_save_and_load_roundtrip(self, tmp_path: Path, video_payload: dict) -> None:
        payload = {"video_a.mp4": video_payload, "video_b.mp4": video_payload}
        results = JobResults.model_validate(payload)
        target = tmp_path / "results.json"
        save_job_results(target, results)
        reloaded = load_job_results(target)
        assert reloaded.videos() == ["video_a.mp4", "video_b.mp4"]
        assert len(reloaded["video_a.mp4"]) == 2
# ---------------------------------------------------------------------------
# Status file
# ---------------------------------------------------------------------------
class TestStatusFile:
    """Round-trip and resilience behaviour of the persistent status file."""

    def test_load_missing_returns_empty(self, tmp_path: Path) -> None:
        """A nonexistent file loads as an empty status mapping."""
        status = load_status(tmp_path / "nope.json")
        assert status.is_empty()

    def test_load_corrupt_json_returns_empty(self, tmp_path: Path) -> None:
        """Unparseable JSON is treated as a clean slate, not an error."""
        path = tmp_path / "bad.json"
        path.write_text("{ not valid json")
        status = load_status(path)
        assert status.is_empty()

    def test_load_non_mapping_returns_empty(self, tmp_path: Path) -> None:
        """Valid JSON that is not an object is also treated as empty."""
        path = tmp_path / "list.json"
        path.write_text(json.dumps([1, 2, 3]))
        status = load_status(path)
        assert status.is_empty()

    def test_save_and_load_completed_entry(self, tmp_path: Path) -> None:
        """A completed entry round-trips with its timestamps intact."""
        started = datetime(2026, 4, 13, 12, 0, 0, tzinfo=UTC)
        completed = datetime(2026, 4, 13, 12, 5, 0, tzinfo=UTC)
        status = StatusFile.model_validate(
            {
                "job_001": {
                    "status": "completed",
                    "started_at": started.isoformat(),
                    "completed_at": completed.isoformat(),
                    "results_path": "/tmp/results.json",
                    "error": None,
                }
            }
        )
        path = tmp_path / "status.json"
        save_status(path, status)
        loaded = load_status(path)
        entry = loaded.root["job_001"]
        assert entry.status == JobStatus.COMPLETED
        assert entry.started_at == started
        assert entry.completed_at == completed
        assert entry.error is None

    def test_save_is_atomic(self, tmp_path: Path) -> None:
        """``save_status`` leaves no orphan ``.tmp`` file on success."""
        started = datetime(2026, 4, 13, tzinfo=UTC)
        status = StatusFile.model_validate(
            {
                "job_001": {
                    "status": "processing",
                    "started_at": started.isoformat(),
                }
            }
        )
        path = tmp_path / "status.json"
        save_status(path, status)
        assert path.exists()
        # The temp file used for the atomic write must have been renamed away.
        tmps = list(tmp_path.glob("status.json.tmp"))
        assert tmps == []

    def test_failed_entry_carries_error_message(self, tmp_path: Path) -> None:
        """Failed jobs persist their human-readable error reason."""
        started = datetime(2026, 4, 13, tzinfo=UTC)
        status = StatusFile.model_validate(
            {
                "job_001": {
                    "status": "failed",
                    "started_at": started.isoformat(),
                    "error": "ffmpeg decode failed: codec not supported",
                }
            }
        )
        path = tmp_path / "status.json"
        save_status(path, status)
        loaded = load_status(path)
        entry = loaded.root["job_001"]
        assert entry.status == JobStatus.FAILED
        assert entry.error is not None
        assert "ffmpeg" in entry.error

    def test_rejects_unknown_status(self, tmp_path: Path) -> None:
        """A status value outside the JobStatus enum fails validation."""
        with pytest.raises(ValidationError):
            StatusFile.model_validate(
                {
                    "job_001": {
                        "status": "some-unknown-state",
                        "started_at": datetime(2026, 4, 13, tzinfo=UTC).isoformat(),
                    }
                }
            )