interfacer

Levi Neuwirth 2026-04-13 16:01:46 -04:00
parent 9bbbbd0d52
commit 27464f681a
2 changed files with 1030 additions and 0 deletions

src/neuropose/interfacer.py (new file, 466 additions)

@@ -0,0 +1,466 @@
"""Filesystem-polling daemon for NeuroPose jobs.
The :class:`Interfacer` watches an input directory for new job
subdirectories, dispatches each job to a supplied
:class:`neuropose.estimator.Estimator`, and persists job state to disk via
:mod:`neuropose.io`. It owns the input/output/failed directory lifecycle
and the persistent :class:`~neuropose.io.StatusFile`; it does NOT own
inference, which lives on the injected estimator.
Key guarantees
--------------
- **Single-instance**: an exclusive ``fcntl.flock`` on ``data_dir/.neuropose.lock``
blocks a second daemon from running against the same data directory. The
lock is released automatically on process exit, even on SIGKILL.
- **Crash recovery**: on startup, any status entries left in
:attr:`~neuropose.io.JobStatus.PROCESSING` state (i.e. jobs that were in
flight when the previous daemon was killed) are marked failed with a
clear error message and their inputs are quarantined to ``failed_dir``.
The operator can retry by moving the inputs back into ``input_dir``
under a fresh job name (or after clearing the old status entry), since
directories whose names are already tracked in the status file are not
re-discovered.
- **Graceful shutdown**: ``SIGINT`` and ``SIGTERM`` request an orderly stop.
The current job finishes before the loop exits; no partial writes.
- **Structured errors**: every failed job records a short
``"<ExceptionType>: <message>"`` in the status file's ``error`` field so
operators have a grep target without needing the log file.
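
Example
-------
A minimal launch sketch. It is illustrative only: the paths are
placeholders, and it assumes ``Estimator()`` can be constructed without
a preloaded model, in which case :meth:`Interfacer.run` loads one on
first use from ``settings.model_cache_dir``::

    from pathlib import Path

    from neuropose.config import Settings
    from neuropose.estimator import Estimator
    from neuropose.interfacer import Interfacer

    settings = Settings(
        data_dir=Path("/srv/neuropose/jobs"),
        model_cache_dir=Path("/srv/neuropose/models"),
    )
    daemon = Interfacer(settings, Estimator())
    daemon.run()  # blocks until SIGINT/SIGTERM or daemon.stop()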
"""
from __future__ import annotations
import fcntl
import logging
import os
import signal
import time
from datetime import UTC, datetime
from pathlib import Path
from types import FrameType
from typing import TYPE_CHECKING
from neuropose.config import Settings
from neuropose.estimator import Estimator
from neuropose.io import (
JobResults,
JobStatus,
JobStatusEntry,
StatusFile,
load_status,
save_job_results,
save_status,
)
if TYPE_CHECKING:
from collections.abc import Iterable
logger = logging.getLogger(__name__)
VIDEO_EXTENSIONS = frozenset({".mp4", ".avi", ".mov", ".mkv"})
"""Filename suffixes accepted as job inputs."""
LOCK_FILENAME = ".neuropose.lock"
"""Name of the single-instance lock file, placed under ``data_dir``."""
# ---------------------------------------------------------------------------
# Exceptions
# ---------------------------------------------------------------------------
class InterfacerError(Exception):
"""Base class for errors raised by :class:`Interfacer`."""
class AlreadyRunningError(InterfacerError):
"""Raised when another :class:`Interfacer` instance holds the lock file."""
class JobProcessingError(InterfacerError):
"""Raised for recoverable per-job failures (missing videos, decode error)."""
# ---------------------------------------------------------------------------
# Interfacer
# ---------------------------------------------------------------------------
class Interfacer:
"""Job-dispatching daemon.
Parameters
----------
settings
Runtime configuration. The daemon reads ``data_dir`` (for the
single-instance lock file), ``input_dir``, ``output_dir``,
``failed_dir``, ``status_file``, ``poll_interval_seconds``, and
``model_cache_dir``.
estimator
A ready-to-use :class:`~neuropose.estimator.Estimator`. If its
model has not yet been loaded, the daemon will call
:meth:`~neuropose.estimator.Estimator.load_model` on its
first :meth:`run` call, passing ``settings.model_cache_dir``.
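
For example (illustrative; ``loaded_model`` is a placeholder for
whatever pose model object the estimator wraps)::

    interfacer = Interfacer(settings, Estimator(model=loaded_model))
    interfacer.run()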
"""
def __init__(self, settings: Settings, estimator: Estimator) -> None:
self._settings = settings
self._estimator = estimator
self._stop = False
self._lock_fd: int | None = None
self._prev_sigint: object = None
self._prev_sigterm: object = None
# -- public lifecycle ---------------------------------------------------
def run(self) -> None:
"""Run the main daemon loop until :meth:`stop` is called.
Acquires the single-instance lock, installs signal handlers, loads
the model if needed, recovers stuck jobs, then loops calling
:meth:`run_once` until ``self._stop`` is set. On exit (normal or
exceptional) the lock and signal handlers are cleaned up.
"""
self._acquire_lock()
try:
self._install_signal_handlers()
try:
self.recover_stuck_jobs()
if not self._estimator.is_model_loaded:
logger.info("Loading estimator model before entering main loop")
self._estimator.load_model(cache_dir=self._settings.model_cache_dir)
logger.info(
"Interfacer running. Polling %s every %d seconds.",
self._settings.input_dir,
self._settings.poll_interval_seconds,
)
while not self._stop:
try:
self.run_once()
except Exception:
logger.exception("Unexpected error in main loop; backing off")
self._interruptible_sleep(self._settings.poll_interval_seconds * 2)
continue
self._interruptible_sleep(self._settings.poll_interval_seconds)
finally:
self._restore_signal_handlers()
finally:
self._release_lock()
logger.info("Interfacer stopped.")
def run_once(self) -> None:
"""Execute exactly one poll iteration.
Discovers new job subdirectories, processes each one in sorted
(lexicographic) name order, and returns. Primarily intended for tests
and for external schedulers that want to drive the daemon themselves.
Called internally by :meth:`run`.
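
For example, an external scheduler can drive polls itself instead of
calling :meth:`run` (an illustrative sketch: ``should_keep_polling`` is
a hypothetical predicate, and the single-instance lock plus signal
handlers that :meth:`run` installs are not set up here)::

    import time

    while should_keep_polling():
        interfacer.run_once()
        time.sleep(settings.poll_interval_seconds)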
"""
self._settings.ensure_dirs()
status = load_status(self._settings.status_file)
new_jobs = self._discover_new_jobs(status)
if new_jobs:
logger.info("Discovered %d new job(s): %s", len(new_jobs), new_jobs)
for job_name in new_jobs:
if self._stop:
logger.info("Stop requested; deferring remaining jobs to next run.")
break
self.process_job(job_name)
def stop(self) -> None:
"""Request a graceful shutdown of the run loop.
Idempotent. Safe to call from a signal handler or a different
thread. The running job (if any) will finish before the loop exits.
"""
self._stop = True
@property
def is_stopping(self) -> bool:
"""Return ``True`` if :meth:`stop` has been called."""
return self._stop
# -- job processing -----------------------------------------------------
def process_job(self, job_name: str) -> JobStatusEntry:
"""Run a single job from discovery through final status.
Transitions the job's status entry ``processing → completed|failed``,
writes ``results.json`` on success, or quarantines the job inputs to
``failed_dir`` on failure. Always persists the status file before
returning.
Parameters
----------
job_name
Name of the job subdirectory under ``settings.input_dir``.
Returns
-------
JobStatusEntry
The final status entry as persisted.
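
Example
-------
Illustrative only; the job name is hypothetical::

    entry = interfacer.process_job("session_001")
    if entry.status == JobStatus.FAILED:
        # e.g. "JobProcessingError: no supported video files found in ..."
        print(entry.error)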
"""
logger.info("[%s] Starting job", job_name)
started_at = datetime.now(UTC)
status = load_status(self._settings.status_file)
status.root[job_name] = JobStatusEntry(
status=JobStatus.PROCESSING,
started_at=started_at,
)
save_status(self._settings.status_file, status)
try:
final_entry = self._run_job_inner(job_name, started_at)
logger.info("[%s] Completed successfully", job_name)
except Exception as exc:
logger.exception("[%s] Failed", job_name)
final_entry = JobStatusEntry(
status=JobStatus.FAILED,
started_at=started_at,
completed_at=datetime.now(UTC),
error=_format_error(exc),
)
self._quarantine_job(job_name)
status = load_status(self._settings.status_file)
status.root[job_name] = final_entry
save_status(self._settings.status_file, status)
return final_entry
def recover_stuck_jobs(self) -> None:
"""Mark any ``processing`` entries as failed and quarantine their inputs.
Intended to run once on daemon startup. A job in ``processing``
state at startup was in flight when the previous daemon instance
died, so its actual state on disk is unknown. Rather than retrying
automatically (which would re-process arbitrary amounts of work),
we mark it failed with a clear interruption message and move its
inputs to ``failed_dir`` for operator review.
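
After recovery, an interrupted job's entry has the following shape
(placeholder timestamps; the error string is the one written below)::

    JobStatusEntry(
        status=JobStatus.FAILED,
        started_at=<original started_at>,
        completed_at=<time of recovery>,
        error="interrupted: daemon shutdown or crash before completion",
    )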
"""
status = load_status(self._settings.status_file)
dirty = False
for job_name, entry in list(status.root.items()):
if entry.status != JobStatus.PROCESSING:
continue
logger.warning("[%s] Recovering stuck job from previous run", job_name)
status.root[job_name] = JobStatusEntry(
status=JobStatus.FAILED,
started_at=entry.started_at,
completed_at=datetime.now(UTC),
error="interrupted: daemon shutdown or crash before completion",
)
self._quarantine_job(job_name)
dirty = True
if dirty:
save_status(self._settings.status_file, status)
# -- internals: job lifecycle ------------------------------------------
def _run_job_inner(self, job_name: str, started_at: datetime) -> JobStatusEntry:
"""Do the actual inference work for a single job.
Raises on any failure; the caller, :meth:`process_job`, handles
the transition to the failed state and the quarantine move.
"""
job_in_path = self._settings.input_dir / job_name
job_out_path = self._settings.output_dir / job_name
job_out_path.mkdir(parents=True, exist_ok=True)
videos = sorted(_discover_videos(job_in_path))
if not videos:
raise JobProcessingError(
f"no supported video files found in {job_in_path} "
f"(accepted extensions: {sorted(VIDEO_EXTENSIONS)})"
)
per_video_predictions = {}
for video_path in videos:
if self._stop:
raise JobProcessingError(
f"stop requested mid-job after processing "
f"{len(per_video_predictions)}/{len(videos)} videos"
)
logger.info("[%s] Processing video %s", job_name, video_path.name)
result = self._estimator.process_video(video_path)
per_video_predictions[video_path.name] = result.predictions
logger.info(
"[%s] Processed %s (%d frames)",
job_name,
video_path.name,
result.frame_count,
)
job_results = JobResults(root=per_video_predictions)
results_path = job_out_path / "results.json"
save_job_results(results_path, job_results)
logger.info("[%s] Wrote aggregated results to %s", job_name, results_path)
return JobStatusEntry(
status=JobStatus.COMPLETED,
started_at=started_at,
completed_at=datetime.now(UTC),
results_path=results_path,
)
def _discover_new_jobs(self, status: StatusFile) -> list[str]:
"""Return names of job subdirectories not yet tracked in ``status``.
Empty directories are **silently skipped** (not returned) so a user
mid-copy does not trigger a spurious "no videos" failure. A
directory containing files that are not videos IS returned here; it
will fail inside :meth:`_run_job_inner` and be quarantined, which
is the correct behaviour for genuinely broken inputs.
"""
input_dir = self._settings.input_dir
if not input_dir.exists():
return []
candidates = sorted(
p for p in input_dir.iterdir() if p.is_dir() and p.name not in status.root
)
new: list[str] = []
for path in candidates:
if _is_empty_dir(path):
logger.debug("[%s] Skipping empty job directory", path.name)
continue
new.append(path.name)
return new
def _quarantine_job(self, job_name: str) -> None:
"""Move a job's input directory to the failed directory.
If a quarantine target with the same name already exists (e.g.
because the same job name has failed before), a numeric suffix is
appended to disambiguate.
"""
source = self._settings.input_dir / job_name
if not source.exists():
logger.debug("[%s] Nothing to quarantine (source absent)", job_name)
return
self._settings.failed_dir.mkdir(parents=True, exist_ok=True)
dest = self._settings.failed_dir / job_name
suffix = 1
while dest.exists():
dest = self._settings.failed_dir / f"{job_name}.{suffix}"
suffix += 1
try:
source.rename(dest)
except OSError:
logger.exception("[%s] Failed to quarantine inputs to %s", job_name, dest)
return
logger.info("[%s] Quarantined inputs to %s", job_name, dest)
# -- internals: sleep / signals / lock ---------------------------------
def _interruptible_sleep(self, seconds: float) -> None:
"""Sleep up to ``seconds`` but wake early if :meth:`stop` is called."""
if seconds <= 0:
return
deadline = time.monotonic() + seconds
while not self._stop:
remaining = deadline - time.monotonic()
if remaining <= 0:
return
time.sleep(min(0.5, remaining))
def _install_signal_handlers(self) -> None:
"""Install SIGINT/SIGTERM handlers that set the stop flag.
Signal handlers can only be installed from the main thread; calling
this from a non-main thread (as tests occasionally do) raises
:class:`ValueError`, which we downgrade to a warning. The stop
mechanism still works via :meth:`stop`.
"""
try:
self._prev_sigint = signal.signal(signal.SIGINT, self._handle_signal)
self._prev_sigterm = signal.signal(signal.SIGTERM, self._handle_signal)
except ValueError:
logger.warning(
"Could not install signal handlers (Interfacer.run() is not running "
"on the main thread); rely on explicit Interfacer.stop() instead."
)
self._prev_sigint = None
self._prev_sigterm = None
def _restore_signal_handlers(self) -> None:
"""Restore the signal handlers that were in place before :meth:`run`."""
if self._prev_sigint is not None:
try:
signal.signal(signal.SIGINT, self._prev_sigint) # type: ignore[arg-type]
except (ValueError, TypeError):
pass
if self._prev_sigterm is not None:
try:
signal.signal(signal.SIGTERM, self._prev_sigterm) # type: ignore[arg-type]
except (ValueError, TypeError):
pass
self._prev_sigint = None
self._prev_sigterm = None
def _handle_signal(self, signum: int, frame: FrameType | None) -> None:
"""Signal handler: request shutdown."""
del frame
logger.info("Received signal %d; requesting graceful shutdown", signum)
self.stop()
def _acquire_lock(self) -> None:
"""Acquire the single-instance lock file, or raise :class:`AlreadyRunningError`."""
self._settings.data_dir.mkdir(parents=True, exist_ok=True)
lock_path = self._settings.data_dir / LOCK_FILENAME
fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT, 0o600)
try:
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError as exc:
os.close(fd)
raise AlreadyRunningError(
f"another NeuroPose daemon already holds {lock_path}"
) from exc
# Overwrite the lock file with our PID so humans can see who owns it.
os.ftruncate(fd, 0)
os.write(fd, f"{os.getpid()}\n".encode())
self._lock_fd = fd
logger.debug("Acquired lock %s (pid %d)", lock_path, os.getpid())
def _release_lock(self) -> None:
"""Release the single-instance lock if held."""
if self._lock_fd is None:
return
try:
fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
except OSError:
pass
try:
os.close(self._lock_fd)
except OSError:
pass
self._lock_fd = None
# ---------------------------------------------------------------------------
# Module helpers
# ---------------------------------------------------------------------------
def _discover_videos(job_dir: Path) -> Iterable[Path]:
"""Yield paths to all supported video files in ``job_dir`` (non-recursive)."""
return (
p
for p in job_dir.iterdir()
if p.is_file() and p.suffix.lower() in VIDEO_EXTENSIONS
)
def _is_empty_dir(path: Path) -> bool:
"""Return ``True`` if ``path`` is a directory containing no entries."""
try:
next(path.iterdir())
except StopIteration:
return True
return False
def _format_error(exc: BaseException) -> str:
"""Return a short ``"ExceptionType: message"`` string for the status file.
Kept deliberately short so ``status.json`` does not balloon with
multi-kilobyte tracebacks. Full tracebacks still land in the log via
:meth:`logging.Logger.exception`.
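
Example::

    >>> _format_error(ValueError("bad frame"))
    'ValueError: bad frame'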
"""
message = str(exc) or "<no message>"
return f"{type(exc).__name__}: {message}"

(new file, 564 additions)

@@ -0,0 +1,564 @@
"""Tests for :class:`neuropose.interfacer.Interfacer`.
Exercises the daemon's job-lifecycle and state-transition logic with an
injected fake estimator. The full fcntl lock test depends on the
behaviour that within a single process, two ``fcntl.flock`` calls on the
same file through independently opened file descriptors conflict with
each other, so the second non-blocking attempt fails.
"""
from __future__ import annotations
import shutil
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
import pytest
from neuropose.config import Settings
from neuropose.estimator import Estimator
from neuropose.interfacer import (
AlreadyRunningError,
Interfacer,
JobProcessingError,
)
from neuropose.io import (
JobStatus,
JobStatusEntry,
StatusFile,
load_status,
save_status,
)
# ---------------------------------------------------------------------------
# Stubs and helpers
# ---------------------------------------------------------------------------
class _RaisingEstimator:
"""Stub estimator whose ``process_video`` always raises."""
def __init__(self, exc: Exception | None = None) -> None:
self._exc = exc or RuntimeError("simulated estimator failure")
self.is_model_loaded = True
def load_model(self, cache_dir: Path | None = None) -> None:
del cache_dir
def process_video(self, video_path: Path) -> Any:
del video_path
raise self._exc
def _make_settings(tmp_path: Path) -> Settings:
"""Construct a Settings object pointing at an isolated tmp_path."""
return Settings(
data_dir=tmp_path / "jobs",
model_cache_dir=tmp_path / "models",
)
def _prepare_job(
settings: Settings,
job_name: str,
videos: list[Path] | None = None,
extra_files: list[tuple[str, bytes]] | None = None,
) -> Path:
"""Create ``input_dir/<job_name>`` and populate it.
Parameters
----------
videos
Video files to copy into the job directory. Destination filenames are
taken from each source path's ``name`` attribute.
extra_files
Additional ``(name, bytes)`` tuples to drop into the job directory.
Useful for exercising the "directory with files but no videos"
failure path.
"""
settings.ensure_dirs()
job_dir = settings.input_dir / job_name
job_dir.mkdir(parents=True, exist_ok=True)
for video in videos or []:
shutil.copy(video, job_dir / video.name)
for name, blob in extra_files or []:
(job_dir / name).write_bytes(blob)
return job_dir
# ---------------------------------------------------------------------------
# Construction and stop flag
# ---------------------------------------------------------------------------
class TestConstruction:
def test_initial_state(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
estimator = Estimator(model=fake_metrabs_model)
interfacer = Interfacer(settings, estimator)
assert not interfacer.is_stopping
def test_stop_sets_flag(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
estimator = Estimator(model=fake_metrabs_model)
interfacer = Interfacer(settings, estimator)
interfacer.stop()
assert interfacer.is_stopping
def test_stop_is_idempotent(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
estimator = Estimator(model=fake_metrabs_model)
interfacer = Interfacer(settings, estimator)
interfacer.stop()
interfacer.stop()
assert interfacer.is_stopping
# ---------------------------------------------------------------------------
# Job discovery
# ---------------------------------------------------------------------------
class TestDiscoverNewJobs:
def test_empty_input_dir(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
settings.ensure_dirs()
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(StatusFile(root={})) == []
def test_missing_input_dir(self, tmp_path: Path, fake_metrabs_model) -> None:
# data_dir not yet created; ensure_dirs has NOT been called.
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(StatusFile(root={})) == []
def test_skips_empty_directories_silently(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
settings.ensure_dirs()
(settings.input_dir / "empty_job").mkdir()
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(StatusFile(root={})) == []
def test_returns_non_empty_jobs_in_sorted_order(
self,
tmp_path: Path,
synthetic_video: Path,
fake_metrabs_model,
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_c", videos=[synthetic_video])
_prepare_job(settings, "job_a", videos=[synthetic_video])
_prepare_job(settings, "job_b", videos=[synthetic_video])
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(StatusFile(root={})) == [
"job_a",
"job_b",
"job_c",
]
def test_excludes_jobs_already_in_status(
self,
tmp_path: Path,
synthetic_video: Path,
fake_metrabs_model,
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
_prepare_job(settings, "job_b", videos=[synthetic_video])
status = StatusFile.model_validate(
{
"job_a": {
"status": "completed",
"started_at": datetime(2026, 4, 13, tzinfo=UTC).isoformat(),
}
}
)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(status) == ["job_b"]
def test_dir_with_non_video_files_is_returned(
self, tmp_path: Path, fake_metrabs_model
) -> None:
# Dirs that contain files but no *videos* are NOT silently skipped
# — they should be returned so process_job marks them failed.
settings = _make_settings(tmp_path)
_prepare_job(
settings,
"job_a",
extra_files=[("README.txt", b"nothing to see")],
)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(StatusFile(root={})) == ["job_a"]
# ---------------------------------------------------------------------------
# process_job happy path
# ---------------------------------------------------------------------------
class TestProcessJobSuccess:
def test_happy_path_marks_completed(
self,
tmp_path: Path,
synthetic_video: Path,
fake_metrabs_model,
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
estimator = Estimator(model=fake_metrabs_model)
interfacer = Interfacer(settings, estimator)
entry = interfacer.process_job("job_a")
assert entry.status == JobStatus.COMPLETED
assert entry.results_path is not None
assert entry.results_path.exists()
assert entry.error is None
assert entry.completed_at is not None
def test_happy_path_persists_status(
self,
tmp_path: Path,
synthetic_video: Path,
fake_metrabs_model,
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.process_job("job_a")
loaded = load_status(settings.status_file)
assert "job_a" in loaded.root
assert loaded.root["job_a"].status == JobStatus.COMPLETED
def test_happy_path_leaves_inputs_in_place(
self,
tmp_path: Path,
synthetic_video: Path,
fake_metrabs_model,
) -> None:
# A successful job should NOT be quarantined — its inputs stay in
# input_dir. (An operator might want to rename / archive them
# separately, but that's not the daemon's job.)
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.process_job("job_a")
assert (settings.input_dir / "job_a").exists()
assert not (settings.failed_dir / "job_a").exists()
# ---------------------------------------------------------------------------
# process_job failure paths
# ---------------------------------------------------------------------------
class TestProcessJobFailure:
def test_no_videos_marks_failed_and_quarantines(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(
settings,
"job_a",
extra_files=[("README.txt", b"nothing to see")],
)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
entry = interfacer.process_job("job_a")
assert entry.status == JobStatus.FAILED
assert entry.error is not None
assert "no supported video files" in entry.error
# Inputs moved to failed_dir.
assert not (settings.input_dir / "job_a").exists()
assert (settings.failed_dir / "job_a").exists()
assert (settings.failed_dir / "job_a" / "README.txt").exists()
def test_estimator_exception_marks_failed_and_quarantines(
self, tmp_path: Path, synthetic_video: Path
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
interfacer = Interfacer(settings, _RaisingEstimator())
entry = interfacer.process_job("job_a")
assert entry.status == JobStatus.FAILED
assert entry.error is not None
assert "RuntimeError" in entry.error
assert "simulated estimator failure" in entry.error
assert not (settings.input_dir / "job_a").exists()
assert (settings.failed_dir / "job_a").exists()
def test_quarantine_collision_suffixes(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
settings.ensure_dirs()
# Pre-populate failed_dir with an existing entry for "job_a".
(settings.failed_dir / "job_a").mkdir()
_prepare_job(
settings,
"job_a",
extra_files=[("not_a_video.txt", b"x")],
)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.process_job("job_a")
assert (settings.failed_dir / "job_a").exists()
assert (settings.failed_dir / "job_a.1").exists()
assert (settings.failed_dir / "job_a.1" / "not_a_video.txt").exists()
def test_raising_estimator_error_type_is_reported(
self, tmp_path: Path, synthetic_video: Path
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
interfacer = Interfacer(
settings,
_RaisingEstimator(exc=JobProcessingError("custom boom")),
)
entry = interfacer.process_job("job_a")
assert entry.status == JobStatus.FAILED
assert entry.error is not None
assert "JobProcessingError" in entry.error
assert "custom boom" in entry.error
# ---------------------------------------------------------------------------
# Stuck-processing recovery
# ---------------------------------------------------------------------------
class TestRecoverStuckJobs:
def test_recovers_single_stuck_entry(
self, tmp_path: Path, synthetic_video: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
status = StatusFile.model_validate(
{
"job_a": {
"status": "processing",
"started_at": datetime(2026, 4, 13, tzinfo=UTC).isoformat(),
}
}
)
save_status(settings.status_file, status)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.recover_stuck_jobs()
loaded = load_status(settings.status_file)
entry = loaded.root["job_a"]
assert entry.status == JobStatus.FAILED
assert entry.error is not None
assert "interrupted" in entry.error
assert entry.completed_at is not None
# Inputs were quarantined.
assert not (settings.input_dir / "job_a").exists()
assert (settings.failed_dir / "job_a").exists()
def test_does_not_touch_completed_entries(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
settings.ensure_dirs()
completed = datetime(2026, 4, 13, 10, 0, 0, tzinfo=UTC)
status = StatusFile.model_validate(
{
"job_a": {
"status": "completed",
"started_at": completed.isoformat(),
"completed_at": completed.isoformat(),
},
"job_b": {
"status": "failed",
"started_at": completed.isoformat(),
"completed_at": completed.isoformat(),
"error": "old failure",
},
}
)
save_status(settings.status_file, status)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.recover_stuck_jobs()
loaded = load_status(settings.status_file)
assert loaded.root["job_a"].status == JobStatus.COMPLETED
assert loaded.root["job_b"].status == JobStatus.FAILED
assert loaded.root["job_b"].error == "old failure"
def test_no_status_file_is_noop(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
# Must not raise even though status_file does not exist.
interfacer.recover_stuck_jobs()
# ---------------------------------------------------------------------------
# run_once
# ---------------------------------------------------------------------------
class TestRunOnce:
def test_no_jobs_is_noop(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.run_once()
loaded = load_status(settings.status_file)
assert loaded.is_empty()
def test_processes_all_new_jobs(
self,
tmp_path: Path,
synthetic_video: Path,
fake_metrabs_model,
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
_prepare_job(settings, "job_b", videos=[synthetic_video])
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.run_once()
loaded = load_status(settings.status_file)
assert loaded.root["job_a"].status == JobStatus.COMPLETED
assert loaded.root["job_b"].status == JobStatus.COMPLETED
def test_stop_between_jobs_defers_remaining(
self,
tmp_path: Path,
synthetic_video: Path,
fake_metrabs_model,
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
_prepare_job(settings, "job_b", videos=[synthetic_video])
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
# Override process_job so the first call flips the stop flag
# before returning. The loop should then break before job_b.
original = interfacer.process_job
call_log: list[str] = []
def recording_process_job(job_name: str) -> JobStatusEntry:
call_log.append(job_name)
result = original(job_name)
interfacer.stop()
return result
interfacer.process_job = recording_process_job # type: ignore[method-assign]
interfacer.run_once()
assert call_log == ["job_a"]
loaded = load_status(settings.status_file)
assert "job_a" in loaded.root
assert "job_b" not in loaded.root
# ---------------------------------------------------------------------------
# Single-instance lock
# ---------------------------------------------------------------------------
class TestLock:
def test_first_acquire_succeeds(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
try:
interfacer._acquire_lock()
assert interfacer._lock_fd is not None
finally:
interfacer._release_lock()
def test_second_acquire_raises_already_running(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
first = Interfacer(settings, Estimator(model=fake_metrabs_model))
second = Interfacer(settings, Estimator(model=fake_metrabs_model))
first._acquire_lock()
try:
with pytest.raises(AlreadyRunningError):
second._acquire_lock()
finally:
first._release_lock()
def test_release_allows_subsequent_acquire(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
first = Interfacer(settings, Estimator(model=fake_metrabs_model))
first._acquire_lock()
first._release_lock()
second = Interfacer(settings, Estimator(model=fake_metrabs_model))
try:
second._acquire_lock() # Should succeed after release.
finally:
second._release_lock()
def test_lock_file_contains_pid(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
try:
interfacer._acquire_lock()
lock_path = settings.data_dir / ".neuropose.lock"
content = lock_path.read_text().strip()
import os
assert content == str(os.getpid())
finally:
interfacer._release_lock()
# ---------------------------------------------------------------------------
# Interruptible sleep
# ---------------------------------------------------------------------------
class TestInterruptibleSleep:
def test_zero_returns_immediately(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
import time
start = time.monotonic()
interfacer._interruptible_sleep(0)
elapsed = time.monotonic() - start
assert elapsed < 0.1
def test_stop_flag_wakes_sleep_early(
self, tmp_path: Path, fake_metrabs_model
) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.stop()
import time
start = time.monotonic()
interfacer._interruptible_sleep(5.0)
elapsed = time.monotonic() - start
# With stop flag already set, the sleep should return in well under
# the 5-second nominal window.
assert elapsed < 1.0