# neuropose/tests/unit/test_ingest.py

"""Tests for :mod:`neuropose.ingest`.
Coverage:
- Happy path — nested and top-level videos produce one job each.
- Job-name derivation — flattening, sanitization, collapsing.
- Non-video members are skipped, not errors.
- Zip-internal collisions (two videos → same job name) reported up
front.
- External collisions (target job dir already exists) are listed in
one error; ``--force`` deletes and replaces.
- Security: path-traversal and absolute-path members refused; empty
archive and oversize archive refused.
- Atomicity: the ``.ingest_*`` staging directory is removed after a
successful ingest, and a failure during planning leaves no partial
state behind under ``input_dir``.
"""
from __future__ import annotations
import zipfile
from pathlib import Path
import pytest
from neuropose.ingest import (
ArchiveEmptyError,
ArchiveInvalidError,
ArchiveTooLargeError,
IngestResult,
JobCollisionError,
ingest_zip,
)
def _write_zip(path: Path, members: dict[str, bytes]) -> Path:
    """Build a zip archive at ``path`` and return ``path``.

    Every key of ``members`` is written as an archive member name, in
    mapping order, with the associated bytes as that member's contents.
    """
    with zipfile.ZipFile(path, mode="w") as archive:
        for member_name, payload in members.items():
            archive.writestr(member_name, payload)
    return path
@pytest.fixture
def input_dir(tmp_path: Path) -> Path:
    """Create a new ``jobs/in`` directory under ``tmp_path`` and return it."""
    jobs_in = tmp_path.joinpath("jobs", "in")
    jobs_in.mkdir(parents=True)
    return jobs_in
# ---------------------------------------------------------------------------
# Happy path
# ---------------------------------------------------------------------------
class TestHappyPath:
    """Well-formed archives: every video member is ingested as its own job."""

    def test_top_level_video_becomes_job(self, tmp_path: Path, input_dir: Path) -> None:
        """A single video at the archive root becomes one job named after its stem."""
        zip_path = _write_zip(tmp_path / "a.zip", {"clip_01.mp4": b"data"})

        outcome = ingest_zip(zip_path, input_dir)

        assert outcome.job_count == 1
        assert outcome.ingested[0].job_name == "clip_01"
        extracted = input_dir / "clip_01" / "clip_01.mp4"
        assert extracted.read_bytes() == b"data"

    def test_nested_path_flattens_into_job_name(self, tmp_path: Path, input_dir: Path) -> None:
        """Directory components of a member are folded into the job name."""
        members = {"patient_001/trial_01.mp4": b"vid"}
        zip_path = _write_zip(tmp_path / "a.zip", members)

        outcome = ingest_zip(zip_path, input_dir)

        nested_job = outcome.ingested[0]
        assert nested_job.job_name == "patient_001_trial_01"
        # Inside the job directory the file keeps its own basename rather
        # than the flattened job name, so the daemon sees a clean filename.
        assert nested_job.video_filename == "trial_01.mp4"
        job_dir = input_dir / "patient_001_trial_01"
        assert (job_dir / "trial_01.mp4").exists()

    def test_sibling_nested_videos_do_not_collide(self, tmp_path: Path, input_dir: Path) -> None:
        """Same basename under different folders yields two distinct jobs."""
        members = {
            "patient_001/trial_01.mp4": b"a",
            "patient_002/trial_01.mp4": b"b",
        }
        zip_path = _write_zip(tmp_path / "a.zip", members)

        outcome = ingest_zip(zip_path, input_dir)

        job_names = {job.job_name for job in outcome.ingested}
        assert job_names == {"patient_001_trial_01", "patient_002_trial_01"}

    def test_multiple_videos_produce_multiple_jobs(self, tmp_path: Path, input_dir: Path) -> None:
        """Five top-level videos produce five jobs and five job directories."""
        members = {f"clip_{index:02d}.mp4": f"d{index}".encode() for index in range(5)}
        zip_path = _write_zip(tmp_path / "a.zip", members)

        outcome = ingest_zip(zip_path, input_dir)

        assert outcome.job_count == 5
        created_entries = sum(1 for _ in input_dir.iterdir())
        assert created_entries == 5

    def test_non_video_members_skipped(self, tmp_path: Path, input_dir: Path) -> None:
        """Non-video members are reported as skipped instead of failing the ingest."""
        members = {
            "clip.mp4": b"video",
            "README.md": b"notes",
            ".DS_Store": b"junk",
            "notes.txt": b"notes",
        }
        zip_path = _write_zip(tmp_path / "a.zip", members)

        outcome = ingest_zip(zip_path, input_dir)

        assert outcome.job_count == 1
        # The expected names are listed in sorted order already.
        expected_skipped = [".DS_Store", "README.md", "notes.txt"]
        assert sorted(outcome.skipped_non_videos) == expected_skipped

    def test_all_accepted_extensions(self, tmp_path: Path, input_dir: Path) -> None:
        """``.mp4``, ``.avi``, ``.mov`` and ``.mkv`` members are all ingested."""
        members = {
            "a.mp4": b"a",
            "b.avi": b"b",
            "c.mov": b"c",
            "d.mkv": b"d",
        }
        zip_path = _write_zip(tmp_path / "a.zip", members)

        outcome = ingest_zip(zip_path, input_dir)

        assert outcome.job_count == 4

    def test_returns_typed_result(self, tmp_path: Path, input_dir: Path) -> None:
        """The return value is an ``IngestResult`` carrying the payload size."""
        zip_path = _write_zip(tmp_path / "a.zip", {"clip.mp4": b"data"})

        outcome = ingest_zip(zip_path, input_dir)

        assert isinstance(outcome, IngestResult)
        # b"data" is four bytes long.
        assert outcome.total_uncompressed_bytes == 4
