diff --git a/src/neuropose/interfacer.py b/src/neuropose/interfacer.py
new file mode 100644
index 0000000..8af09f4
--- /dev/null
+++ b/src/neuropose/interfacer.py
@@ -0,0 +1,466 @@
+"""Filesystem-polling daemon for NeuroPose jobs.
+
+The :class:`Interfacer` watches an input directory for new job
+subdirectories, dispatches each job to a supplied
+:class:`neuropose.estimator.Estimator`, and persists job state to disk via
+:mod:`neuropose.io`. It owns the input → output → failed directory lifecycle
+and the persistent :class:`~neuropose.io.StatusFile`; it does NOT own
+inference, which lives on the injected estimator.
+
+Key guarantees
+--------------
+- **Single-instance**: an exclusive ``fcntl.flock`` on ``data_dir/.neuropose.lock``
+  blocks a second daemon from running against the same data directory. The
+  lock is released automatically on process exit, even on SIGKILL.
+- **Crash recovery**: on startup, any status entries left in
+  :attr:`~neuropose.io.JobStatus.PROCESSING` state (i.e. jobs that were in
+  flight when the previous daemon was killed) are marked failed with a
+  clear error message and their inputs are quarantined to ``failed_dir``.
+  The operator can move them back to ``input_dir`` to retry.
+- **Graceful shutdown**: ``SIGINT`` and ``SIGTERM`` request an orderly stop.
+  The current job finishes before the loop exits; no partial writes.
+- **Structured errors**: every failed job records a short
+  ``"ExceptionType: message"`` string in the status file's ``error`` field
+  so operators have a grep target without needing the log file.
+"""
+
+from __future__ import annotations
+
+import fcntl
+import logging
+import os
+import signal
+import time
+from datetime import UTC, datetime
+from pathlib import Path
+from types import FrameType
+from typing import TYPE_CHECKING
+
+from neuropose.config import Settings
+from neuropose.estimator import Estimator
+from neuropose.io import (
+    JobResults,
+    JobStatus,
+    JobStatusEntry,
+    StatusFile,
+    load_status,
+    save_job_results,
+    save_status,
+)
+
+if TYPE_CHECKING:
+    from collections.abc import Iterable
+
+logger = logging.getLogger(__name__)
+
+
+VIDEO_EXTENSIONS = frozenset({".mp4", ".avi", ".mov", ".mkv"})
+"""Filename suffixes accepted as job inputs."""
+
+LOCK_FILENAME = ".neuropose.lock"
+"""Name of the single-instance lock file, placed under ``data_dir``."""
+
+
+# ---------------------------------------------------------------------------
+# Exceptions
+# ---------------------------------------------------------------------------
+
+
+class InterfacerError(Exception):
+    """Base class for errors raised by :class:`Interfacer`."""
+
+
+class AlreadyRunningError(InterfacerError):
+    """Raised when another :class:`Interfacer` instance holds the lock file."""
+
+
+class JobProcessingError(InterfacerError):
+    """Raised for recoverable per-job failures (missing videos, decode error)."""
+
+
+# ---------------------------------------------------------------------------
+# Interfacer
+# ---------------------------------------------------------------------------
+
+
+class Interfacer:
+    """Job-dispatching daemon.
+
+    Parameters
+    ----------
+    settings
+        Runtime configuration. The daemon reads
+        ``input_dir``, ``output_dir``, ``failed_dir``, ``status_file``,
+        ``poll_interval_seconds``, and ``model_cache_dir``.
+    estimator
+        A ready-to-use :class:`~neuropose.estimator.Estimator`. If its
+        model has not yet been loaded, the daemon will call
+        :meth:`~neuropose.estimator.Estimator.load_model` on its
+        first :meth:`run` call, passing ``settings.model_cache_dir``.
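+
+    Examples
+    --------
+    A minimal launch sketch (``settings`` and ``estimator`` are assumed to
+    be built elsewhere; :meth:`run` blocks until SIGINT/SIGTERM)::
+
+        Interfacer(settings, estimator).run()
+
+    Test harnesses and external schedulers can instead drive single poll
+    iterations via :meth:`run_once`.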
+ """ + + def __init__(self, settings: Settings, estimator: Estimator) -> None: + self._settings = settings + self._estimator = estimator + self._stop = False + self._lock_fd: int | None = None + self._prev_sigint: object = None + self._prev_sigterm: object = None + + # -- public lifecycle --------------------------------------------------- + + def run(self) -> None: + """Run the main daemon loop until :meth:`stop` is called. + + Acquires the single-instance lock, installs signal handlers, loads + the model if needed, recovers stuck jobs, then loops calling + :meth:`run_once` until ``self._stop`` is set. On exit (normal or + exceptional) the lock and signal handlers are cleaned up. + """ + self._acquire_lock() + try: + self._install_signal_handlers() + try: + self.recover_stuck_jobs() + if not self._estimator.is_model_loaded: + logger.info("Loading estimator model before entering main loop") + self._estimator.load_model(cache_dir=self._settings.model_cache_dir) + logger.info( + "Interfacer running. Polling %s every %d seconds.", + self._settings.input_dir, + self._settings.poll_interval_seconds, + ) + while not self._stop: + try: + self.run_once() + except Exception: + logger.exception("Unexpected error in main loop; backing off") + self._interruptible_sleep(self._settings.poll_interval_seconds * 2) + continue + self._interruptible_sleep(self._settings.poll_interval_seconds) + finally: + self._restore_signal_handlers() + finally: + self._release_lock() + logger.info("Interfacer stopped.") + + def run_once(self) -> None: + """Execute exactly one poll iteration. + + Discovers new job subdirectories, processes each one in insertion + order, and returns. Primarily intended for tests and for external + schedulers that want to drive the daemon themselves. Called + internally by :meth:`run`. + """ + self._settings.ensure_dirs() + status = load_status(self._settings.status_file) + new_jobs = self._discover_new_jobs(status) + if new_jobs: + logger.info("Discovered %d new job(s): %s", len(new_jobs), new_jobs) + for job_name in new_jobs: + if self._stop: + logger.info("Stop requested; deferring remaining jobs to next run.") + break + self.process_job(job_name) + + def stop(self) -> None: + """Request a graceful shutdown of the run loop. + + Idempotent. Safe to call from a signal handler or a different + thread. The running job (if any) will finish before the loop exits. + """ + self._stop = True + + @property + def is_stopping(self) -> bool: + """Return ``True`` if :meth:`stop` has been called.""" + return self._stop + + # -- job processing ----------------------------------------------------- + + def process_job(self, job_name: str) -> JobStatusEntry: + """Run a single job from discovery through final status. + + Transitions the job's status entry ``processing → completed|failed``, + writes ``results.json`` on success, or quarantines the job inputs to + ``failed_dir`` on failure. Always persists the status file before + returning. + + Parameters + ---------- + job_name + Name of the job subdirectory under ``settings.input_dir``. + + Returns + ------- + JobStatusEntry + The final status entry as persisted. 
+ """ + logger.info("[%s] Starting job", job_name) + started_at = datetime.now(UTC) + + status = load_status(self._settings.status_file) + status.root[job_name] = JobStatusEntry( + status=JobStatus.PROCESSING, + started_at=started_at, + ) + save_status(self._settings.status_file, status) + + try: + final_entry = self._run_job_inner(job_name, started_at) + logger.info("[%s] Completed successfully", job_name) + except Exception as exc: + logger.exception("[%s] Failed", job_name) + final_entry = JobStatusEntry( + status=JobStatus.FAILED, + started_at=started_at, + completed_at=datetime.now(UTC), + error=_format_error(exc), + ) + self._quarantine_job(job_name) + + status = load_status(self._settings.status_file) + status.root[job_name] = final_entry + save_status(self._settings.status_file, status) + return final_entry + + def recover_stuck_jobs(self) -> None: + """Mark any ``processing`` entries as failed and quarantine their inputs. + + Intended to run once on daemon startup. A job in ``processing`` + state at startup was in flight when the previous daemon instance + died, so its actual state on disk is unknown. Rather than retrying + automatically (which would re-process arbitrary amounts of work), + we mark it failed with a clear interruption message and move its + inputs to ``failed_dir`` for operator review. + """ + status = load_status(self._settings.status_file) + dirty = False + for job_name, entry in list(status.root.items()): + if entry.status != JobStatus.PROCESSING: + continue + logger.warning("[%s] Recovering stuck job from previous run", job_name) + status.root[job_name] = JobStatusEntry( + status=JobStatus.FAILED, + started_at=entry.started_at, + completed_at=datetime.now(UTC), + error="interrupted: daemon shutdown or crash before completion", + ) + self._quarantine_job(job_name) + dirty = True + if dirty: + save_status(self._settings.status_file, status) + + # -- internals: job lifecycle ------------------------------------------ + + def _run_job_inner(self, job_name: str, started_at: datetime) -> JobStatusEntry: + """Do the actual inference work for a single job. + + Raises on any failure — the caller in :meth:`process_job` handles + the transition to the failed state and the quarantine move. 
+ """ + job_in_path = self._settings.input_dir / job_name + job_out_path = self._settings.output_dir / job_name + job_out_path.mkdir(parents=True, exist_ok=True) + + videos = sorted(_discover_videos(job_in_path)) + if not videos: + raise JobProcessingError( + f"no supported video files found in {job_in_path} " + f"(accepted extensions: {sorted(VIDEO_EXTENSIONS)})" + ) + + per_video_predictions = {} + for video_path in videos: + if self._stop: + raise JobProcessingError( + f"stop requested mid-job after processing " + f"{len(per_video_predictions)}/{len(videos)} videos" + ) + logger.info("[%s] Processing video %s", job_name, video_path.name) + result = self._estimator.process_video(video_path) + per_video_predictions[video_path.name] = result.predictions + logger.info( + "[%s] Processed %s (%d frames)", + job_name, + video_path.name, + result.frame_count, + ) + + job_results = JobResults(root=per_video_predictions) + results_path = job_out_path / "results.json" + save_job_results(results_path, job_results) + logger.info("[%s] Wrote aggregated results to %s", job_name, results_path) + + return JobStatusEntry( + status=JobStatus.COMPLETED, + started_at=started_at, + completed_at=datetime.now(UTC), + results_path=results_path, + ) + + def _discover_new_jobs(self, status: StatusFile) -> list[str]: + """Return names of job subdirectories not yet tracked in ``status``. + + Empty directories are **silently skipped** (not returned) so a user + mid-copy does not trigger a spurious "no videos" failure. A + directory containing files that are not videos IS returned here; it + will fail inside :meth:`_run_job_inner` and be quarantined, which + is the correct behaviour for genuinely broken inputs. + """ + input_dir = self._settings.input_dir + if not input_dir.exists(): + return [] + candidates = sorted( + p for p in input_dir.iterdir() if p.is_dir() and p.name not in status.root + ) + new: list[str] = [] + for path in candidates: + if _is_empty_dir(path): + logger.debug("[%s] Skipping empty job directory", path.name) + continue + new.append(path.name) + return new + + def _quarantine_job(self, job_name: str) -> None: + """Move a job's input directory to the failed directory. + + If a quarantine target with the same name already exists (e.g. + because the same job name has failed before), a numeric suffix is + appended to disambiguate. + """ + source = self._settings.input_dir / job_name + if not source.exists(): + logger.debug("[%s] Nothing to quarantine (source absent)", job_name) + return + self._settings.failed_dir.mkdir(parents=True, exist_ok=True) + dest = self._settings.failed_dir / job_name + suffix = 1 + while dest.exists(): + dest = self._settings.failed_dir / f"{job_name}.{suffix}" + suffix += 1 + try: + source.rename(dest) + except OSError: + logger.exception("[%s] Failed to quarantine inputs to %s", job_name, dest) + return + logger.info("[%s] Quarantined inputs to %s", job_name, dest) + + # -- internals: sleep / signals / lock --------------------------------- + + def _interruptible_sleep(self, seconds: float) -> None: + """Sleep up to ``seconds`` but wake early if :meth:`stop` is called.""" + if seconds <= 0: + return + deadline = time.monotonic() + seconds + while not self._stop: + remaining = deadline - time.monotonic() + if remaining <= 0: + return + time.sleep(min(0.5, remaining)) + + def _install_signal_handlers(self) -> None: + """Install SIGINT/SIGTERM handlers that set the stop flag. 
+ + Signal handlers can only be installed from the main thread; calling + this from a non-main thread (as tests occasionally do) raises + :class:`ValueError`, which we downgrade to a warning. The stop + mechanism still works via :meth:`stop`. + """ + try: + self._prev_sigint = signal.signal(signal.SIGINT, self._handle_signal) + self._prev_sigterm = signal.signal(signal.SIGTERM, self._handle_signal) + except ValueError: + logger.warning( + "Could not install signal handlers (Interfacer.run() is not running " + "on the main thread); rely on explicit Interfacer.stop() instead." + ) + self._prev_sigint = None + self._prev_sigterm = None + + def _restore_signal_handlers(self) -> None: + """Restore the signal handlers that were in place before :meth:`run`.""" + if self._prev_sigint is not None: + try: + signal.signal(signal.SIGINT, self._prev_sigint) # type: ignore[arg-type] + except (ValueError, TypeError): + pass + if self._prev_sigterm is not None: + try: + signal.signal(signal.SIGTERM, self._prev_sigterm) # type: ignore[arg-type] + except (ValueError, TypeError): + pass + self._prev_sigint = None + self._prev_sigterm = None + + def _handle_signal(self, signum: int, frame: FrameType | None) -> None: + """Signal handler: request shutdown.""" + del frame + logger.info("Received signal %d; requesting graceful shutdown", signum) + self.stop() + + def _acquire_lock(self) -> None: + """Acquire the single-instance lock file, or raise :class:`AlreadyRunningError`.""" + self._settings.data_dir.mkdir(parents=True, exist_ok=True) + lock_path = self._settings.data_dir / LOCK_FILENAME + fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o600) + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError as exc: + os.close(fd) + raise AlreadyRunningError( + f"another NeuroPose daemon already holds {lock_path}" + ) from exc + # Overwrite the lock file with our PID so humans can see who owns it. + os.ftruncate(fd, 0) + os.write(fd, f"{os.getpid()}\n".encode()) + self._lock_fd = fd + logger.debug("Acquired lock %s (pid %d)", lock_path, os.getpid()) + + def _release_lock(self) -> None: + """Release the single-instance lock if held.""" + if self._lock_fd is None: + return + try: + fcntl.flock(self._lock_fd, fcntl.LOCK_UN) + except OSError: + pass + try: + os.close(self._lock_fd) + except OSError: + pass + self._lock_fd = None + + +# --------------------------------------------------------------------------- +# Module helpers +# --------------------------------------------------------------------------- + + +def _discover_videos(job_dir: Path) -> Iterable[Path]: + """Yield paths to all supported video files in ``job_dir`` (non-recursive).""" + return ( + p + for p in job_dir.iterdir() + if p.is_file() and p.suffix.lower() in VIDEO_EXTENSIONS + ) + + +def _is_empty_dir(path: Path) -> bool: + """Return ``True`` if ``path`` is a directory containing no entries.""" + try: + next(path.iterdir()) + except StopIteration: + return True + return False + + +def _format_error(exc: BaseException) -> str: + """Return a short ``"ExceptionType: message"`` string for the status file. + + Kept deliberately short so ``status.json`` does not balloon with + multi-kilobyte tracebacks. Full tracebacks still land in the log via + :meth:`logging.Logger.exception`. 
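+
+    For example (doctest-style, illustrative)::
+
+        >>> _format_error(RuntimeError("decode failed"))
+        'RuntimeError: decode failed'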
+ """ + message = str(exc) or "" + return f"{type(exc).__name__}: {message}" diff --git a/tests/unit/test_interfacer.py b/tests/unit/test_interfacer.py new file mode 100644 index 0000000..496d3d5 --- /dev/null +++ b/tests/unit/test_interfacer.py @@ -0,0 +1,564 @@ +"""Tests for :class:`neuropose.interfacer.Interfacer`. + +Exercises the daemon's job-lifecycle and state-transition logic with an +injected fake estimator. The full fcntl lock test depends on the +behaviour that within a single process, two ``fcntl.flock`` calls on the +same file through independent file descriptors block each other. +""" + +from __future__ import annotations + +import shutil +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import pytest + +from neuropose.config import Settings +from neuropose.estimator import Estimator +from neuropose.interfacer import ( + AlreadyRunningError, + Interfacer, + JobProcessingError, +) +from neuropose.io import ( + JobStatus, + JobStatusEntry, + StatusFile, + load_status, + save_status, +) + + +# --------------------------------------------------------------------------- +# Stubs and helpers +# --------------------------------------------------------------------------- + + +class _RaisingEstimator: + """Stub estimator whose ``process_video`` always raises.""" + + def __init__(self, exc: Exception | None = None) -> None: + self._exc = exc or RuntimeError("simulated estimator failure") + self.is_model_loaded = True + + def load_model(self, cache_dir: Path | None = None) -> None: + del cache_dir + + def process_video(self, video_path: Path) -> Any: + del video_path + raise self._exc + + +def _make_settings(tmp_path: Path) -> Settings: + """Construct a Settings object pointing at an isolated tmp_path.""" + return Settings( + data_dir=tmp_path / "jobs", + model_cache_dir=tmp_path / "models", + ) + + +def _prepare_job( + settings: Settings, + job_name: str, + videos: list[Path] | None = None, + extra_files: list[tuple[str, bytes]] | None = None, +) -> Path: + """Create ``input_dir/`` and populate it. + + Parameters + ---------- + videos + Video files to copy into the job directory. Relative filenames are + taken from the source path's ``name`` attribute. + extra_files + Additional ``(name, bytes)`` tuples to drop into the job directory. + Useful for exercising the "directory with files but no videos" + failure path. 
+ """ + settings.ensure_dirs() + job_dir = settings.input_dir / job_name + job_dir.mkdir(parents=True, exist_ok=True) + for video in videos or []: + shutil.copy(video, job_dir / video.name) + for name, blob in extra_files or []: + (job_dir / name).write_bytes(blob) + return job_dir + + +# --------------------------------------------------------------------------- +# Construction and stop flag +# --------------------------------------------------------------------------- + + +class TestConstruction: + def test_initial_state(self, tmp_path: Path, fake_metrabs_model) -> None: + settings = _make_settings(tmp_path) + estimator = Estimator(model=fake_metrabs_model) + interfacer = Interfacer(settings, estimator) + assert not interfacer.is_stopping + + def test_stop_sets_flag(self, tmp_path: Path, fake_metrabs_model) -> None: + settings = _make_settings(tmp_path) + estimator = Estimator(model=fake_metrabs_model) + interfacer = Interfacer(settings, estimator) + interfacer.stop() + assert interfacer.is_stopping + + def test_stop_is_idempotent(self, tmp_path: Path, fake_metrabs_model) -> None: + settings = _make_settings(tmp_path) + estimator = Estimator(model=fake_metrabs_model) + interfacer = Interfacer(settings, estimator) + interfacer.stop() + interfacer.stop() + assert interfacer.is_stopping + + +# --------------------------------------------------------------------------- +# Job discovery +# --------------------------------------------------------------------------- + + +class TestDiscoverNewJobs: + def test_empty_input_dir(self, tmp_path: Path, fake_metrabs_model) -> None: + settings = _make_settings(tmp_path) + settings.ensure_dirs() + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + assert interfacer._discover_new_jobs(StatusFile(root={})) == [] + + def test_missing_input_dir(self, tmp_path: Path, fake_metrabs_model) -> None: + # data_dir not yet created; ensure_dirs has NOT been called. 
+ settings = _make_settings(tmp_path) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + assert interfacer._discover_new_jobs(StatusFile(root={})) == [] + + def test_skips_empty_directories_silently( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + settings.ensure_dirs() + (settings.input_dir / "empty_job").mkdir() + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + assert interfacer._discover_new_jobs(StatusFile(root={})) == [] + + def test_returns_non_empty_jobs_in_sorted_order( + self, + tmp_path: Path, + synthetic_video: Path, + fake_metrabs_model, + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_c", videos=[synthetic_video]) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + _prepare_job(settings, "job_b", videos=[synthetic_video]) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + assert interfacer._discover_new_jobs(StatusFile(root={})) == [ + "job_a", + "job_b", + "job_c", + ] + + def test_excludes_jobs_already_in_status( + self, + tmp_path: Path, + synthetic_video: Path, + fake_metrabs_model, + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + _prepare_job(settings, "job_b", videos=[synthetic_video]) + status = StatusFile.model_validate( + { + "job_a": { + "status": "completed", + "started_at": datetime(2026, 4, 13, tzinfo=UTC).isoformat(), + } + } + ) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + assert interfacer._discover_new_jobs(status) == ["job_b"] + + def test_dir_with_non_video_files_is_returned( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + # Dirs that contain files but no *videos* are NOT silently skipped + # — they should be returned so process_job marks them failed. 
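+        # (_discover_new_jobs only skips directories with no entries at
+        # all, via its _is_empty_dir check.)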
+ settings = _make_settings(tmp_path) + _prepare_job( + settings, + "job_a", + extra_files=[("README.txt", b"nothing to see")], + ) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + assert interfacer._discover_new_jobs(StatusFile(root={})) == ["job_a"] + + +# --------------------------------------------------------------------------- +# process_job happy path +# --------------------------------------------------------------------------- + + +class TestProcessJobSuccess: + def test_happy_path_marks_completed( + self, + tmp_path: Path, + synthetic_video: Path, + fake_metrabs_model, + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + estimator = Estimator(model=fake_metrabs_model) + interfacer = Interfacer(settings, estimator) + + entry = interfacer.process_job("job_a") + + assert entry.status == JobStatus.COMPLETED + assert entry.results_path is not None + assert entry.results_path.exists() + assert entry.error is None + assert entry.completed_at is not None + + def test_happy_path_persists_status( + self, + tmp_path: Path, + synthetic_video: Path, + fake_metrabs_model, + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + interfacer.process_job("job_a") + + loaded = load_status(settings.status_file) + assert "job_a" in loaded.root + assert loaded.root["job_a"].status == JobStatus.COMPLETED + + def test_happy_path_leaves_inputs_in_place( + self, + tmp_path: Path, + synthetic_video: Path, + fake_metrabs_model, + ) -> None: + # A successful job should NOT be quarantined — its inputs stay in + # input_dir. (An operator might want to rename / archive them + # separately, but that's not the daemon's job.) + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + interfacer.process_job("job_a") + + assert (settings.input_dir / "job_a").exists() + assert not (settings.failed_dir / "job_a").exists() + + +# --------------------------------------------------------------------------- +# process_job failure paths +# --------------------------------------------------------------------------- + + +class TestProcessJobFailure: + def test_no_videos_marks_failed_and_quarantines( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job( + settings, + "job_a", + extra_files=[("README.txt", b"nothing to see")], + ) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + entry = interfacer.process_job("job_a") + + assert entry.status == JobStatus.FAILED + assert entry.error is not None + assert "no supported video files" in entry.error + # Inputs moved to failed_dir. 
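+        # (A retry requires the operator to move them back to input_dir.)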
+ assert not (settings.input_dir / "job_a").exists() + assert (settings.failed_dir / "job_a").exists() + assert (settings.failed_dir / "job_a" / "README.txt").exists() + + def test_estimator_exception_marks_failed_and_quarantines( + self, tmp_path: Path, synthetic_video: Path + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + interfacer = Interfacer(settings, _RaisingEstimator()) + + entry = interfacer.process_job("job_a") + + assert entry.status == JobStatus.FAILED + assert entry.error is not None + assert "RuntimeError" in entry.error + assert "simulated estimator failure" in entry.error + assert not (settings.input_dir / "job_a").exists() + assert (settings.failed_dir / "job_a").exists() + + def test_quarantine_collision_suffixes( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + settings.ensure_dirs() + # Pre-populate failed_dir with an existing entry for "job_a". + (settings.failed_dir / "job_a").mkdir() + _prepare_job( + settings, + "job_a", + extra_files=[("not_a_video.txt", b"x")], + ) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + interfacer.process_job("job_a") + + assert (settings.failed_dir / "job_a").exists() + assert (settings.failed_dir / "job_a.1").exists() + assert (settings.failed_dir / "job_a.1" / "not_a_video.txt").exists() + + def test_raising_estimator_error_type_is_reported( + self, tmp_path: Path, synthetic_video: Path + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + interfacer = Interfacer( + settings, + _RaisingEstimator(exc=JobProcessingError("custom boom")), + ) + + entry = interfacer.process_job("job_a") + + assert entry.status == JobStatus.FAILED + assert entry.error is not None + assert "JobProcessingError" in entry.error + assert "custom boom" in entry.error + + +# --------------------------------------------------------------------------- +# Stuck-processing recovery +# --------------------------------------------------------------------------- + + +class TestRecoverStuckJobs: + def test_recovers_single_stuck_entry( + self, tmp_path: Path, synthetic_video: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + status = StatusFile.model_validate( + { + "job_a": { + "status": "processing", + "started_at": datetime(2026, 4, 13, tzinfo=UTC).isoformat(), + } + } + ) + save_status(settings.status_file, status) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + interfacer.recover_stuck_jobs() + + loaded = load_status(settings.status_file) + entry = loaded.root["job_a"] + assert entry.status == JobStatus.FAILED + assert entry.error is not None + assert "interrupted" in entry.error + assert entry.completed_at is not None + # Inputs were quarantined. 
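+        # (The same on-disk outcome as an ordinary failed job.)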
+ assert not (settings.input_dir / "job_a").exists() + assert (settings.failed_dir / "job_a").exists() + + def test_does_not_touch_completed_entries( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + settings.ensure_dirs() + completed = datetime(2026, 4, 13, 10, 0, 0, tzinfo=UTC) + status = StatusFile.model_validate( + { + "job_a": { + "status": "completed", + "started_at": completed.isoformat(), + "completed_at": completed.isoformat(), + }, + "job_b": { + "status": "failed", + "started_at": completed.isoformat(), + "completed_at": completed.isoformat(), + "error": "old failure", + }, + } + ) + save_status(settings.status_file, status) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + interfacer.recover_stuck_jobs() + + loaded = load_status(settings.status_file) + assert loaded.root["job_a"].status == JobStatus.COMPLETED + assert loaded.root["job_b"].status == JobStatus.FAILED + assert loaded.root["job_b"].error == "old failure" + + def test_no_status_file_is_noop( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + # Must not raise even though status_file does not exist. + interfacer.recover_stuck_jobs() + + +# --------------------------------------------------------------------------- +# run_once +# --------------------------------------------------------------------------- + + +class TestRunOnce: + def test_no_jobs_is_noop(self, tmp_path: Path, fake_metrabs_model) -> None: + settings = _make_settings(tmp_path) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + interfacer.run_once() + loaded = load_status(settings.status_file) + assert loaded.is_empty() + + def test_processes_all_new_jobs( + self, + tmp_path: Path, + synthetic_video: Path, + fake_metrabs_model, + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + _prepare_job(settings, "job_b", videos=[synthetic_video]) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + interfacer.run_once() + + loaded = load_status(settings.status_file) + assert loaded.root["job_a"].status == JobStatus.COMPLETED + assert loaded.root["job_b"].status == JobStatus.COMPLETED + + def test_stop_between_jobs_defers_remaining( + self, + tmp_path: Path, + synthetic_video: Path, + fake_metrabs_model, + ) -> None: + settings = _make_settings(tmp_path) + _prepare_job(settings, "job_a", videos=[synthetic_video]) + _prepare_job(settings, "job_b", videos=[synthetic_video]) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + + # Override process_job so the first call flips the stop flag + # before returning. The loop should then break before job_b. 
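+        # (This exercises the stop check at the top of run_once's job loop.)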
+ original = interfacer.process_job + call_log: list[str] = [] + + def recording_process_job(job_name: str) -> JobStatusEntry: + call_log.append(job_name) + result = original(job_name) + interfacer.stop() + return result + + interfacer.process_job = recording_process_job # type: ignore[method-assign] + interfacer.run_once() + + assert call_log == ["job_a"] + loaded = load_status(settings.status_file) + assert "job_a" in loaded.root + assert "job_b" not in loaded.root + + +# --------------------------------------------------------------------------- +# Single-instance lock +# --------------------------------------------------------------------------- + + +class TestLock: + def test_first_acquire_succeeds( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + try: + interfacer._acquire_lock() + assert interfacer._lock_fd is not None + finally: + interfacer._release_lock() + + def test_second_acquire_raises_already_running( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + first = Interfacer(settings, Estimator(model=fake_metrabs_model)) + second = Interfacer(settings, Estimator(model=fake_metrabs_model)) + first._acquire_lock() + try: + with pytest.raises(AlreadyRunningError): + second._acquire_lock() + finally: + first._release_lock() + + def test_release_allows_subsequent_acquire( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + first = Interfacer(settings, Estimator(model=fake_metrabs_model)) + first._acquire_lock() + first._release_lock() + + second = Interfacer(settings, Estimator(model=fake_metrabs_model)) + try: + second._acquire_lock() # Should succeed after release. + finally: + second._release_lock() + + def test_lock_file_contains_pid( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + try: + interfacer._acquire_lock() + lock_path = settings.data_dir / ".neuropose.lock" + content = lock_path.read_text().strip() + import os + + assert content == str(os.getpid()) + finally: + interfacer._release_lock() + + +# --------------------------------------------------------------------------- +# Interruptible sleep +# --------------------------------------------------------------------------- + + +class TestInterruptibleSleep: + def test_zero_returns_immediately( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + import time + + start = time.monotonic() + interfacer._interruptible_sleep(0) + elapsed = time.monotonic() - start + assert elapsed < 0.1 + + def test_stop_flag_wakes_sleep_early( + self, tmp_path: Path, fake_metrabs_model + ) -> None: + settings = _make_settings(tmp_path) + interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model)) + interfacer.stop() + import time + + start = time.monotonic() + interfacer._interruptible_sleep(5.0) + elapsed = time.monotonic() - start + # With stop flag already set, the sleep should return in well under + # the 5-second nominal window. + assert elapsed < 1.0
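+
+
+# ---------------------------------------------------------------------------
+# Error formatting
+# ---------------------------------------------------------------------------
+
+
+# Complementary sketch-level checks for the module-level _format_error
+# helper. Only the stable "ExceptionType:" prefix is asserted for the
+# empty-message case, since the exact fallback text is an implementation
+# detail.
+class TestFormatError:
+    def test_includes_type_and_message(self) -> None:
+        from neuropose.interfacer import _format_error
+
+        assert _format_error(RuntimeError("boom")) == "RuntimeError: boom"
+
+    def test_empty_message_keeps_type_prefix(self) -> None:
+        from neuropose.interfacer import _format_error
+
+        assert _format_error(RuntimeError()).startswith("RuntimeError:")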