linting and formatting

This commit is contained in:
Levi Neuwirth 2026-04-14 13:50:11 -04:00
parent 847fdd3a2b
commit c4c9bffff8
16 changed files with 75 additions and 154 deletions

View File

@ -54,8 +54,7 @@ logger = logging.getLogger(__name__)
# human. These are supply-chain constants.
_MODEL_URL = (
"https://omnomnom.vision.rwth-aachen.de/data/metrabs/"
"metrabs_eff2l_y4_384px_800k_28ds.tar.gz"
"https://omnomnom.vision.rwth-aachen.de/data/metrabs/metrabs_eff2l_y4_384px_800k_28ds.tar.gz"
)
_MODEL_SHA256 = "fa31b5b043f227588c3d224e56db89307d021bfbbb52e36028919f90e1f96c89"
_MODEL_ARCHIVE_NAME = "metrabs_eff2l_y4_384px_800k_28ds.tar.gz"
@ -174,13 +173,13 @@ def _download_with_progress(url: str, dest: Path) -> None:
logger.info("Downloading %s%s", url, dest)
dest.parent.mkdir(parents=True, exist_ok=True)
request = urllib.request.Request( # noqa: S310
request = urllib.request.Request(
url,
headers={"User-Agent": "neuropose/0.1"},
)
try:
with urllib.request.urlopen( # noqa: S310
with urllib.request.urlopen(
request,
timeout=_DOWNLOAD_SOCKET_TIMEOUT,
) as response:
@ -210,15 +209,12 @@ def _download_with_progress(url: str, dest: Path) -> None:
except Exception as exc:
# Clean up partial file so the next call re-downloads cleanly.
dest.unlink(missing_ok=True)
raise RuntimeError(
f"Failed to download MeTRAbs model from {url}: {exc}"
) from exc
raise RuntimeError(f"Failed to download MeTRAbs model from {url}: {exc}") from exc
if total_bytes > 0 and downloaded != total_bytes:
dest.unlink(missing_ok=True)
raise RuntimeError(
f"Download from {url} was truncated: "
f"got {downloaded} bytes, expected {total_bytes}."
f"Download from {url} was truncated: got {downloaded} bytes, expected {total_bytes}."
)
logger.info("Download complete: %d bytes", downloaded)
@ -308,9 +304,7 @@ def _find_saved_model(root: Path) -> Path:
"""
candidates = list(root.rglob("saved_model.pb"))
if not candidates:
raise RuntimeError(
f"no saved_model.pb found under {root}; tarball layout unexpected"
)
raise RuntimeError(f"no saved_model.pb found under {root}; tarball layout unexpected")
if len(candidates) > 1:
raise RuntimeError(
f"multiple saved_model.pb files found under {root}: "
@ -328,7 +322,7 @@ def _tf_load(saved_model_dir: Path) -> Any:
code paths that never reach the loader.
"""
try:
import tensorflow as tf # noqa: PLC0415
import tensorflow as tf
except ImportError as exc:
raise RuntimeError(
"TensorFlow is required to load the MeTRAbs model but is not installed. "
@ -340,9 +334,7 @@ def _tf_load(saved_model_dir: Path) -> Any:
try:
model = tf.saved_model.load(str(saved_model_dir))
except Exception as exc:
raise RuntimeError(
f"Failed to load SavedModel from {saved_model_dir}: {exc}"
) from exc
raise RuntimeError(f"Failed to load SavedModel from {saved_model_dir}: {exc}") from exc
missing = [attr for attr in _REQUIRED_MODEL_ATTRS if not hasattr(model, attr)]
if missing:

View File

@ -67,7 +67,7 @@ def _require_fastdtw() -> tuple[Callable, Callable]:
points the user at the ``analysis`` optional-dependencies extra.
"""
try:
from fastdtw import fastdtw
from fastdtw import fastdtw # type: ignore[attr-defined]
from scipy.spatial.distance import euclidean
except ImportError as exc:
raise ImportError(
@ -143,9 +143,7 @@ def dtw_per_joint(a: np.ndarray, b: np.ndarray) -> list[DTWResult]:
a_joint = a[:, joint_idx, :]
b_joint = b[:, joint_idx, :]
distance, path = fastdtw(a_joint, b_joint, dist=euclidean)
results.append(
DTWResult(distance=float(distance), path=[tuple(p) for p in path])
)
results.append(DTWResult(distance=float(distance), path=[tuple(p) for p in path]))
return results
@ -188,8 +186,7 @@ def dtw_relation(
num_joints = a.shape[1]
if not (0 <= joint_i < num_joints) or not (0 <= joint_j < num_joints):
raise ValueError(
f"joint indices must be in [0, {num_joints}); "
f"got joint_i={joint_i}, joint_j={joint_j}"
f"joint indices must be in [0, {num_joints}); got joint_i={joint_i}, joint_j={joint_j}"
)
fastdtw, euclidean = _require_fastdtw()
disp_a = a[:, joint_j, :] - a[:, joint_i, :]
@ -202,8 +199,7 @@ def _validate_same_joint_count(a: np.ndarray, b: np.ndarray) -> None:
"""Raise :class:`ValueError` if ``a`` and ``b`` disagree on joint count."""
if a.ndim < 2 or b.ndim < 2:
raise ValueError(
f"expected 3D arrays of shape (frames, joints, 3); "
f"got a.ndim={a.ndim}, b.ndim={b.ndim}"
f"expected 3D arrays of shape (frames, joints, 3); got a.ndim={a.ndim}, b.ndim={b.ndim}"
)
if a.shape[1] != b.shape[1]:
raise ValueError(

View File

@ -26,12 +26,12 @@ from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any
import numpy as np
from neuropose.io import VideoPredictions
# ---------------------------------------------------------------------------
# VideoPredictions → numpy
# ---------------------------------------------------------------------------
@ -122,9 +122,7 @@ def normalize_pose_sequence(
sequence is degenerate (zero extent on every axis).
"""
if sequence.ndim != 3 or sequence.shape[-1] != 3:
raise ValueError(
f"expected (frames, joints, 3); got shape {sequence.shape}"
)
raise ValueError(f"expected (frames, joints, 3); got shape {sequence.shape}")
result = sequence.astype(float, copy=True)
mins = result.reshape(-1, 3).min(axis=0)
maxs = result.reshape(-1, 3).max(axis=0)
@ -187,9 +185,7 @@ def pad_sequences(
"""
if not sequences:
if target_length is None:
raise ValueError(
"cannot infer target_length from an empty sequence list"
)
raise ValueError("cannot infer target_length from an empty sequence list")
return []
first = sequences[0]
@ -197,13 +193,10 @@ def pad_sequences(
for idx, seq in enumerate(sequences):
if seq.shape[1:] != trailing_shape:
raise ValueError(
f"sequence {idx} has trailing shape {seq.shape[1:]}; "
f"expected {trailing_shape}"
f"sequence {idx} has trailing shape {seq.shape[1:]}; expected {trailing_shape}"
)
length = target_length if target_length is not None else max(
s.shape[0] for s in sequences
)
length = target_length if target_length is not None else max(s.shape[0] for s in sequences)
padded: list[np.ndarray] = []
for seq in sequences:
@ -255,17 +248,13 @@ def extract_joint_angles(
If any joint index in ``triplets`` is out of range.
"""
if sequence.ndim != 3 or sequence.shape[-1] != 3:
raise ValueError(
f"expected (frames, joints, 3); got shape {sequence.shape}"
)
raise ValueError(f"expected (frames, joints, 3); got shape {sequence.shape}")
num_joints = sequence.shape[1]
columns: list[np.ndarray] = []
for a_idx, b_idx, c_idx in triplets:
for idx in (a_idx, b_idx, c_idx):
if not (0 <= idx < num_joints):
raise ValueError(
f"joint index {idx} out of range [0, {num_joints})"
)
raise ValueError(f"joint index {idx} out of range [0, {num_joints})")
v1 = sequence[:, a_idx, :] - sequence[:, b_idx, :]
v2 = sequence[:, c_idx, :] - sequence[:, b_idx, :]
n1 = np.linalg.norm(v1, axis=1)
@ -343,7 +332,7 @@ def extract_feature_statistics(values: np.ndarray) -> FeatureStatistics:
# ---------------------------------------------------------------------------
def find_peaks(values: np.ndarray, **kwargs: object) -> np.ndarray:
def find_peaks(values: np.ndarray, **kwargs: Any) -> np.ndarray:
"""Return indices of local maxima in a 1D series.
Thin wrapper around :func:`scipy.signal.find_peaks` that returns

View File

@ -84,8 +84,7 @@ class Settings(BaseSettings):
def _validate_device(cls, value: str) -> str:
if not _DEVICE_PATTERN.match(value):
raise ValueError(
f"device must match '/(CPU|GPU):<index>' "
f"(e.g. '/CPU:0', '/GPU:0'); got {value!r}"
f"device must match '/(CPU|GPU):<index>' (e.g. '/CPU:0', '/GPU:0'); got {value!r}"
)
return value
@ -140,9 +139,7 @@ class Settings(BaseSettings):
if data is None:
data = {}
if not isinstance(data, dict):
raise ValueError(
f"config file must contain a YAML mapping; got {type(data).__name__}"
)
raise ValueError(f"config file must contain a YAML mapping; got {type(data).__name__}")
return cls(**data)
def ensure_dirs(self) -> None:

View File

@ -26,6 +26,7 @@ Key guarantees
from __future__ import annotations
import contextlib
import fcntl
import logging
import os
@ -381,15 +382,11 @@ class Interfacer:
def _restore_signal_handlers(self) -> None:
"""Restore the signal handlers that were in place before :meth:`run`."""
if self._prev_sigint is not None:
try:
with contextlib.suppress(ValueError, TypeError):
signal.signal(signal.SIGINT, self._prev_sigint) # type: ignore[arg-type]
except (ValueError, TypeError):
pass
if self._prev_sigterm is not None:
try:
with contextlib.suppress(ValueError, TypeError):
signal.signal(signal.SIGTERM, self._prev_sigterm) # type: ignore[arg-type]
except (ValueError, TypeError):
pass
self._prev_sigint = None
self._prev_sigterm = None
@ -421,14 +418,10 @@ class Interfacer:
"""Release the single-instance lock if held."""
if self._lock_fd is None:
return
try:
with contextlib.suppress(OSError):
fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
except OSError:
pass
try:
with contextlib.suppress(OSError):
os.close(self._lock_fd)
except OSError:
pass
self._lock_fd = None
@ -439,11 +432,7 @@ class Interfacer:
def _discover_videos(job_dir: Path) -> Iterable[Path]:
"""Yield paths to all supported video files in ``job_dir`` (non-recursive)."""
return (
p
for p in job_dir.iterdir()
if p.is_file() and p.suffix.lower() in VIDEO_EXTENSIONS
)
return (p for p in job_dir.iterdir() if p.is_file() and p.suffix.lower() in VIDEO_EXTENSIONS)
def _is_empty_dir(path: Path) -> bool:

View File

@ -17,14 +17,14 @@ from __future__ import annotations
import json
from collections.abc import Iterator
from datetime import datetime
from enum import Enum
from enum import StrEnum
from pathlib import Path
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, RootModel
class JobStatus(str, Enum):
class JobStatus(StrEnum):
"""Lifecycle state of a single processing job."""
PROCESSING = "processing"
@ -94,12 +94,15 @@ class VideoPredictions(BaseModel):
return list(self.frames.keys())
def __len__(self) -> int:
"""Return the number of frames."""
return len(self.frames)
def __iter__(self) -> Iterator[str]: # type: ignore[override]
"""Iterate over frame identifiers in insertion order."""
return iter(self.frames)
def __getitem__(self, key: str) -> FramePrediction:
"""Return the :class:`FramePrediction` for ``key``."""
return self.frames[key]
@ -115,12 +118,15 @@ class JobResults(RootModel[dict[str, VideoPredictions]]):
return list(self.root.keys())
def __len__(self) -> int:
"""Return the number of videos in the job."""
return len(self.root)
def __iter__(self) -> Iterator[str]: # type: ignore[override]
"""Iterate over video names in insertion order."""
return iter(self.root)
def __getitem__(self, key: str) -> VideoPredictions:
"""Return the :class:`VideoPredictions` for ``key``."""
return self.root[key]
@ -150,9 +156,11 @@ class StatusFile(RootModel[dict[str, JobStatusEntry]]):
return len(self.root) == 0
def __len__(self) -> int:
"""Return the number of job entries."""
return len(self.root)
def __iter__(self) -> Iterator[str]: # type: ignore[override]
"""Iterate over job names in insertion order."""
return iter(self.root)

View File

@ -145,7 +145,7 @@ def _render_frame(
view: str,
joint_edges: Sequence[tuple[int, int]] | None,
plt: Any,
Rectangle: Any,
Rectangle: Any, # noqa: N803 — matches matplotlib.patches.Rectangle class name
) -> None:
"""Render one frame's 2D overlay + 3D scatter to ``out_path``."""
# Explicit copies. The previous prototype mutated the caller's data via

View File

@ -3,7 +3,6 @@
from __future__ import annotations
import os
from collections.abc import Iterator
from pathlib import Path
from typing import Any
@ -11,7 +10,6 @@ import cv2
import numpy as np
import pytest
# ---------------------------------------------------------------------------
# Slow test opt-in
# ---------------------------------------------------------------------------
@ -57,7 +55,7 @@ def pytest_collection_modifyitems(
def _isolate_environment(
monkeypatch: pytest.MonkeyPatch,
tmp_path_factory: pytest.TempPathFactory,
) -> Iterator[None]:
) -> None:
"""Isolate every test from the developer's real home directory.
Points ``$HOME`` and ``$XDG_DATA_HOME`` at per-test temp directories so
@ -73,7 +71,6 @@ def _isolate_environment(
for key in list(os.environ):
if key.startswith("NEUROPOSE_"):
monkeypatch.delenv(key, raising=False)
yield
@pytest.fixture
@ -91,13 +88,13 @@ def xdg_home() -> Path:
def synthetic_video(tmp_path: Path) -> Path:
"""Generate a tiny synthetic video at test time.
The fixture writes a 5-frame, 32×32 MJPG-encoded ``.avi`` file. MJPG is
The fixture writes a 5-frame, 32x32 MJPG-encoded ``.avi`` file. MJPG is
chosen over ``mp4v`` because it ships with ``opencv-python-headless`` on
every platform we target, whereas ``mp4v`` occasionally requires an
ffmpeg binary that may not be present on minimal CI runners.
"""
path = tmp_path / "synthetic.avi"
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
fourcc = cv2.VideoWriter_fourcc(*"MJPG") # type: ignore[attr-defined]
writer = cv2.VideoWriter(str(path), fourcc, 30.0, (32, 32))
assert writer.isOpened(), "cv2.VideoWriter failed to open; MJPG codec missing?"
for i in range(5):
@ -106,7 +103,8 @@ def synthetic_video(tmp_path: Path) -> Path:
frame = np.full((32, 32, 3), i * 40, dtype=np.uint8)
writer.write(frame)
writer.release()
assert path.exists() and path.stat().st_size > 0, "Synthetic video is empty."
assert path.exists(), "Synthetic video was not written."
assert path.stat().st_size > 0, "Synthetic video is empty."
return path

View File

@ -41,16 +41,16 @@ pytestmark = pytest.mark.slow
@pytest.fixture
def integration_video(tmp_path: Path) -> Path:
"""Generate a 384×288 synthetic video sized for MeTRAbs input.
"""Generate a 384x288 synthetic video sized for MeTRAbs input.
The default ``synthetic_video`` fixture in ``tests/conftest.py``
produces 32×32 frames, which is too small for MeTRAbs's 384 px
produces 32x32 frames, which is too small for MeTRAbs's 384 px
input and may cause the detector pipeline to short-circuit
unpredictably. This fixture produces a modestly-sized video so
the smoke test's plumbing assertions are meaningful.
"""
path = tmp_path / "integration.avi"
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
fourcc = cv2.VideoWriter_fourcc(*"MJPG") # type: ignore[attr-defined]
writer = cv2.VideoWriter(str(path), fourcc, 30.0, (384, 288))
assert writer.isOpened(), "cv2.VideoWriter failed to open; MJPG codec missing?"
for i in range(5):
@ -60,7 +60,8 @@ def integration_video(tmp_path: Path) -> Path:
frame = np.full((288, 384, 3), 100 + i * 10, dtype=np.uint8)
writer.write(frame)
writer.release()
assert path.exists() and path.stat().st_size > 0
assert path.exists()
assert path.stat().st_size > 0
return path
@ -95,9 +96,7 @@ class TestMetrabsLoader:
assert hasattr(model_a, "detect_poses")
assert hasattr(model_b, "detect_poses")
def test_berkeley_mhad_skeleton_is_present(
self, shared_model_cache_dir: Path
) -> None:
def test_berkeley_mhad_skeleton_is_present(self, shared_model_cache_dir: Path) -> None:
"""The estimator pins skeleton='berkeley_mhad_43'; verify it exists."""
model = load_metrabs_model(cache_dir=shared_model_cache_dir)
joint_names = model.per_skeleton_joint_names["berkeley_mhad_43"]

View File

@ -12,7 +12,6 @@ from neuropose.analyzer.dtw import (
dtw_relation,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@ -31,18 +30,14 @@ def simple_sequence() -> np.ndarray:
class TestDtwAll:
def test_identical_sequences_distance_zero(
self, simple_sequence: np.ndarray
) -> None:
def test_identical_sequences_distance_zero(self, simple_sequence: np.ndarray) -> None:
result = dtw_all(simple_sequence, simple_sequence)
assert isinstance(result, DTWResult)
assert result.distance == pytest.approx(0.0, abs=1e-9)
# Identical sequences produce a diagonal warping path.
assert all(i == j for i, j in result.path)
def test_shifted_sequences_distance_zero(
self, simple_sequence: np.ndarray
) -> None:
def test_shifted_sequences_distance_zero(self, simple_sequence: np.ndarray) -> None:
"""DTW should absorb a pure time shift without penalty."""
# Duplicate the first frame to create a one-frame shift.
shifted = np.concatenate([simple_sequence[:1], simple_sequence], axis=0)
@ -74,9 +69,7 @@ class TestDtwAll:
class TestDtwPerJoint:
def test_returns_one_result_per_joint(
self, simple_sequence: np.ndarray
) -> None:
def test_returns_one_result_per_joint(self, simple_sequence: np.ndarray) -> None:
results = dtw_per_joint(simple_sequence, simple_sequence)
assert len(results) == simple_sequence.shape[1]
for result in results:
@ -112,9 +105,7 @@ class TestDtwPerJoint:
class TestDtwRelation:
def test_identical_sequences_distance_zero(
self, simple_sequence: np.ndarray
) -> None:
def test_identical_sequences_distance_zero(self, simple_sequence: np.ndarray) -> None:
result = dtw_relation(simple_sequence, simple_sequence, joint_i=0, joint_j=1)
assert result.distance == pytest.approx(0.0, abs=1e-9)

View File

@ -18,7 +18,6 @@ from neuropose.analyzer.features import (
)
from neuropose.io import VideoPredictions
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@ -30,10 +29,7 @@ def _make_predictions(num_frames: int, num_persons: int = 1) -> VideoPredictions
for i in range(num_frames):
frames[f"frame_{i:06d}"] = {
"boxes": [[0.0, 0.0, 1.0, 1.0, 0.9]] * num_persons,
"poses3d": [
[[float(i), float(i) * 2, float(i) * 3], [0.0, 0.0, 0.0]]
]
* num_persons,
"poses3d": [[[float(i), float(i) * 2, float(i) * 3], [0.0, 0.0, 0.0]]] * num_persons,
"poses2d": [[[0.0, 0.0], [1.0, 1.0]]] * num_persons,
}
return VideoPredictions.model_validate(

View File

@ -73,9 +73,7 @@ class TestTopLevelOptions:
result = runner.invoke(app, [subcommand, "--help"])
assert result.exit_code == EXIT_OK, f"{subcommand} --help failed"
def test_verbose_and_quiet_are_mutually_exclusive(
self, runner: CliRunner
) -> None:
def test_verbose_and_quiet_are_mutually_exclusive(self, runner: CliRunner) -> None:
result = runner.invoke(app, ["--verbose", "--quiet", "watch"])
assert result.exit_code != EXIT_OK
@ -94,9 +92,7 @@ class TestConfigOption:
# We just verify it mentions the missing file's name.
assert "nope.yaml" in result.output
def test_invalid_config_yaml_structure(
self, runner: CliRunner, tmp_path: Path
) -> None:
def test_invalid_config_yaml_structure(self, runner: CliRunner, tmp_path: Path) -> None:
path = tmp_path / "bad.yaml"
path.write_text("- not a mapping\n- another item\n")
result = runner.invoke(app, ["--config", str(path), "watch"])
@ -111,9 +107,7 @@ class TestConfigOption:
assert result.exit_code == EXIT_USAGE
assert "invalid config" in result.output.lower()
def test_valid_config_reaches_subcommand(
self, runner: CliRunner, tmp_path: Path
) -> None:
def test_valid_config_reaches_subcommand(self, runner: CliRunner, tmp_path: Path) -> None:
# A valid config should flow through the callback and into the
# subcommand. For ``watch``, the subcommand will then fail on the
# model load (NotImplementedError from the commit-11 stub), which
@ -158,9 +152,7 @@ class TestWatch:
class TestProcess:
def test_missing_video_exits_usage(
self, runner: CliRunner, tmp_path: Path
) -> None:
def test_missing_video_exits_usage(self, runner: CliRunner, tmp_path: Path) -> None:
result = runner.invoke(app, ["process", str(tmp_path / "nope.mp4")])
# click's path existence check fires before our callback, so the
# exit is a usage error.

View File

@ -77,7 +77,7 @@ class TestValidation:
def test_extra_fields_rejected(self, xdg_home: Path) -> None:
with pytest.raises(ValidationError):
Settings(nonexistent_field=True)
Settings(nonexistent_field=True) # type: ignore[call-arg]
class TestYamlLoad:
@ -85,9 +85,7 @@ class TestYamlLoad:
def test_valid(self, tmp_path: Path, xdg_home: Path) -> None:
config_path = tmp_path / "config.yaml"
config_path.write_text(
yaml.safe_dump({"device": "/GPU:0", "poll_interval_seconds": 30})
)
config_path.write_text(yaml.safe_dump({"device": "/GPU:0", "poll_interval_seconds": 30}))
settings = Settings.from_yaml(config_path)
assert settings.device == "/GPU:0"
assert settings.poll_interval_seconds == 30

View File

@ -30,7 +30,6 @@ from neuropose.io import (
save_status,
)
# ---------------------------------------------------------------------------
# Stubs and helpers
# ---------------------------------------------------------------------------
@ -133,9 +132,7 @@ class TestDiscoverNewJobs:
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(StatusFile(root={})) == []
def test_skips_empty_directories_silently(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_skips_empty_directories_silently(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
settings.ensure_dirs()
(settings.input_dir / "empty_job").mkdir()
@ -179,9 +176,7 @@ class TestDiscoverNewJobs:
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
assert interfacer._discover_new_jobs(status) == ["job_b"]
def test_dir_with_non_video_files_is_returned(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_dir_with_non_video_files_is_returned(self, tmp_path: Path, fake_metrabs_model) -> None:
# Dirs that contain files but no *videos* are NOT silently skipped
# — they should be returned so process_job marks them failed.
settings = _make_settings(tmp_path)
@ -286,7 +281,7 @@ class TestProcessJobFailure:
) -> None:
settings = _make_settings(tmp_path)
_prepare_job(settings, "job_a", videos=[synthetic_video])
interfacer = Interfacer(settings, _RaisingEstimator())
interfacer = Interfacer(settings, _RaisingEstimator()) # type: ignore[arg-type]
entry = interfacer.process_job("job_a")
@ -297,9 +292,7 @@ class TestProcessJobFailure:
assert not (settings.input_dir / "job_a").exists()
assert (settings.failed_dir / "job_a").exists()
def test_quarantine_collision_suffixes(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_quarantine_collision_suffixes(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
settings.ensure_dirs()
# Pre-populate failed_dir with an existing entry for "job_a".
@ -324,7 +317,7 @@ class TestProcessJobFailure:
_prepare_job(settings, "job_a", videos=[synthetic_video])
interfacer = Interfacer(
settings,
_RaisingEstimator(exc=JobProcessingError("custom boom")),
_RaisingEstimator(exc=JobProcessingError("custom boom")), # type: ignore[arg-type]
)
entry = interfacer.process_job("job_a")
@ -369,9 +362,7 @@ class TestRecoverStuckJobs:
assert not (settings.input_dir / "job_a").exists()
assert (settings.failed_dir / "job_a").exists()
def test_does_not_touch_completed_entries(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_does_not_touch_completed_entries(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
settings.ensure_dirs()
completed = datetime(2026, 4, 13, 10, 0, 0, tzinfo=UTC)
@ -400,9 +391,7 @@ class TestRecoverStuckJobs:
assert loaded.root["job_b"].status == JobStatus.FAILED
assert loaded.root["job_b"].error == "old failure"
def test_no_status_file_is_noop(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_no_status_file_is_noop(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
# Must not raise even though status_file does not exist.
@ -476,9 +465,7 @@ class TestRunOnce:
class TestLock:
def test_first_acquire_succeeds(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_first_acquire_succeeds(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
try:
@ -500,9 +487,7 @@ class TestLock:
finally:
first._release_lock()
def test_release_allows_subsequent_acquire(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_release_allows_subsequent_acquire(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
first = Interfacer(settings, Estimator(model=fake_metrabs_model))
first._acquire_lock()
@ -514,9 +499,7 @@ class TestLock:
finally:
second._release_lock()
def test_lock_file_contains_pid(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_lock_file_contains_pid(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
try:
@ -536,9 +519,7 @@ class TestLock:
class TestInterruptibleSleep:
def test_zero_returns_immediately(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_zero_returns_immediately(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
import time
@ -548,9 +529,7 @@ class TestInterruptibleSleep:
elapsed = time.monotonic() - start
assert elapsed < 0.1
def test_stop_flag_wakes_sleep_early(
self, tmp_path: Path, fake_metrabs_model
) -> None:
def test_stop_flag_wakes_sleep_early(self, tmp_path: Path, fake_metrabs_model) -> None:
settings = _make_settings(tmp_path)
interfacer = Interfacer(settings, Estimator(model=fake_metrabs_model))
interfacer.stop()

View File

@ -24,7 +24,6 @@ from neuropose.io import (
save_video_predictions,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@ -107,7 +106,7 @@ class TestVideoMetadata:
fps=30.0,
width=640,
height=480,
source_path="/leak/me",
source_path="/leak/me", # type: ignore[call-arg]
)
def test_is_frozen(self) -> None:

View File

@ -36,9 +36,7 @@ class TestVisualizePredictions:
predictions_for_synthetic: VideoPredictions,
) -> None:
output_dir = tmp_path / "viz"
written = visualize_predictions(
synthetic_video, predictions_for_synthetic, output_dir
)
written = visualize_predictions(synthetic_video, predictions_for_synthetic, output_dir)
assert len(written) == 5
for path in written:
assert path.exists()