# ---------------------------------------------------------------------------
# Job-name sanitization
# ---------------------------------------------------------------------------
class TestJobNameDerivation:
    """How archive member paths are turned into job names."""

    def test_special_chars_become_underscores(self, tmp_path: Path, input_dir: Path) -> None:
        """Spaces, slashes and symbols in a member path are sanitized away."""
        members = {"session 2026-04-15 / trial @1.mp4": b"v"}
        zip_path = _write_zip(tmp_path / "a.zip", members)

        outcome = ingest_zip(zip_path, input_dir)

        derived = outcome.ingested[0].job_name
        # Only safe characters remain: consecutive underscores are
        # collapsed and leading/trailing ones are stripped.
        assert derived == "session_2026-04-15_trial_1"

    def test_all_symbol_name_falls_back_to_video(self, tmp_path: Path, input_dir: Path) -> None:
        """A stem made only of symbols falls back to the job name ``video``."""
        zip_path = _write_zip(tmp_path / "a.zip", {"!!!.mp4": b"v"})

        outcome = ingest_zip(zip_path, input_dir)

        assert outcome.ingested[0].job_name == "video"
# ---------------------------------------------------------------------------
# Collision detection
# ---------------------------------------------------------------------------
class TestCollisions:
    """Job-name collisions, inside one archive and against existing job dirs."""

    def test_zip_internal_collision_rejects(self, tmp_path: Path, input_dir: Path) -> None:
        """Two members that derive the same job name abort the ingest up front."""
        # "x__y" and "x y" are distinct member names, but job-name
        # sanitization (unsafe characters become underscores, runs of
        # underscores collapse) is expected to map both to one name.
        archive = _write_zip(
            tmp_path / "b.zip",
            {"x__y.mp4": b"1", "x y.mp4": b"2"},
        )
        with pytest.raises(JobCollisionError):
            ingest_zip(archive, input_dir)
        # No files written.
        assert list(input_dir.iterdir()) == []

    def test_external_collision_without_force(self, tmp_path: Path, input_dir: Path) -> None:
        """An existing job dir with the same name is reported and left untouched."""
        (input_dir / "clip").mkdir()
        (input_dir / "clip" / "existing.mp4").write_bytes(b"old")
        archive = _write_zip(tmp_path / "a.zip", {"clip.mp4": b"new"})
        with pytest.raises(JobCollisionError) as excinfo:
            ingest_zip(archive, input_dir)
        assert excinfo.value.collisions == ["clip"]
        # Existing job dir is untouched.
        assert (input_dir / "clip" / "existing.mp4").read_bytes() == b"old"

    def test_external_collision_listed_together(self, tmp_path: Path, input_dir: Path) -> None:
        """All colliding job names are reported in a single error."""
        for name in ("a", "b", "c"):
            (input_dir / name).mkdir()
        # "d" has no pre-existing directory, so it must not be listed.
        archive = _write_zip(
            tmp_path / "a.zip",
            {"a.mp4": b"1", "b.mp4": b"2", "c.mp4": b"3", "d.mp4": b"4"},
        )
        with pytest.raises(JobCollisionError) as excinfo:
            ingest_zip(archive, input_dir)
        assert sorted(excinfo.value.collisions) == ["a", "b", "c"]

    def test_force_overwrites_existing(self, tmp_path: Path, input_dir: Path) -> None:
        """With ``force=True`` the existing job dir is replaced by the new content."""
        (input_dir / "clip").mkdir()
        (input_dir / "clip" / "existing.mp4").write_bytes(b"old")
        archive = _write_zip(tmp_path / "a.zip", {"clip.mp4": b"new"})
        result = ingest_zip(archive, input_dir, force=True)
        assert result.job_count == 1
        # The old file is gone; only the new one remains.
        files = list((input_dir / "clip").iterdir())
        assert [f.name for f in files] == ["clip.mp4"]
        assert (input_dir / "clip" / "clip.mp4").read_bytes() == b"new"
# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------
class TestSecurity:
    """Hostile, malformed, empty or oversized archives are refused."""

    def test_absolute_path_member_rejected(self, tmp_path: Path, input_dir: Path) -> None:
        """A member with an absolute path is refused and nothing is written."""
        zip_path = _write_zip(tmp_path / "a.zip", {"/etc/passwd.mp4": b"x"})

        with pytest.raises(ArchiveInvalidError, match="absolute"):
            ingest_zip(zip_path, input_dir)

        remaining = list(input_dir.iterdir())
        assert remaining == []

    def test_traversal_member_rejected(self, tmp_path: Path, input_dir: Path) -> None:
        """A member containing ``..`` is refused and nothing is written."""
        zip_path = _write_zip(tmp_path / "a.zip", {"../escape.mp4": b"x"})

        with pytest.raises(ArchiveInvalidError, match="traversal"):
            ingest_zip(zip_path, input_dir)

        remaining = list(input_dir.iterdir())
        assert remaining == []

    def test_empty_archive_rejected(self, tmp_path: Path, input_dir: Path) -> None:
        """An archive with no members raises ``ArchiveEmptyError``."""
        no_members: dict[str, bytes] = {}
        zip_path = _write_zip(tmp_path / "a.zip", no_members)

        with pytest.raises(ArchiveEmptyError):
            ingest_zip(zip_path, input_dir)

    def test_archive_with_only_non_videos_rejected(self, tmp_path: Path, input_dir: Path) -> None:
        """An archive holding no video members also raises ``ArchiveEmptyError``."""
        members = {"README.md": b"no videos here", "notes.txt": b"none"}
        zip_path = _write_zip(tmp_path / "a.zip", members)

        with pytest.raises(ArchiveEmptyError):
            ingest_zip(zip_path, input_dir)

    def test_too_large_archive_rejected(
        self,
        tmp_path: Path,
        input_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        """Exceeding the uncompressed-size cap raises ``ArchiveTooLargeError``."""
        # Shrink the cap instead of building a real multi-GB zip; the
        # enforcement path is the same.
        monkeypatch.setattr("neuropose.ingest.MAX_UNCOMPRESSED_BYTES", 10)
        oversized = b"0123456789ABCDEF"  # 16 bytes > 10
        zip_path = _write_zip(tmp_path / "a.zip", {"clip.mp4": oversized})

        with pytest.raises(ArchiveTooLargeError):
            ingest_zip(zip_path, input_dir)

        remaining = list(input_dir.iterdir())
        assert remaining == []

    def test_bad_zip_file_rejected(self, tmp_path: Path, input_dir: Path) -> None:
        """A file that is not a zip archive raises ``ArchiveInvalidError``."""
        not_a_zip = tmp_path / "bad.zip"
        not_a_zip.write_bytes(b"this is not a valid zip file at all")

        with pytest.raises(ArchiveInvalidError):
            ingest_zip(not_a_zip, input_dir)

    def test_missing_archive_raises(self, tmp_path: Path, input_dir: Path) -> None:
        """A nonexistent archive path raises ``FileNotFoundError``."""
        absent = tmp_path / "nope.zip"

        with pytest.raises(FileNotFoundError):
            ingest_zip(absent, input_dir)
# ---------------------------------------------------------------------------
# Atomicity
# ---------------------------------------------------------------------------
class TestAtomicity:
    """Ingest leaves neither staging directories nor partial jobs behind."""

    @staticmethod
    def _staging_leftovers(input_dir: Path) -> list[Path]:
        """Return entries beside ``input_dir`` whose name starts with ``.ingest_``."""
        siblings = input_dir.parent.iterdir()
        return [entry for entry in siblings if entry.name.startswith(".ingest_")]

    def test_staging_directory_cleaned_up_on_success(self, tmp_path: Path, input_dir: Path) -> None:
        """After a successful ingest no ``.ingest_*`` entry remains."""
        zip_path = _write_zip(tmp_path / "a.zip", {"clip.mp4": b"v"})

        ingest_zip(zip_path, input_dir)

        # No stray `.ingest_*` directories left under the parent.
        assert self._staging_leftovers(input_dir) == []

    def test_no_partial_state_when_planning_fails(self, tmp_path: Path, input_dir: Path) -> None:
        """A failure during planning writes nothing and leaves no staging dir."""
        # The archive opens fine as a zip but fails at planning because
        # of its traversal member, so input_dir must never be written to.
        zip_path = _write_zip(tmp_path / "a.zip", {"../bad.mp4": b"v"})

        with pytest.raises(ArchiveInvalidError):
            ingest_zip(zip_path, input_dir)

        assert list(input_dir.iterdir()) == []
        assert self._staging_leftovers(input_dir) == []