add neuropose reset subcommand for pipeline-wide state wipe

Three-layer module: find_neuropose_processes() scans the process
table via psutil for running watch/serve instances; terminate_processes()
SIGINTs with a configurable grace period before optional SIGKILL
escalation; wipe_state() clears $data_dir/in/, out/, failed/,
the .neuropose.lock file, and leftover .ingest_<uuid>/ staging dirs
while preserving the container directories themselves. reset_pipeline()
composes the three and refuses to wipe while any process survives
termination.

CLI wraps it with --yes/-y, --keep-failed, --force-kill,
--grace-seconds, and --dry-run/-n. Always prints a preview before
prompting; returns EXIT_USAGE=2 when survivors block the wipe.

Unblocks the Mac benchmark iteration loop where partially-complete
runs need to be cleared between experiments.
Levi Neuwirth 2026-04-18 17:15:24 -04:00
parent cc9fcb4adb
commit bcce5315be
6 changed files with 1030 additions and 2 deletions


@@ -256,6 +256,28 @@ be split into per-release sections once tagging begins.
now returns a `LoadedModel` dataclass bundling the TF handle with
the pinned SHA and filename so the estimator can build the
`Provenance` without re-hashing the tarball.
- **`neuropose.reset`** — pipeline-wide reset utility for the
benchmark / iteration loop. `find_neuropose_processes()` scans the
OS process table (via `psutil`) for running `neuropose watch` and
`neuropose serve` instances and classifies each as `daemon` or
`monitor`. `terminate_processes()` SIGINTs them, polls for graceful
exit up to a configurable grace period, and optionally escalates
to SIGKILL with `force_kill=True`. `wipe_state()` removes the
contents of `$data_dir/in/`, `$data_dir/out/` (including
`status.json`), `$data_dir/failed/` (unless `keep_failed=True`),
the `.neuropose.lock` file, and any leftover `.ingest_<uuid>/`
staging dirs from interrupted ingests; container directories
themselves are preserved so the daemon does not need to recreate
them on next startup. `reset_pipeline()` composes the three with
one safety guard: if any process survives termination, the wipe
phase is skipped and the returned `ResetReport` flags
`wipe_skipped_due_to_survivors`, because removing `$data_dir`
out from under an active daemon would corrupt its in-flight
writes. Surfaced as `neuropose reset` in the CLI with
`--yes/-y`, `--keep-failed`, `--force-kill`, `--grace-seconds`,
and `--dry-run/-n` flags; the command always prints a preview
before prompting (skipped under `--yes`) and returns
`EXIT_USAGE=2` when survivors block the wipe.
- **`neuropose.benchmark`** — multi-pass inference benchmarking for
a single video. `run_benchmark()` runs `process_video` N times
(default 5), always discards the first pass as warmup (graph
@@ -304,7 +326,7 @@ be split into per-release sections once tagging begins.
`slow`) loads the real model and asserts the constant still
matches, so any upstream skeleton drift fails CI.
- **`neuropose.cli`** — Typer-based command-line interface with
seven subcommands: `watch` (run the daemon), `process <video>`
eight subcommands: `watch` (run the daemon), `process <video>`
(run the estimator on a single video), `ingest <archive>` (unzip
a video archive into per-video job directories under
`$data_dir/in/` with validation-before-write and atomic
@@ -315,6 +337,13 @@ be split into per-release sections once tagging begins.
KeyboardInterrupt exits with the standard shell-interruption
code and an `OSError` at bind time is translated to a clean
usage error with the bind target in the message),
`reset` (stop the daemon and monitor, then wipe pipeline state
for a clean restart — wraps `neuropose.reset` with a confirmation
prompt, `--dry-run` preview, `--keep-failed` to preserve the
forensic quarantine, `--force-kill` to escalate to SIGKILL after
the SIGINT grace period, and `--grace-seconds` to tune the wait;
refuses to wipe state while any process survives termination so
active writes cannot be corrupted),
`segment <results>` (post-hoc repetition segmentation — loads a
JobResults or a single VideoPredictions, runs
`neuropose.analyzer.segment.segment_predictions` with the chosen

docs/api/reset.md Normal file

@@ -0,0 +1,3 @@
# `neuropose.reset`
::: neuropose.reset


@@ -95,6 +95,7 @@ nav:
- neuropose.interfacer: api/interfacer.md
- neuropose.ingest: api/ingest.md
- neuropose.monitor: api/monitor.md
- neuropose.reset: api/reset.md
- neuropose.io: api/io.md
- neuropose.migrations: api/migrations.md
- neuropose.benchmark: api/benchmark.md


@@ -1,6 +1,6 @@
"""NeuroPose command-line interface.
Seven subcommands:
Eight subcommands:
- ``neuropose watch`` run the :class:`~neuropose.interfacer.Interfacer`
daemon against the configured input directory.
@@ -12,6 +12,10 @@ Seven subcommands:
- ``neuropose serve`` start the :mod:`~neuropose.monitor` localhost
HTTP dashboard so collaborators can watch a run's progress in a
browser or via ``curl``.
- ``neuropose reset`` stop the daemon and monitor, then wipe pipeline
state (input queue, results, status file, lock file, ingest staging
dirs) for a clean restart. See :mod:`neuropose.reset` for the layered
implementation.
- ``neuropose segment <results>`` post-hoc repetition segmentation of
an existing predictions file. Attaches a named
:class:`~neuropose.io.Segmentation` to every video it contains and
@@ -426,6 +430,156 @@ def serve(
raise typer.Exit(code=EXIT_USAGE) from exc
# ---------------------------------------------------------------------------
# reset
# ---------------------------------------------------------------------------
@app.command()
def reset(
ctx: typer.Context,
yes: Annotated[
bool,
typer.Option(
"--yes",
"-y",
help="Skip the interactive confirmation prompt.",
),
] = False,
keep_failed: Annotated[
bool,
typer.Option(
"--keep-failed",
help=(
"Preserve $data_dir/failed/ for forensic review. By "
"default the failed-job quarantine is wiped along with "
"in/ and out/."
),
),
] = False,
force_kill: Annotated[
bool,
typer.Option(
"--force-kill",
help=(
"Escalate to SIGKILL on any daemon or monitor still "
"alive after the SIGINT grace period. Necessary if the "
"daemon is mid-inference on a long video and you do "
"not want to wait for the current video to finish."
),
),
] = False,
grace_seconds: Annotated[
float,
typer.Option(
"--grace-seconds",
min=0.0,
help=(
"Seconds to wait after SIGINT before declaring a "
"process a survivor (or escalating to SIGKILL when "
"--force-kill is set)."
),
),
] = 10.0,
dry_run: Annotated[
bool,
typer.Option(
"--dry-run",
"-n",
help="Show what would be killed and removed without doing it.",
),
] = False,
) -> None:
"""Stop the daemon and monitor, then wipe pipeline state.
Discovers running ``neuropose watch`` and ``neuropose serve``
processes, sends SIGINT, waits ``--grace-seconds`` for graceful
shutdown (optionally escalating to SIGKILL with ``--force-kill``),
then removes the contents of ``$data_dir/in/``, ``$data_dir/out/``
(including ``status.json``), ``$data_dir/failed/`` (unless
``--keep-failed``), the daemon lock file, and any leftover
``.ingest_<uuid>/`` staging directories from interrupted ingests.
Refuses to wipe state if any process survives the termination
phase: wiping the data directory out from under an active daemon
would leave it writing into deleted directory entries. Re-run
with ``--force-kill`` or stop the survivor manually.
"""
# Deferred import so reset's psutil scan stays off the watch/process
# hot path. psutil is already a runtime dependency for benchmark
# metrics, so this import is free at install time.
from neuropose.reset import find_neuropose_processes, reset_pipeline, wipe_state
settings: Settings = ctx.obj
discovered = find_neuropose_processes()
preview = wipe_state(settings, keep_failed=keep_failed, dry_run=True)
typer.echo(f"data dir: {settings.data_dir}")
if discovered:
typer.echo(f"would stop: {len(discovered)} process(es)")
for rp in discovered:
typer.echo(f" pid {rp.pid:>7} {rp.role:<7} {rp.cmdline}")
else:
typer.echo("would stop: no daemon or monitor running")
if preview.removed_paths:
size_mb = preview.bytes_freed / (1024 * 1024)
typer.echo(
f"would remove: {len(preview.removed_paths)} path(s) ({size_mb:.1f} MB)"
)
for path in preview.removed_paths:
typer.echo(f" {path}")
else:
typer.echo("would remove: nothing — data dir is already clean")
if dry_run:
typer.echo("(dry-run; no changes made)")
return
if not discovered and not preview.removed_paths:
typer.echo("nothing to do.")
return
if not yes and not typer.confirm("\nproceed?"):
typer.echo("aborted.")
raise typer.Exit(code=EXIT_USAGE)
report = reset_pipeline(
settings,
grace_seconds=grace_seconds,
force_kill=force_kill,
keep_failed=keep_failed,
)
if report.termination.stopped:
typer.echo(f"stopped {len(report.termination.stopped)} process(es) via SIGINT")
if report.termination.force_killed:
typer.echo(
f"force-killed {len(report.termination.force_killed)} process(es) "
f"after {grace_seconds:.0f}s grace period"
)
if report.termination.survivors:
typer.echo(
f"error: {len(report.termination.survivors)} process(es) did not exit:",
err=True,
)
for rp in report.termination.survivors:
typer.echo(f" pid {rp.pid} ({rp.role})", err=True)
if report.wipe_skipped_due_to_survivors:
typer.echo(
" state on disk was NOT wiped — re-run with --force-kill, "
"or stop these processes manually first.",
err=True,
)
raise typer.Exit(code=EXIT_USAGE)
size_mb = report.wipe.bytes_freed / (1024 * 1024)
typer.echo(
f"removed {len(report.wipe.removed_paths)} path(s) "
f"({size_mb:.1f} MB freed)"
)
# ---------------------------------------------------------------------------
# segment
# ---------------------------------------------------------------------------

src/neuropose/reset.py Normal file

@@ -0,0 +1,388 @@
"""Pipeline-wide reset utility.
Tear down a running NeuroPose deployment back to a clean state: stop
the watch daemon and the monitor server, then wipe the job queue,
results, status file, and ingest staging directories. Intended for
the rapid iteration loop that comes up during benchmark and validation
work, where you want an empty ``$data_dir/in`` without manually
``rm -rf``-ing five separate paths and pkill'ing two processes.
The module is split into three independently-callable layers so each
piece is testable in isolation and reusable from non-CLI contexts:
- :func:`find_neuropose_processes` enumerates running ``neuropose
watch`` and ``neuropose serve`` processes by scanning the OS process
table. Pure read; no side effects.
- :func:`terminate_processes` signals the discovered processes (SIGINT
first, optionally SIGKILL after a grace period) and reports
survivors.
- :func:`wipe_state` removes the data-directory paths that the daemon
and monitor produce. Idempotent; safe against a fresh install.
The top-level :func:`reset_pipeline` orchestrates all three and
returns a :class:`ResetReport` summarizing what happened.
Safety
------
:func:`reset_pipeline` refuses to wipe state while *any* discovered
process is still alive after the termination phase. Wiping
``$data_dir`` out from under an active daemon would leave the daemon
writing into deleted directory entries, a guaranteed mess. Callers
that hit a survivor must either raise the grace period, opt into
``force_kill=True`` (SIGKILL), or kill the survivor manually before
re-running.
"""
from __future__ import annotations
import contextlib
import logging
import os
import shutil
import signal
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Literal
import psutil
from neuropose.config import Settings
from neuropose.interfacer import LOCK_FILENAME
logger = logging.getLogger(__name__)
# Cmdline substrings that identify the daemon and monitor.
# Matched against the joined argv so ``uv run neuropose watch -v`` and
# ``python -m neuropose watch`` both hit. The substring choice
# deliberately includes the subcommand name so a generic
``neuropose --help`` invocation doesn't get caught.
_DAEMON_MARKER = "neuropose watch"
_MONITOR_MARKER = "neuropose serve"
DEFAULT_GRACE_SECONDS = 10.0
"""How long :func:`terminate_processes` waits after SIGINT before
declaring a process a survivor (or escalating to SIGKILL when
``force_kill`` is set). Long enough for an idle daemon to finish its
current poll iteration; short enough that an interactive ``reset``
invocation doesn't feel hung. Override per-call when waiting on a
multi-minute inference."""
_POLL_INTERVAL_SECONDS = 0.2
ProcessRole = Literal["daemon", "monitor"]
@dataclass(frozen=True)
class RunningProcess:
"""A neuropose process discovered in the OS process table."""
pid: int
role: ProcessRole
cmdline: str
@dataclass
class TerminationReport:
"""Outcome of trying to stop a set of running processes."""
stopped: list[RunningProcess] = field(default_factory=list)
survivors: list[RunningProcess] = field(default_factory=list)
force_killed: list[RunningProcess] = field(default_factory=list)
@dataclass
class WipeReport:
"""Outcome of wiping data-directory state."""
removed_paths: list[Path] = field(default_factory=list)
bytes_freed: int = 0
@dataclass
class ResetReport:
"""Aggregate report from a full pipeline reset."""
discovered: list[RunningProcess]
termination: TerminationReport
wipe: WipeReport
dry_run: bool
wipe_skipped_due_to_survivors: bool = False
def find_neuropose_processes(*, exclude_self: bool = True) -> list[RunningProcess]:
"""Scan the process table for ``neuropose watch`` / ``neuropose serve``.
Parameters
----------
exclude_self
Skip the current process. The ``neuropose reset`` command
itself has ``"neuropose"`` in its argv and would otherwise see
itself in the result. Set to ``False`` only in tests where the
caller has constructed a process table that should match
verbatim.
Returns
-------
list[RunningProcess]
Processes whose joined argv contains either marker substring.
Empty list when nothing matches.
"""
self_pid = os.getpid()
found: list[RunningProcess] = []
for proc in psutil.process_iter(["pid", "cmdline"]):
try:
pid = proc.info["pid"]
cmdline = proc.info["cmdline"] or []
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if exclude_self and pid == self_pid:
continue
joined = " ".join(cmdline)
# The two markers are mutually exclusive within one argv, so the
# if/elif chain assigns each process at most one role.
if _DAEMON_MARKER in joined:
found.append(RunningProcess(pid=pid, role="daemon", cmdline=joined))
elif _MONITOR_MARKER in joined:
found.append(RunningProcess(pid=pid, role="monitor", cmdline=joined))
return found
def terminate_processes(
processes: list[RunningProcess],
*,
grace_seconds: float = DEFAULT_GRACE_SECONDS,
force_kill: bool = False,
) -> TerminationReport:
"""Send SIGINT to each process; optionally escalate to SIGKILL.
Parameters
----------
processes
The processes to stop. Pass an empty list for a no-op.
grace_seconds
Maximum time to wait for processes to exit after SIGINT.
force_kill
When ``True``, any process still alive after ``grace_seconds``
is sent SIGKILL. When ``False``, survivors are reported back
to the caller untouched.
Notes
-----
SIGTERM is *not* used as an intermediate escalation step. The
interfacer's signal handler treats SIGINT and SIGTERM identically
(both call :meth:`Interfacer.stop`), so SIGTERM accomplishes
nothing that SIGINT did not already attempt. The only escalation
that actually forces a stuck daemon to exit is SIGKILL, which
bypasses the handler entirely.
"""
report = TerminationReport()
if not processes:
return report
for rp in processes:
with contextlib.suppress(ProcessLookupError, PermissionError):
os.kill(rp.pid, signal.SIGINT)
logger.info("sent SIGINT to pid %d (%s)", rp.pid, rp.role)
survivors = _wait_for_exit(processes, grace_seconds)
stopped = [p for p in processes if p not in survivors]
report.stopped.extend(stopped)
if not survivors:
return report
if not force_kill:
report.survivors.extend(survivors)
return report
for rp in survivors:
with contextlib.suppress(ProcessLookupError, PermissionError):
os.kill(rp.pid, signal.SIGKILL)
logger.warning("escalated to SIGKILL for pid %d (%s)", rp.pid, rp.role)
# SIGKILL is delivered synchronously enough that a short final
# poll is sufficient — any remaining "survivor" at this point is
# a permission error or a kernel-side hang, not graceful shutdown.
final_survivors = _wait_for_exit(survivors, grace_seconds=2.0)
killed = [p for p in survivors if p not in final_survivors]
report.force_killed.extend(killed)
report.survivors.extend(final_survivors)
return report
def _wait_for_exit(
processes: list[RunningProcess],
grace_seconds: float,
) -> list[RunningProcess]:
"""Poll until every process exits or the deadline passes."""
deadline = time.monotonic() + grace_seconds
while time.monotonic() < deadline:
survivors = [p for p in processes if _is_alive(p.pid)]
if not survivors:
return []
time.sleep(_POLL_INTERVAL_SECONDS)
return [p for p in processes if _is_alive(p.pid)]
def _is_alive(pid: int) -> bool:
"""Return ``True`` if ``pid`` is still running and not a zombie."""
try:
proc = psutil.Process(pid)
except psutil.NoSuchProcess:
return False
try:
return proc.status() != psutil.STATUS_ZOMBIE
except psutil.NoSuchProcess:
return False
def wipe_state(
settings: Settings,
*,
keep_failed: bool = False,
dry_run: bool = False,
) -> WipeReport:
"""Remove data-directory paths produced by the daemon and monitor.
Removes the *contents* of ``in/``, ``out/``, and (unless
``keep_failed`` is set) ``failed/``, plus the daemon lock file and
any leftover ``.ingest_<uuid>/`` staging directories. The
container directories themselves are preserved so the daemon does
not need to recreate them on next startup.
Parameters
----------
settings
Resolved :class:`~neuropose.config.Settings`. Determines all
target paths via the ``input_dir`` / ``output_dir`` /
``failed_dir`` properties.
keep_failed
Preserve ``$data_dir/failed/`` for forensic review of past
crashes. Default removes it along with the rest of the
pipeline state.
dry_run
Compute the report without actually deleting anything. Useful
for previewing the blast radius before confirming a reset.
"""
report = WipeReport()
targets: list[Path] = []
if settings.input_dir.exists():
targets.extend(settings.input_dir.iterdir())
if settings.output_dir.exists():
targets.extend(settings.output_dir.iterdir())
if not keep_failed and settings.failed_dir.exists():
targets.extend(settings.failed_dir.iterdir())
lock_path = settings.data_dir / LOCK_FILENAME
if lock_path.exists():
targets.append(lock_path)
if settings.data_dir.exists():
targets.extend(settings.data_dir.glob(".ingest_*"))
for target in targets:
size = _path_size(target)
if not dry_run:
_remove(target)
report.removed_paths.append(target)
report.bytes_freed += size
return report
def _path_size(path: Path) -> int:
"""Return the cumulative size of ``path``, recursing into directories."""
if path.is_symlink() or path.is_file():
try:
return path.stat().st_size
except OSError:
return 0
total = 0
for sub in path.rglob("*"):
try:
if sub.is_file() and not sub.is_symlink():
total += sub.stat().st_size
except OSError:
continue
return total
def _remove(path: Path) -> None:
"""Remove ``path`` whether file, symlink, or directory."""
if path.is_dir() and not path.is_symlink():
shutil.rmtree(path)
else:
path.unlink()
def reset_pipeline(
settings: Settings,
*,
grace_seconds: float = DEFAULT_GRACE_SECONDS,
force_kill: bool = False,
keep_failed: bool = False,
dry_run: bool = False,
) -> ResetReport:
"""Stop daemon + monitor, then wipe pipeline state.
Composes :func:`find_neuropose_processes`,
:func:`terminate_processes`, and :func:`wipe_state` into a single
operation. The wipe phase is *skipped* if any process survives
termination: wiping ``$data_dir`` out from under an active
daemon would corrupt its in-flight writes. The returned
:class:`ResetReport` flags this case via
``wipe_skipped_due_to_survivors``.
Parameters
----------
settings
Resolved :class:`~neuropose.config.Settings`.
grace_seconds
Maximum time to wait for SIGINT to take effect.
force_kill
Escalate to SIGKILL on any process still alive after
``grace_seconds``. Necessary when the daemon is mid-inference
on a long video and you do not want to wait for the current
video to finish.
keep_failed
Preserve ``$data_dir/failed/`` during the wipe.
dry_run
Discover and report without killing anything or removing any
paths. Termination phase is skipped entirely.
"""
discovered = find_neuropose_processes()
if dry_run:
wipe = wipe_state(settings, keep_failed=keep_failed, dry_run=True)
return ResetReport(
discovered=discovered,
termination=TerminationReport(),
wipe=wipe,
dry_run=True,
)
termination = terminate_processes(
discovered,
grace_seconds=grace_seconds,
force_kill=force_kill,
)
if termination.survivors:
return ResetReport(
discovered=discovered,
termination=termination,
wipe=WipeReport(),
dry_run=False,
wipe_skipped_due_to_survivors=True,
)
wipe = wipe_state(settings, keep_failed=keep_failed, dry_run=False)
return ResetReport(
discovered=discovered,
termination=termination,
wipe=wipe,
dry_run=False,
)

tests/unit/test_reset.py Normal file

@@ -0,0 +1,453 @@
"""Tests for :mod:`neuropose.reset` and the ``neuropose reset`` CLI command.
Coverage:
- :func:`find_neuropose_processes` filters by cmdline marker, classifies
daemon vs monitor, and excludes the calling process.
- :func:`terminate_processes` sends SIGINT, escalates to SIGKILL when
asked, and reports survivors.
- :func:`wipe_state` removes contents of in/, out/, failed/, the lock
file, and ``.ingest_*`` staging dirs; honors ``keep_failed`` and
``dry_run``.
- :func:`reset_pipeline` skips the wipe phase when termination leaves
survivors.
- The ``neuropose reset`` CLI command renders previews, honors
``--dry-run`` and ``--yes``, and exits non-zero on survivors.
The process-killing tests use monkeypatched ``psutil.process_iter``
and ``os.kill`` so the suite never touches the real process table or
sends real signals.
"""
from __future__ import annotations
import signal
from pathlib import Path
from typing import Any
import pytest
from typer.testing import CliRunner
from neuropose.cli import EXIT_OK, EXIT_USAGE, app
from neuropose.config import Settings
from neuropose.interfacer import LOCK_FILENAME
from neuropose.reset import (
DEFAULT_GRACE_SECONDS,
RunningProcess,
TerminationReport,
WipeReport,
find_neuropose_processes,
reset_pipeline,
terminate_processes,
wipe_state,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
class _FakeProc:
"""Minimal stand-in for ``psutil.Process`` for the discovery tests."""
def __init__(self, pid: int, cmdline: list[str]) -> None:
self.info = {"pid": pid, "cmdline": cmdline}
@pytest.fixture
def settings(tmp_path: Path) -> Settings:
"""A Settings pointing at an isolated temp data dir, with subdirs created."""
s = Settings(data_dir=tmp_path / "jobs", model_cache_dir=tmp_path / "models")
s.ensure_dirs()
return s
# ---------------------------------------------------------------------------
# find_neuropose_processes
# ---------------------------------------------------------------------------
class TestFindNeuroposeProcesses:
def test_classifies_watch_as_daemon(self, monkeypatch: pytest.MonkeyPatch) -> None:
procs = [_FakeProc(1234, ["python", "-m", "neuropose", "watch"])]
monkeypatch.setattr("psutil.process_iter", lambda attrs: iter(procs))
found = find_neuropose_processes()
assert len(found) == 1
assert found[0].pid == 1234
assert found[0].role == "daemon"
def test_classifies_serve_as_monitor(self, monkeypatch: pytest.MonkeyPatch) -> None:
procs = [_FakeProc(5678, ["uv", "run", "neuropose", "serve", "--port", "8765"])]
monkeypatch.setattr("psutil.process_iter", lambda attrs: iter(procs))
found = find_neuropose_processes()
assert len(found) == 1
assert found[0].role == "monitor"
def test_ignores_unrelated_processes(self, monkeypatch: pytest.MonkeyPatch) -> None:
procs = [
_FakeProc(1, ["bash"]),
_FakeProc(2, ["python", "-m", "pip", "install", "neuropose"]),
_FakeProc(3, ["neuropose", "--help"]),
]
monkeypatch.setattr("psutil.process_iter", lambda attrs: iter(procs))
assert find_neuropose_processes() == []
def test_excludes_self(self, monkeypatch: pytest.MonkeyPatch) -> None:
import os
self_pid = os.getpid()
procs = [
_FakeProc(self_pid, ["python", "-m", "neuropose", "watch"]),
_FakeProc(9999, ["python", "-m", "neuropose", "watch"]),
]
monkeypatch.setattr("psutil.process_iter", lambda attrs: iter(procs))
found = find_neuropose_processes()
assert [rp.pid for rp in found] == [9999]
def test_includes_self_when_disabled(self, monkeypatch: pytest.MonkeyPatch) -> None:
import os
self_pid = os.getpid()
procs = [_FakeProc(self_pid, ["python", "-m", "neuropose", "watch"])]
monkeypatch.setattr("psutil.process_iter", lambda attrs: iter(procs))
found = find_neuropose_processes(exclude_self=False)
assert [rp.pid for rp in found] == [self_pid]
def test_handles_dead_processes(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""A NoSuchProcess raised mid-iteration must not crash the scan."""
import psutil
class _RaisingProc:
@property
def info(self) -> dict[str, Any]:
raise psutil.NoSuchProcess(pid=1)
procs = [
_RaisingProc(),
_FakeProc(2, ["python", "-m", "neuropose", "watch"]),
]
monkeypatch.setattr("psutil.process_iter", lambda attrs: iter(procs))
found = find_neuropose_processes()
assert [rp.pid for rp in found] == [2]
# ---------------------------------------------------------------------------
# terminate_processes
# ---------------------------------------------------------------------------
class TestTerminateProcesses:
def test_empty_list_is_noop(self) -> None:
report = terminate_processes([])
assert report.stopped == []
assert report.survivors == []
assert report.force_killed == []
def test_sigint_to_each_process(self, monkeypatch: pytest.MonkeyPatch) -> None:
sent: list[tuple[int, int]] = []
monkeypatch.setattr("os.kill", lambda pid, sig: sent.append((pid, sig)))
# Mark all processes immediately dead so the wait loop exits fast.
monkeypatch.setattr("neuropose.reset._is_alive", lambda pid: False)
rps = [
RunningProcess(pid=10, role="daemon", cmdline="x"),
RunningProcess(pid=20, role="monitor", cmdline="y"),
]
report = terminate_processes(rps, grace_seconds=0.0)
assert sent == [(10, signal.SIGINT), (20, signal.SIGINT)]
assert {p.pid for p in report.stopped} == {10, 20}
assert report.survivors == []
def test_survivors_reported_when_force_kill_off(
self, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr("os.kill", lambda pid, sig: None)
# Pid 10 never exits; pid 20 is absent from the alive set, so it
# reads as dead as soon as the wait loop polls.
alive = {10}
monkeypatch.setattr("neuropose.reset._is_alive", lambda pid: pid in alive)
rps = [
RunningProcess(pid=10, role="daemon", cmdline="x"),
RunningProcess(pid=20, role="monitor", cmdline="y"),
]
report = terminate_processes(rps, grace_seconds=0.0, force_kill=False)
assert {p.pid for p in report.stopped} == {20}
assert {p.pid for p in report.survivors} == {10}
assert report.force_killed == []
def test_force_kill_escalates_to_sigkill(self, monkeypatch: pytest.MonkeyPatch) -> None:
sent: list[tuple[int, int]] = []
monkeypatch.setattr("os.kill", lambda pid, sig: sent.append((pid, sig)))
# Starts alive; the fake _is_alive reports the pid dead once a
# SIGKILL for it appears in the sent log.
alive = {10}
def fake_is_alive(pid: int) -> bool:
if (pid, signal.SIGKILL) in sent:
return False
return pid in alive
monkeypatch.setattr("neuropose.reset._is_alive", fake_is_alive)
rp = RunningProcess(pid=10, role="daemon", cmdline="x")
report = terminate_processes([rp], grace_seconds=0.0, force_kill=True)
assert (10, signal.SIGINT) in sent
assert (10, signal.SIGKILL) in sent
assert {p.pid for p in report.force_killed} == {10}
assert report.survivors == []
# ---------------------------------------------------------------------------
# wipe_state
# ---------------------------------------------------------------------------
class TestWipeState:
def test_no_op_on_empty_dirs(self, settings: Settings) -> None:
report = wipe_state(settings)
assert report.removed_paths == []
assert report.bytes_freed == 0
def test_removes_in_out_failed_contents(self, settings: Settings) -> None:
(settings.input_dir / "job_a").mkdir()
(settings.input_dir / "job_a" / "video.mp4").write_bytes(b"x" * 100)
(settings.output_dir / "status.json").write_text("{}")
(settings.failed_dir / "job_b").mkdir()
report = wipe_state(settings)
names = {p.name for p in report.removed_paths}
assert names == {"job_a", "status.json", "job_b"}
# Containers themselves preserved.
assert settings.input_dir.exists()
assert settings.output_dir.exists()
assert settings.failed_dir.exists()
def test_keep_failed_preserves_failed_contents(self, settings: Settings) -> None:
(settings.input_dir / "job_a").mkdir()
(settings.failed_dir / "job_b").mkdir()
(settings.failed_dir / "job_b" / "evidence.log").write_text("crash")
report = wipe_state(settings, keep_failed=True)
names = {p.name for p in report.removed_paths}
assert "job_a" in names
assert "job_b" not in names
assert (settings.failed_dir / "job_b" / "evidence.log").exists()
def test_removes_lock_file(self, settings: Settings) -> None:
(settings.data_dir / LOCK_FILENAME).write_text("12345\n")
report = wipe_state(settings)
assert (settings.data_dir / LOCK_FILENAME) in report.removed_paths
assert not (settings.data_dir / LOCK_FILENAME).exists()
def test_removes_ingest_staging_dirs(self, settings: Settings) -> None:
staging_a = settings.data_dir / ".ingest_abc123"
staging_b = settings.data_dir / ".ingest_def456"
staging_a.mkdir()
staging_b.mkdir()
(staging_a / "leftover.mp4").write_bytes(b"y" * 50)
report = wipe_state(settings)
assert staging_a in report.removed_paths
assert staging_b in report.removed_paths
assert not staging_a.exists()
assert not staging_b.exists()
def test_dry_run_reports_without_removing(self, settings: Settings) -> None:
(settings.input_dir / "job_a").mkdir()
(settings.input_dir / "job_a" / "video.mp4").write_bytes(b"z" * 200)
report = wipe_state(settings, dry_run=True)
assert len(report.removed_paths) == 1
assert report.bytes_freed == 200
# Nothing actually deleted.
assert (settings.input_dir / "job_a" / "video.mp4").exists()
def test_bytes_freed_recurses_into_subdirs(self, settings: Settings) -> None:
job = settings.input_dir / "job_a"
job.mkdir()
(job / "a.mp4").write_bytes(b"a" * 100)
(job / "nested").mkdir()
(job / "nested" / "b.mp4").write_bytes(b"b" * 250)
report = wipe_state(settings, dry_run=True)
assert report.bytes_freed == 350
# ---------------------------------------------------------------------------
# reset_pipeline
# ---------------------------------------------------------------------------
class TestResetPipeline:
    def test_dry_run_skips_termination(
        self,
        settings: Settings,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        rp = RunningProcess(pid=10, role="daemon", cmdline="x")
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", lambda: [rp])

        # Sentinel to detect termination calls.
        def _should_not_be_called(*args: Any, **kwargs: Any) -> TerminationReport:
            raise AssertionError("dry_run must not invoke terminate_processes")

        monkeypatch.setattr("neuropose.reset.terminate_processes", _should_not_be_called)
        report = reset_pipeline(settings, dry_run=True)
        assert report.dry_run is True
        assert report.discovered == [rp]
        assert report.termination.stopped == []

    def test_skips_wipe_when_survivors_remain(
        self,
        settings: Settings,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        rp = RunningProcess(pid=10, role="daemon", cmdline="x")
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", lambda: [rp])
        monkeypatch.setattr(
            "neuropose.reset.terminate_processes",
            lambda procs, **_: TerminationReport(survivors=list(procs)),
        )
        # Seed something to wipe so we can confirm it's untouched.
        (settings.input_dir / "job_a").mkdir()
        report = reset_pipeline(settings, dry_run=False)
        assert report.wipe_skipped_due_to_survivors is True
        assert report.wipe.removed_paths == []
        assert (settings.input_dir / "job_a").exists()

    def test_wipes_when_all_processes_stopped(
        self,
        settings: Settings,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        rp = RunningProcess(pid=10, role="daemon", cmdline="x")
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", lambda: [rp])
        monkeypatch.setattr(
            "neuropose.reset.terminate_processes",
            lambda procs, **_: TerminationReport(stopped=list(procs)),
        )
        (settings.input_dir / "job_a").mkdir()
        report = reset_pipeline(settings, dry_run=False)
        assert report.wipe_skipped_due_to_survivors is False
        assert any(p.name == "job_a" for p in report.wipe.removed_paths)
        assert not (settings.input_dir / "job_a").exists()
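# The find -> terminate -> wipe composition these tests pin down reduces to a
# small decision function. ``reset_sketch`` is a hypothetical stand-in (pids as
# ints, survivors vetoing the wipe), not the real ``reset_pipeline``.

```python
from typing import Callable, Sequence


def reset_sketch(
    find: Callable[[], Sequence[int]],
    terminate: Callable[[Sequence[int]], Sequence[int]],
    wipe: Callable[[], list[str]],
) -> tuple[bool, list[str]]:
    """Return (wipe_skipped, removed_paths); ``terminate`` returns survivors."""
    procs = find()
    survivors = terminate(procs) if procs else []
    if survivors:
        return True, []  # refuse to wipe while anything survived termination
    return False, wipe()


# Survivors veto the wipe entirely.
print(reset_sketch(lambda: [42], lambda p: list(p), lambda: ["in/job_a"]))
# (True, [])

# Clean termination lets the wipe proceed.
print(reset_sketch(lambda: [42], lambda p: [], lambda: ["in/job_a"]))
# (False, ['in/job_a'])
```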
# ---------------------------------------------------------------------------
# CLI: neuropose reset
# ---------------------------------------------------------------------------
@pytest.fixture
def runner() -> CliRunner:
    return CliRunner()


@pytest.fixture
def env_data_dir(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Path:
    """Point NEUROPOSE_DATA_DIR at an isolated temp dir for CLI tests."""
    data_dir = tmp_path / "jobs"
    data_dir.mkdir()
    (data_dir / "in").mkdir()
    (data_dir / "out").mkdir()
    (data_dir / "failed").mkdir()
    monkeypatch.setenv("NEUROPOSE_DATA_DIR", str(data_dir))
    return data_dir
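# The fixture works because data-dir resolution is env-var first. A minimal
# sketch of that lookup, with an assumed ``~/.neuropose`` fallback (the real
# default may differ); ``resolve_data_dir`` is illustrative only.

```python
from pathlib import Path


def resolve_data_dir(env: dict[str, str]) -> Path:
    """Honour NEUROPOSE_DATA_DIR when set; otherwise fall back to a home dir."""
    raw = env.get("NEUROPOSE_DATA_DIR")
    return Path(raw) if raw else Path.home() / ".neuropose"


resolved = resolve_data_dir({"NEUROPOSE_DATA_DIR": "/tmp/jobs"})
print(resolved)  # /tmp/jobs

fallback = resolve_data_dir({})
print(fallback.name)  # .neuropose
```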
class TestResetCli:
    def test_reset_dry_run_does_not_modify(
        self,
        runner: CliRunner,
        env_data_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        (env_data_dir / "in" / "job_a").mkdir()
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", list)
        result = runner.invoke(app, ["reset", "--dry-run"])
        assert result.exit_code == EXIT_OK, result.output
        assert "would remove" in result.output
        assert "(dry-run; no changes made)" in result.output
        assert (env_data_dir / "in" / "job_a").exists()

    def test_reset_yes_skips_confirmation_and_wipes(
        self,
        runner: CliRunner,
        env_data_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        (env_data_dir / "in" / "job_a").mkdir()
        (env_data_dir / "in" / "job_a" / "video.mp4").write_bytes(b"x" * 100)
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", list)
        result = runner.invoke(app, ["reset", "--yes"])
        assert result.exit_code == EXIT_OK, result.output
        assert "removed" in result.output
        assert not (env_data_dir / "in" / "job_a").exists()

    def test_reset_aborts_on_no_confirmation(
        self,
        runner: CliRunner,
        env_data_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        (env_data_dir / "in" / "job_a").mkdir()
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", list)
        # typer.confirm reads from stdin; "n\n" declines.
        result = runner.invoke(app, ["reset"], input="n\n")
        assert result.exit_code == EXIT_USAGE, result.output
        assert "aborted" in result.output
        assert (env_data_dir / "in" / "job_a").exists()

    def test_reset_clean_state_is_noop(
        self,
        runner: CliRunner,
        env_data_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        del env_data_dir
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", list)
        result = runner.invoke(app, ["reset"])
        assert result.exit_code == EXIT_OK, result.output
        assert "nothing to do" in result.output

    def test_reset_reports_survivors_with_nonzero_exit(
        self,
        runner: CliRunner,
        env_data_dir: Path,
        monkeypatch: pytest.MonkeyPatch,
    ) -> None:
        del env_data_dir
        rp = RunningProcess(pid=42, role="daemon", cmdline="neuropose watch")
        monkeypatch.setattr("neuropose.reset.find_neuropose_processes", lambda: [rp])
        monkeypatch.setattr(
            "neuropose.reset.terminate_processes",
            lambda procs, **_: TerminationReport(survivors=list(procs)),
        )
        result = runner.invoke(app, ["reset", "--yes"])
        assert result.exit_code == EXIT_USAGE, result.output
        assert "did not exit" in result.output
        assert "pid 42" in result.output
        assert "--force-kill" in result.output
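# The escalation the survivor test hints at (SIGINT, bounded grace polling,
# opt-in SIGKILL) can be sketched deterministically by injecting the signal
# and liveness callables. ``terminate_sketch`` is illustrative, not the real
# ``terminate_processes``; no actual processes are touched.

```python
from typing import Callable


def terminate_sketch(
    pids: list[int],
    send_sigint: Callable[[int], None],
    still_alive: Callable[[int], bool],
    send_sigkill: Callable[[int], None],
    grace_polls: int = 5,
    force_kill: bool = False,
) -> tuple[list[int], list[int]]:
    """Return (stopped, survivors): SIGINT, bounded polling, optional SIGKILL."""
    stopped: list[int] = []
    survivors: list[int] = []
    for pid in pids:
        send_sigint(pid)  # polite nudge first
        alive = True
        for _ in range(grace_polls):  # stands in for the grace period
            if not still_alive(pid):
                alive = False
                break
        if alive and force_kill:
            send_sigkill(pid)  # escalation is opt-in
            alive = False
        (survivors if alive else stopped).append(pid)
    return stopped, survivors


# A process that exits on its third poll counts as stopped.
polls = {7: 0}


def fake_alive(pid: int) -> bool:
    polls[pid] += 1
    return polls[pid] < 3


print(terminate_sketch([7], lambda _: None, fake_alive, lambda _: None))
# ([7], [])

# A stubborn process without force_kill survives and blocks the wipe.
print(terminate_sketch([9], lambda _: None, lambda _: True, lambda _: None))
# ([], [9])
```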
def test_default_grace_seconds_constant_is_reasonable() -> None:
    """Lock the default grace period so a refactor cannot silently lower it."""
    assert 5.0 <= DEFAULT_GRACE_SECONDS <= 60.0


def test_wipe_report_default_construction() -> None:
    r = WipeReport()
    assert r.removed_paths == []
    assert r.bytes_freed == 0
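# The last test assumes every ``WipeReport()`` gets its own empty list. With a
# dataclass that requires ``field(default_factory=list)``; this stand-in
# (``WipeReportSketch``, illustrative only) shows the per-instance behavior.

```python
from dataclasses import dataclass, field


@dataclass
class WipeReportSketch:
    # A bare mutable default (``removed_paths: list = []``) raises ValueError
    # at class creation; ``default_factory`` builds a fresh list per instance.
    removed_paths: list[str] = field(default_factory=list)
    bytes_freed: int = 0


a, b = WipeReportSketch(), WipeReportSketch()
a.removed_paths.append("in/job_a")
print(b.removed_paths)  # [] -- the default list is not shared between instances
```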