levineuwirth.org/tools/archive.py

1152 lines
47 KiB
Python

#!/usr/bin/env python3
"""
archive.py — Build-time link-archiving tool for levineuwirth.org.
Reads archive/manifest.yaml, fetches any manifest URL that has no local
artifact yet, stores it under archive/<slug>/, extracts readable text,
writes the per-entry archive/<slug>/PROVENANCE.json, and (re)writes the
Hakyll input data/archive-index.json.
Two artifact types:
* pdf — downloaded directly, stored as document.pdf, text via pdftotext.
* html — snapshotted with `monolith` into a single self-contained
snapshot.html (JavaScript stripped, assets inlined as data
URIs), a restrictive Content-Security-Policy <meta> injected,
text extracted with BeautifulSoup.
Subcommands:
fetch download missing artifacts, (re)generate sidecars + index
refresh deliberately re-snapshot a single entry, recording the prior
SHA in the new PROVENANCE.json's `previous-sha256`
wayback submit archived URLs to the Wayback Machine as a second,
independent copy; backfill the capture URL into PROVENANCE.json
check HEAD/GET-probe every manifest URL for link rot, updating
data/archive-state.json with asymmetric hysteresis
gc delete archive/<slug>/ directories listed in archive/removed.yaml
Failure policy:
* Integrity errors — a committed artifact whose SHA-256 no longer
matches PROVENANCE.json, or a slug whose manifest URL has changed —
print loudly and exit non-zero, halting `make build`.
* Transient errors — a network failure, an over-cap download, a missing
`monolith` binary, a manifest entry missing its `url:` — print a
warning, skip that entry, and exit zero so the build proceeds (the
entry is retried on the next build).
See ARCHIVE.md for the full design.
Gated on .venv by the Makefile (same convention as embed.py). Non-stdlib
dependencies: PyYAML and beautifulsoup4, both already in pyproject.toml.
External tools: `pdftotext` (poppler) for PDF text, and the `monolith`
binary — vendored at tools/bin/monolith, see tools/monolith-version.txt.
"""
from __future__ import annotations
import datetime
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import urllib.error
import urllib.request
from pathlib import Path
from urllib.parse import parse_qsl, quote, urlencode, urlparse, urlunparse
import yaml
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Filesystem layout. Every path is anchored at REPO_ROOT, which is two
# directory levels above this file's resolved location.
REPO_ROOT = Path(__file__).resolve().parent.parent
ARCHIVE_DIR = REPO_ROOT / "archive"
MANIFEST = ARCHIVE_DIR / "manifest.yaml"
REMOVED = ARCHIVE_DIR / "removed.yaml"
INDEX_OUT = REPO_ROOT / "data" / "archive-index.json"
STATE_OUT = REPO_ROOT / "data" / "archive-state.json"
# Link-rot thresholds and network limits.
ROT_FAILS = 3 # consecutive failed scans before `rotted` is considered
ROT_DAYS = 14 # ... and the streak must also span at least this many days
SIZE_CAP = 25 * 1024 * 1024 # 25 MB per-artifact cap
TIMEOUT = 60 # seconds, per network request
WAYBACK_TIMEOUT = 120 # seconds — Save Page Now is slow
# Sent as the User-Agent header on every request this tool makes itself,
# and handed to monolith via --user-agent.
USER_AGENT = ("levineuwirth.org/archive "
              "(ln@levineuwirth.org; removal requests honored)")
# Per-type on-disk names. The artifact is committed; the .txt is generated
# (gitignored) and regenerated whenever the artifact's SHA-256 changes.
# The keys of ARTIFACT double as the set of recognised artifact types.
ARTIFACT = {"pdf": "document.pdf", "html": "snapshot.html"}
TEXTFILE = {"pdf": "document.txt", "html": "snapshot.txt"}
# Injected into every HTML snapshot's <head>. Permits exactly what a
# faithful monolith capture needs — inlined images/fonts as data URIs and
# inline styles (as <style> elements and as style="" attributes) — and
# blocks every network fetch and every script a broken or hostile snapshot
# might attempt. Defense-in-depth behind the iframe sandbox; see ARCHIVE.md.
ARCHIVE_CSP = (
    "default-src 'none'; img-src data:; "
    "style-src 'unsafe-inline'; style-src-elem 'unsafe-inline'; "
    "style-src-attr 'unsafe-inline'; font-src data:; "
    "script-src 'none'; object-src 'none'; frame-src 'none'"
)
def log(msg: str) -> None:
    """Print an informational line to stdout, tagged `[archive]`."""
    print("[archive] {}".format(msg))
def err(msg: str) -> None:
    """Print an error line to stderr, tagged `[archive] ERROR:`."""
    print("[archive] ERROR: {}".format(msg), file=sys.stderr)
# ---------------------------------------------------------------------------
# Manifest / removed.yaml
# ---------------------------------------------------------------------------
def load_yaml_list(path: Path) -> list[dict]:
    """Load a YAML file expected to hold a list of mappings.

    An absent file, or one whose document is empty (parses to None),
    yields an empty list. Anything that is not a list of mappings is a
    malformed input file: an error is printed and the process exits 1,
    so callers can rely on every returned item supporting `.get()`.
    """
    if not path.exists():
        return []
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    if data is None:
        return []
    if not isinstance(data, list):
        err(f"{path.name}: expected a YAML list, got {type(data).__name__}")
        sys.exit(1)
    # Validate items up front: callers do `entry.get(...)` on each one, and
    # a stray scalar or null item would otherwise surface as an
    # AttributeError traceback far from the offending file.
    for position, item in enumerate(data, start=1):
        if not isinstance(item, dict):
            err(f"{path.name}: item {position} is not a mapping "
                f"(got {type(item).__name__})")
            sys.exit(1)
    return data
def derive_slug(url: str) -> str:
    """Build the automatic slug `{domain-label}-{path-tail}` for `url`.

    The domain label is the second-to-last dot-separated label of the
    network location (after lowercasing and dropping a leading `www.`);
    the path tail is the last path segment, or `index` when there is none.
    The pair is lowercased, runs of non `[a-z0-9]` characters collapse to a
    single `-`, and the result is cut to 64 characters. If nothing usable
    remains, the first 12 hex digits of the URL's SHA-1 are returned. A
    manifest `slug:` override is preferred over this.
    """
    parts = urlparse(url)
    hostname = parts.netloc.lower().removeprefix("www.")
    host_labels = hostname.split(".")
    if len(host_labels) >= 2:
        site = host_labels[-2]
    else:
        site = hostname or "url"
    last_segment = parts.path.rstrip("/").rsplit("/", 1)[-1] or "index"
    raw = f"{site}-{last_segment}".lower()
    candidate = re.sub(r"[^a-z0-9]+", "-", raw).strip("-")
    candidate = candidate[:64].strip("-")
    if candidate:
        return candidate
    return hashlib.sha1(url.encode()).hexdigest()[:12]
def entry_slug(entry: dict) -> str:
    """Return the entry's explicit `slug` if it has a non-empty one,
    otherwise the slug derived from its `url`."""
    return entry.get("slug") or derive_slug(entry["url"])
# ---------------------------------------------------------------------------
# Hashing / type detection
# ---------------------------------------------------------------------------
def sha256_of(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at `path`, reading it in
    64 KiB blocks so the whole file is never held in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(1 << 16)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
def probe_headers(url: str) -> dict[str, str]:
    """Issue a best-effort HEAD request for `url`.

    Returns the response headers with lowercased names. Any failure at
    all (some servers reject HEAD outright) yields an empty dict.
    """
    request = urllib.request.Request(
        url,
        headers={"User-Agent": USER_AGENT},
        method="HEAD",
    )
    try:
        with urllib.request.urlopen(request, timeout=TIMEOUT) as response:
            lowered: dict[str, str] = {}
            for name, value in response.headers.items():
                lowered[name.lower()] = value
            return lowered
    except Exception:  # noqa: BLE001
        return {}
def probe_headers_get(url: str) -> dict[str, str]:
    """Issue a best-effort GET for `url` asking only for the first byte
    (`Range: bytes=0-0`).

    Returns the response headers with lowercased names, or an empty dict
    on any failure. Complements `probe_headers`: an
    `X-Robots-Tag: noarchive` that a server sends only on GET (some omit
    it on HEAD) is still seen.
    """
    request_headers = {"User-Agent": USER_AGENT, "Range": "bytes=0-0"}
    request = urllib.request.Request(url, headers=request_headers,
                                     method="GET")
    try:
        with urllib.request.urlopen(request, timeout=TIMEOUT) as response:
            lowered: dict[str, str] = {}
            for name, value in response.headers.items():
                lowered[name.lower()] = value
            return lowered
    except Exception:  # noqa: BLE001
        return {}
def detect_type(url: str, override) -> str | None:
    """Decide which artifact type (`pdf` or `html`) an entry gets.

    Precedence: a manifest `type:` override; then the URL path's
    extension; then the Content-Type reported by a HEAD probe; finally
    `html` as the default (most non-PDF cited URLs are pages). Returns
    None — after printing an error — when the override is not a
    recognised type.
    """
    if override:
        wanted = str(override).strip().lower()
        if wanted not in ARTIFACT:
            err(f"{url}: manifest type: {override!r} not recognised "
                f"(expected pdf | html)")
            return None
        return wanted

    url_path = urlparse(url).path.lower()
    by_extension = (((".pdf",), "pdf"), ((".html", ".htm"), "html"))
    for suffixes, kind in by_extension:
        if url_path.endswith(suffixes):
            return kind

    # No decisive extension: ask the server.
    content_type = (probe_headers(url).get("content-type") or "").lower()
    return "pdf" if "pdf" in content_type else "html"
# ---------------------------------------------------------------------------
# PDF fetch + text extraction
# ---------------------------------------------------------------------------
def fetch_pdf(url: str, dest: Path) -> bool:
    """Stream `url` into `dest`, refusing anything over SIZE_CAP.

    The body is written to a sibling `.part` file and only renamed onto
    `dest` once complete, so a partial or over-cap download leaves no
    file behind. Returns True on success, False (after printing an
    error) on a `noarchive` directive, an over-cap body, or any
    exception.
    """
    partial = dest.with_suffix(dest.suffix + ".part")
    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(request, timeout=TIMEOUT) as response:
            # X-Robots-Tag: noarchive — honour the archiving-specific
            # directive even though robots.txt itself is not gated.
            directive = (response.headers.get("X-Robots-Tag") or "").lower()
            if "noarchive" in directive:
                err(f"{url}: response carries X-Robots-Tag: noarchive — skipped")
                return False
            received = 0
            with partial.open("wb") as sink:
                while True:
                    block = response.read(1 << 16)
                    if not block:
                        break
                    received += len(block)
                    if received <= SIZE_CAP:
                        sink.write(block)
                        continue
                    # Over the cap: close first so the unlink succeeds on
                    # platforms that refuse to delete an open file.
                    sink.close()
                    partial.unlink(missing_ok=True)
                    err(f"{url}: exceeds {SIZE_CAP // (1024*1024)} MB cap "
                        f"— skipped (commit deliberately with `git add -f`)")
                    return False
        partial.replace(dest)
        return True
    except Exception as exc:  # noqa: BLE001 — report any failure
        partial.unlink(missing_ok=True)
        err(f"{url}: fetch failed — {exc}")
        return False
def extract_text_pdf(pdf: Path, txt: Path) -> None:
    """Run `pdftotext -q` to turn `pdf` into the plain-text file `txt`.

    If the tool is missing or exits non-zero, an error is printed and an
    empty `txt` is written instead so downstream steps still find it.
    """
    command = ["pdftotext", "-q", str(pdf), str(txt)]
    try:
        subprocess.run(command, check=True)
        return
    except (subprocess.CalledProcessError, FileNotFoundError) as exc:
        err(f"{pdf.name}: pdftotext failed ({exc}); writing empty text sidecar")
    txt.write_text("", encoding="utf-8")
# ---------------------------------------------------------------------------
# HTML snapshot (monolith) + CSP + text extraction + quality classification
# ---------------------------------------------------------------------------
def find_monolith() -> str | None:
    """Return the path of the monolith binary, or None if there is none.

    Search order: the file named by $MONOLITH_BIN, the vendored
    tools/bin/monolith under the repository root, then $PATH.
    """
    override = os.environ.get("MONOLITH_BIN")
    if override and Path(override).is_file():
        return override
    bundled = REPO_ROOT.joinpath("tools", "bin", "monolith")
    if bundled.is_file():
        return str(bundled)
    return shutil.which("monolith")
def body_noarchive(path: Path) -> bool:
    """Report whether the HTML file at `path` carries a `robots` or
    `googlebot` <meta> whose content includes `noarchive` — the
    in-document equivalent of the X-Robots-Tag header."""
    from bs4 import BeautifulSoup
    markup = path.read_text(encoding="utf-8", errors="replace")
    document = BeautifulSoup(markup, "html.parser")
    crawler_names = ("robots", "googlebot")
    return any(
        (meta.get("name") or "").lower() in crawler_names
        and "noarchive" in (meta.get("content") or "").lower()
        for meta in document.find_all("meta")
    )
def inject_archive_metas(path: Path) -> None:
    """Rewrite the HTML file at `path` so its <head> starts with the
    archive CSP <meta> followed by a robots `noindex, noarchive` <meta>.

    Any Content-Security-Policy or robots <meta> already in the head is
    removed first: two intersecting CSPs could block resources a faithful
    snapshot legitimately needs, and the served snapshot's indexing
    posture is ours to set regardless of what the original said. A <head>
    is created when the document has none.
    """
    from bs4 import BeautifulSoup
    markup = path.read_text(encoding="utf-8", errors="replace")
    document = BeautifulSoup(markup, "html.parser")

    head = document.head
    if head is None:
        head = document.new_tag("head")
        container = document.html if document.html is not None else document
        container.insert(0, head)

    for meta in list(head.find_all("meta")):
        is_csp = ((meta.get("http-equiv") or "").lower()
                  == "content-security-policy")
        is_robots = (meta.get("name") or "").lower() == "robots"
        if is_csp or is_robots:
            meta.decompose()

    # Each tag is pushed to the front, so the last one inserted ends up
    # first: final head order is CSP, then robots (deterministic, easy to
    # grep).
    injected = (
        {"name": "robots", "content": "noindex, noarchive"},
        {"http-equiv": "Content-Security-Policy", "content": ARCHIVE_CSP},
    )
    for attributes in injected:
        tag = document.new_tag("meta")
        for key, value in attributes.items():
            tag[key] = value
        head.insert(0, tag)

    path.write_text(str(document), encoding="utf-8")
def fetch_html(url: str, dest: Path) -> bool:
    """Snapshot an HTML page with monolith into a single self-contained
    file at `dest`, then inject the archive CSP.

    Steps: probe for `X-Robots-Tag: noarchive` (HEAD and ranged GET);
    locate monolith; download the primary document ourselves (size-capped,
    header re-checked); refuse if the body declares a robots `noarchive`
    <meta>; feed that exact body to monolith on stdin; size-check the
    result; inject the archive <meta> tags; rename into place.

    Returns True on success. Every failure path is non-fatal: an error is
    printed, False is returned, and neither the `.source.part` download
    nor the `.part` monolith output is left behind — including when the
    bs4-based inspection or injection step itself raises.
    """
    # Honour directives returned by preliminary probes before performing
    # the document fetch. The full document response is inspected below
    # and is also the exact body passed to monolith; do not let monolith
    # perform a second unobservable fetch of the primary document.
    if any("noarchive" in (h.get("x-robots-tag") or "").lower()
           for h in (probe_headers(url),
                     probe_headers_get(url))):
        err(f"{url}: response carries X-Robots-Tag: noarchive — skipped")
        return False
    mono = find_monolith()
    if mono is None:
        err(f"{url}: monolith not found — vendor the binary at "
            f"tools/bin/monolith (see tools/monolith-version.txt) or set "
            f"$MONOLITH_BIN; HTML snapshot skipped")
        return False
    source = dest.with_suffix(dest.suffix + ".source.part")
    tmp = dest.with_suffix(dest.suffix + ".part")
    effective_url = url
    try:
        # --- 1. download the primary document --------------------------
        try:
            req = urllib.request.Request(url,
                                         headers={"User-Agent": USER_AGENT})
            with urllib.request.urlopen(req, timeout=TIMEOUT) as resp:
                robots = (resp.headers.get("X-Robots-Tag") or "").lower()
                if "noarchive" in robots:
                    err(f"{url}: response carries X-Robots-Tag: noarchive "
                        f"— skipped")
                    return False
                # After redirects; relative asset URLs resolve against it.
                effective_url = resp.geturl()
                total = 0
                with source.open("wb") as fh:
                    for chunk in iter(lambda: resp.read(1 << 16), b""):
                        total += len(chunk)
                        if total > SIZE_CAP:
                            err(f"{url}: source HTML exceeds "
                                f"{SIZE_CAP // (1024*1024)} MB cap — skipped")
                            return False
                        fh.write(chunk)
        except Exception as exc:  # noqa: BLE001
            err(f"{url}: fetch failed — {exc}")
            return False

        # --- 2. in-document noarchive check ----------------------------
        try:
            declared_noarchive = body_noarchive(source)
        except Exception as exc:  # noqa: BLE001
            err(f"{url}: could not inspect fetched HTML — {exc}")
            return False
        if declared_noarchive:
            err(f"{url}: response declares <meta name=robots> noarchive "
                f"— skipped")
            return False

        # --- 3. run monolith over the downloaded body ------------------
        cmd = [mono, "--no-js", "--ignore-errors", "--quiet",
               "--timeout", str(TIMEOUT), "--user-agent", USER_AGENT,
               "--base-url", effective_url, "--output", str(tmp), "-"]
        try:
            proc = subprocess.run(cmd, input=source.read_bytes(),
                                  capture_output=True, timeout=TIMEOUT * 6)
        except subprocess.TimeoutExpired:
            err(f"{url}: monolith timed out — skipped")
            return False
        except Exception as exc:  # noqa: BLE001
            err(f"{url}: monolith failed to run — {exc}")
            return False
        if proc.returncode != 0:
            output = proc.stderr or proc.stdout or b""
            tail = output.decode("utf-8", errors="replace").strip().splitlines()
            err(f"{url}: monolith exited {proc.returncode} "
                f"({tail[-1] if tail else 'no output'}) — skipped")
            return False
        if not tmp.exists() or tmp.stat().st_size == 0:
            err(f"{url}: monolith produced no output — skipped")
            return False
        if tmp.stat().st_size > SIZE_CAP:
            size_mb = tmp.stat().st_size // (1024 * 1024)
            err(f"{url}: snapshot is {size_mb} MB, over the "
                f"{SIZE_CAP // (1024*1024)} MB cap — skipped "
                f"(commit deliberately with `git add -f`)")
            return False

        # --- 4. harden and publish -------------------------------------
        try:
            inject_archive_metas(tmp)
            tmp.replace(dest)
        except Exception as exc:  # noqa: BLE001
            err(f"{url}: could not finalise snapshot — {exc}")
            return False
        return True
    finally:
        # Single cleanup point for every exit path above. After a
        # successful rename `tmp` no longer exists, so this is a no-op then.
        source.unlink(missing_ok=True)
        tmp.unlink(missing_ok=True)
def extract_text_html(snapshot: Path, txt: Path) -> None:
    """Extract readable, block-separated text from an HTML snapshot.

    Block boundaries become blank lines so the archive page can render the
    text as paragraphs. Both the start and the end of every block-level
    element count as a boundary, so text that precedes a nested block
    (e.g. the label of a list item that contains a sub-list) becomes its
    own paragraph instead of being merged into the nested block's text.
    On any failure an error is printed and an empty file is written.
    """
    try:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(snapshot.read_text(encoding="utf-8",
                                                errors="replace"),
                             "html.parser")
        # Drop everything that is not reader-visible prose.
        for tag in soup(["script", "style", "noscript", "template", "head"]):
            tag.decompose()
        blocks = ["h1", "h2", "h3", "h4", "h5", "h6", "p", "li", "blockquote",
                  "pre", "tr", "figcaption", "section", "article", "div",
                  "header", "footer", "ul", "ol", "dl", "dd", "dt", "table",
                  "br", "hr"]
        # Mark both edges of every block element with a NUL, then split the
        # flattened text on it: each chunk is the text between two block
        # boundaries, i.e. one paragraph. NUL never occurs in real HTML
        # text content. find_all returns a materialised result list, so
        # editing the tree inside the loop is safe.
        sentinel = "\x00"
        for tag in soup.find_all(blocks):
            tag.insert_before(sentinel)
            tag.append(sentinel)
        body = soup.body or soup
        paras = []
        for chunk in body.get_text(" ").split(sentinel):
            # Collapse all internal whitespace; skip empty chunks (adjacent
            # boundaries produce many of them).
            words = chunk.split()
            if words:
                paras.append(" ".join(words))
        txt.write_text("\n\n".join(paras) + "\n", encoding="utf-8")
    except Exception as exc:  # noqa: BLE001
        err(f"{snapshot.name}: HTML text extraction failed ({exc}); "
            f"writing empty text sidecar")
        txt.write_text("", encoding="utf-8")
def classify_snapshot(path: Path) -> str:
    """Grade a snapshot's capture quality: 'ok', 'degraded' or
    'js-required'.

    Fewer than 200 characters of visible text means a JS app shell that
    `--no-js` hollowed out ('js-required'). Otherwise any <img> whose src
    is still an http(s) URL, or which has no src but does carry lazy-load
    attributes, is one monolith failed to inline ('degraded'). A parse
    failure also grades as 'degraded'. The author reviews the rendered
    snapshot before committing regardless — this only drives an automated
    flag.
    """
    def not_inlined(img) -> bool:
        src = (img.get("src") or "").strip()
        if src.startswith(("http://", "https://")):
            return True
        if src:
            return False
        return bool(img.get("data-src") or img.get("data-lazy-src")
                    or img.get("srcset"))

    try:
        from bs4 import BeautifulSoup
        markup = path.read_text(encoding="utf-8", errors="replace")
        document = BeautifulSoup(markup, "html.parser")
        for invisible in document(["script", "style", "noscript",
                                   "template"]):
            invisible.decompose()
        root = document.body or document
        visible_text = root.get_text(" ", strip=True)
        if len(visible_text) < 200:
            return "js-required"
        unresolved = sum(1 for img in root.find_all("img")
                         if not_inlined(img))
        if unresolved:
            return "degraded"
        return "ok"
    except Exception:  # noqa: BLE001
        return "degraded"
# ---------------------------------------------------------------------------
# Equivalent-URL aliases
# ---------------------------------------------------------------------------
# Query parameters whose presence/absence is semantically irrelevant — a
# citation written with `?utm_source=…` should match the canonical form.
# Non-tracking parameters (`?v=`, `?id=`, Wayback timestamps) are
# load-bearing and must be preserved.
# Membership is tested against the parameter name exactly as parsed, so
# the match is case-sensitive.
TRACKING_PARAMS = frozenset({
    "utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
    "fbclid", "gclid", "mc_eid", "mc_cid", "ref", "igshid",
    "_hsenc", "_hsmi", "mkt_tok",
})
# Matches @https://arxiv.org/(abs|pdf)/<id>[v<n>][.pdf]@ — the family of
# forms a single paper has in the wild.
# Capture groups, in order: scheme + host prefix, `abs` or `pdf`, the
# paper id, an optional `v<n>` version, an optional `.pdf` extension.
# The id class is `[\w.]`, so an id containing `/` or `-` does not match,
# and the `$` anchor means a URL with a query or fragment does not either.
_ARXIV_RE = re.compile(
    r"(https?://arxiv\.org/)(abs|pdf)/([\w.]+?)(v\d+)?(\.pdf)?$"
)
def strip_tracking(url: str) -> str:
    """Return `url` without its tracking query parameters.

    A URL with no query string is returned untouched. Otherwise the query
    is parsed, every pair whose name is in TRACKING_PARAMS is dropped, and
    the survivors are re-serialised with `urlencode` (so the query's
    encoding may come back normalised). If nothing survives, the result
    has no trailing `?`.
    """
    parts = urlparse(url)
    if not parts.query:
        return url
    pairs = parse_qsl(parts.query, keep_blank_values=True)
    survivors = [pair for pair in pairs if pair[0] not in TRACKING_PARAMS]
    return urlunparse(parts._replace(query=urlencode(survivors)))
def arxiv_aliases(url: str) -> set[str]:
    """Return every equivalent spelling of an arXiv URL.

    Covers the `abs` form and the `pdf` form (with and without a trailing
    `.pdf`), each both unversioned and — when `url` names a version — with
    that version. The scheme and host of `url` are kept. Returns an empty
    set for anything `_ARXIV_RE` does not match.
    """
    hit = _ARXIV_RE.match(url)
    if hit is None:
        return set()
    prefix, _kind, ident, version, _ext = hit.groups()
    versions = {"", version or ""}
    forms = {f"{prefix}abs/{ident}{ver}" for ver in versions}
    forms |= {f"{prefix}pdf/{ident}{ver}{ext}"
              for ver in versions
              for ext in (".pdf", "")}
    return forms
def url_aliases(url: str) -> list[str]:
    """Return the sorted list of URLs treated as equivalent to `url`.

    Built in stages, each applied to everything gathered so far: the URL
    with tracking parameters stripped; the http/https counterpart of each;
    each with trailing slashes removed; and the arXiv abs/pdf/versioned
    family of each. `url` itself is left out (it is the index key).
    """
    variants: set[str] = {url, strip_tracking(url)}

    scheme_swapped: set[str] = set()
    for candidate in variants:
        if candidate.startswith("https://"):
            scheme_swapped.add("http://" + candidate[len("https://"):])
        elif candidate.startswith("http://"):
            scheme_swapped.add("https://" + candidate[len("http://"):])
    variants |= scheme_swapped

    variants |= {candidate.rstrip("/") for candidate in variants}

    # Iterate over a frozen copy: the set grows while we expand.
    for candidate in tuple(variants):
        variants |= arxiv_aliases(candidate)

    variants.discard(url)
    return sorted(variants)
def arxiv_canonical(url: str) -> str:
    """Return the canonical form of an arXiv URL,
    `https://arxiv.org/abs/<id>`, with no version and no `.pdf`. A URL
    that `_ARXIV_RE` does not match is returned unchanged. Mirrors the
    Haskell-side `arxivCanonical` in `build/ArchiveIndex.hs`."""
    hit = _ARXIV_RE.match(url)
    if hit is None:
        return url
    # Group 3 is the bare paper id.
    return "https://arxiv.org/abs/" + hit.group(3)
def normalize_url(url: str) -> str:
    """Return the canonical form of `url` used for *matching*.

    In order: drop the fragment, strip tracking parameters, fold http to
    https, arXiv-canonicalise, trim trailing slashes. Mirrors
    `normalizeUrl` in `build/ArchiveIndex.hs` so removal enforcement and
    duplicate detection use the same equivalence the link-annotation
    filter uses; keep the two in sync.
    """
    without_fragment, _hash, _fragment = url.partition("#")
    stripped = strip_tracking(without_fragment)
    if stripped.startswith("http://"):
        stripped = "https://" + stripped.removeprefix("http://")
    return arxiv_canonical(stripped).rstrip("/")
def _is_tracked_and_clean(*paths: Path) -> bool:
    """Report whether every path is tracked by git and unmodified
    relative to HEAD.

    That is the condition under which a file's committed bytes stay
    recoverable from history once a refresh replaces it. Each path is
    checked with `git ls-files --error-unmatch`, then all of them together
    with `git diff --quiet HEAD`; any non-zero exit, or a missing git
    binary, yields False.
    """
    targets = [str(p) for p in paths]

    def git_succeeds(*arguments: str) -> bool:
        finished = subprocess.run(
            ["git", *arguments],
            cwd=str(REPO_ROOT),
            capture_output=True,
        )
        return finished.returncode == 0

    try:
        tracked = all(
            git_succeeds("ls-files", "--error-unmatch", "--", target)
            for target in targets
        )
        if not tracked:
            return False
        return git_succeeds("diff", "--quiet", "HEAD", "--", *targets)
    except FileNotFoundError:
        return False
# ---------------------------------------------------------------------------
# fetch subcommand
# ---------------------------------------------------------------------------
def cmd_fetch() -> int:
    """Fetch every manifest entry that has no local artifact yet, refresh
    the generated sidecars, and rewrite data/archive-index.json.

    Per entry: resolve the slug and artifact type, verify an already
    committed artifact against its PROVENANCE.json, download the artifact
    if absent, regenerate the text sidecar when its recorded digest is
    stale, write PROVENANCE.json on first archive, and add the entry to
    the index.

    Returns 0 whenever it returns; skipped entries (missing `url:`, bad
    `type:`, failed download) are reported and simply absent from the
    index. Fatal conditions — duplicate canonical URLs, a URL recorded in
    removed.yaml, a changed manifest URL, a SHA-256 mismatch, a missing
    committed artifact — print an error and terminate via `sys.exit(1)`.
    """
    manifest = load_yaml_list(MANIFEST)
    # Removed URLs are compared in normalised form so a tracking-laden
    # variant cannot bypass a takedown the author already recorded.
    removed_norms = {normalize_url(r["url"])
                     for r in load_yaml_list(REMOVED) if r.get("url")}
    # Pre-scan validation: reject canonical-form duplicates *before* any
    # fetch I/O, so a first colliding entry never gets partially processed
    # while a second's duplicate check halts.
    seen: dict[str, str] = {}
    for entry in manifest:
        url = entry.get("url")
        if not url:
            continue
        norm = normalize_url(url)
        if norm in seen:
            err(f"manifest: {url!r} and {seen[norm]!r} normalise to the "
                f"same canonical form ({norm!r}). Drop one or distinguish "
                f"them; the link archive cannot route both under one slug.")
            sys.exit(1)
        seen[norm] = url
    index: dict[str, dict] = {}
    skipped = 0
    for entry in manifest:
        url = entry.get("url")
        if not url:
            err("manifest entry without a `url:` — skipped")
            skipped += 1
            continue
        norm = normalize_url(url)
        # A manifest URL whose canonical form matches a removed entry is a
        # deliberate takedown; never silently re-archive it. The author
        # either removes the line from removed.yaml ("I want it back") or
        # from the manifest.
        if norm in removed_norms:
            err(f"manifest URL {url!r} (canonical {norm!r}) is recorded in "
                f"archive/removed.yaml as a deliberate takedown. To re-archive "
                f"it, remove the corresponding line from removed.yaml first.")
            sys.exit(1)
        slug = entry_slug(entry)
        slug_dir = ARCHIVE_DIR / slug
        prov_path = slug_dir / "PROVENANCE.json"
        # --- resolve the artifact type ------------------------------------
        # An archived entry's type is fixed in PROVENANCE.json; a new entry
        # is detected from the manifest / URL / Content-Type.
        prov = None
        if prov_path.exists():
            prov = json.loads(prov_path.read_text(encoding="utf-8"))
            if prov.get("url") != url:
                err(f"{slug}: manifest URL changed "
                    f"({prov.get('url')!r} -> {url!r}). A committed artifact "
                    f"is never silently re-fetched; to deliberately "
                    f"re-snapshot, run `archive.py refresh {slug}`.")
                sys.exit(1)
            atype = prov.get("type", "pdf")
        else:
            atype = detect_type(url, entry.get("type"))
            if atype is None:
                skipped += 1
                continue
        art = slug_dir / ARTIFACT[atype]
        txt = slug_dir / TEXTFILE[atype]
        txt_stamp = slug_dir / (TEXTFILE[atype] + ".sha256")
        # --- integrity guard (fatal): a committed artifact must verify,
        #     and a lost artifact must not be silently re-fetched. -------
        if prov is not None:
            if art.exists():
                live = sha256_of(art)
                if live != prov.get("sha256"):
                    err(f"{slug}: {art.name} SHA-256 mismatch "
                        f"(recorded {prov.get('sha256')}, found {live}) "
                        f"— the committed artifact is corrupt or was replaced")
                    sys.exit(1)
            else:
                err(f"{slug}: PROVENANCE.json is committed but {art.name} "
                    f"is missing. The committed artifact has been lost; "
                    f"restore it from git before rebuilding. A refresh "
                    f"requires a present, verified prior snapshot.")
                sys.exit(1)
        # --- fetch the artifact if it is not already present --------------
        if not art.exists():
            slug_dir.mkdir(parents=True, exist_ok=True)
            log(f"fetching {url} [{atype}]")
            ok = fetch_pdf(url, art) if atype == "pdf" else fetch_html(url, art)
            if not ok:
                skipped += 1
                continue
        else:
            log(f"{slug}: artifact present, skipping fetch")
        digest = sha256_of(art)
        # --- regenerate text when the artifact changed (or .txt absent) ---
        # The `.sha256` stamp records which artifact digest the current
        # text sidecar was extracted from.
        stale = (not txt.exists()
                 or not txt_stamp.exists()
                 or txt_stamp.read_text(encoding="utf-8").strip() != digest)
        if stale:
            if atype == "pdf":
                extract_text_pdf(art, txt)
            else:
                extract_text_html(art, txt)
            txt_stamp.write_text(digest + "\n", encoding="utf-8")
        # --- write PROVENANCE.json (once; stable thereafter) --------------
        if prov is None:
            quality = "ok" if atype == "pdf" else classify_snapshot(art)
            # YAML resolves an unquoted ISO date (`source-date: 2024-01-15`)
            # to a datetime.date / datetime.datetime, which json.dumps
            # cannot serialise. Store it as its ISO string so writing the
            # provenance never fails after the artifact is already on disk.
            source_date = entry.get("source-date")
            if isinstance(source_date, datetime.date):
                source_date = source_date.isoformat()
            prov = {
                "url": url,
                "slug": slug,
                "title": entry.get("title") or slug,
                "type": atype,
                "artifact": ARTIFACT[atype],
                "sha256": digest,
                "previous-sha256": None,
                "bytes": art.stat().st_size,
                "archived": datetime.date.today().isoformat(),
                "source-date": source_date,
                "snapshot-quality": quality,
                "wayback": None,
            }
            prov_path.write_text(
                json.dumps(prov, indent=2, ensure_ascii=False) + "\n",
                encoding="utf-8",
            )
            log(f"{slug}: archived [{atype}, {quality}] ({prov['bytes']} bytes)")
        # --- contribute to the Hakyll index -------------------------------
        index[url] = {
            "slug": slug,
            "type": prov.get("type", atype),
            "title": prov.get("title", slug),
            "aliases": url_aliases(url),
        }
    # archive-index.json is always rewritten to mirror the manifest exactly.
    INDEX_OUT.parent.mkdir(parents=True, exist_ok=True)
    INDEX_OUT.write_text(
        json.dumps(index, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    log(f"wrote {INDEX_OUT.relative_to(REPO_ROOT)} ({len(index)} entries)")
    if skipped:
        err(f"{skipped} entr{'y' if skipped == 1 else 'ies'} skipped "
            f"(network / cap / missing url) — retried next build")
    return 0
# ---------------------------------------------------------------------------
# refresh subcommand — deliberate re-snapshot of one entry
# ---------------------------------------------------------------------------
def cmd_refresh(argv: list[str]) -> int:
    """Deliberately re-snapshot a single entry.

    Two invariants:

    * The prior snapshot is *recoverable* — refresh refuses to replace
      an artifact whose committed bytes git does not have, so the
      recorded @previous-sha256@ always points at something
      retrievable via @git log -S@. Commit the current snapshot first.
    * The replacement is *atomic across every exit path* — slug dir and
      @data/archive-index.json@ are both staged aside; any failure
      (transient fetch error, fatal @cmd_fetch@ exit, exception,
      interruption) restores both. We never end up with no snapshot
      and never leave the index pointing at a discarded state.

    The only way an @archive.py@ invocation replaces a committed artifact
    — @cmd_fetch@ itself refuses to.

    `argv[0]` is the slug to refresh. Returns 0 when a new snapshot is in
    place, 1 when the re-snapshot failed and the prior state was restored,
    and 2 for a usage or precondition error detected before any file is
    replaced.
    """
    if not argv:
        err("refresh: pass a slug "
            "(e.g. `archive.py refresh nist-fips-203`)")
        return 2
    slug = argv[0]
    manifest = load_yaml_list(MANIFEST)
    # Only entries with a URL can be matched; the slug is the explicit
    # override or the derived one, exactly as cmd_fetch computes it.
    entry = next((e for e in manifest
                  if e.get("url") and entry_slug(e) == slug), None)
    if entry is None:
        err(f"refresh: {slug!r} is not in archive/manifest.yaml")
        return 2
    slug_dir = ARCHIVE_DIR / slug
    prov_path = slug_dir / "PROVENANCE.json"
    prev_sha: str | None = None
    if prov_path.exists():
        try:
            prev = json.loads(prov_path.read_text(encoding="utf-8"))
            prev_sha = prev.get("sha256")
            # NOTE(review): with no `artifact` key this resolves to
            # slug_dir itself, which exists — the SHA check below would
            # then be handed a directory. Confirm provenance files always
            # carry the key.
            prev_artifact = slug_dir / prev.get("artifact", "")
        except Exception as exc:  # noqa: BLE001
            err(f"refresh: cannot parse prior provenance for {slug}: {exc}")
            return 2
        # The prior snapshot must be committed and clean — otherwise
        # `previous-sha256` would point at bytes git can no longer give
        # back, breaking the auditable replacement contract.
        if not prev_sha or not prev_artifact.exists():
            err(f"refresh: prior snapshot for {slug} is incomplete; restore "
                f"its artifact and provenance before replacing it.")
            return 2
        live_sha = sha256_of(prev_artifact)
        if live_sha != prev_sha:
            err(f"refresh: prior snapshot for {slug} fails SHA-256 "
                f"verification (recorded {prev_sha}, found {live_sha}); "
                f"refusing to replace unverifiable bytes.")
            return 2
        if not _is_tracked_and_clean(prov_path, prev_artifact):
            err(f"refresh: the prior snapshot for {slug} "
                f"(archive/{slug}/{{PROVENANCE.json, "
                f"{prev_artifact.name}}}) has uncommitted changes or is "
                f"not tracked in git. Commit the current snapshot first "
                f"— otherwise its bytes cannot be recovered via "
                f"`git log -S` once replaced.")
            return 2
    # Stage the old snapshot AND the current archive-index.json aside —
    # cmd_fetch rewrites the index unconditionally, so a failed refresh
    # must roll both back.
    backup: Path | None = None
    if slug_dir.exists():
        backup = slug_dir.with_name(slug + ".refresh-backup")
        if backup.exists():
            err(f"refresh: recovery directory {backup.name} already exists; "
                f"resolve it before starting another refresh.")
            return 2
        # With the slug directory gone, cmd_fetch sees the entry as new
        # and fetches it afresh.
        slug_dir.rename(backup)
        log(f"refresh: staged old archive/{slug}/ aside as {backup.name}")
    index_existed = INDEX_OUT.exists()
    index_backup: Path | None = None
    if index_existed:
        index_backup = INDEX_OUT.with_suffix(".json.refresh-backup")
        if index_backup.exists():
            # Undo the directory staging done just above before bailing.
            if backup is not None:
                backup.rename(slug_dir)
            err(f"refresh: recovery file {index_backup.name} already exists; "
                f"resolve it before starting another refresh.")
            return 2
        shutil.copy2(INDEX_OUT, index_backup)
    succeeded = False
    try:
        rc = cmd_fetch()
        # Success requires a new PROVENANCE.json *and* its declared
        # artifact on disk. `cmd_fetch` returns 0 even when individual
        # entries skip, so the return code alone is not enough.
        if rc == 0 and prov_path.exists():
            try:
                new_prov = json.loads(prov_path.read_text(encoding="utf-8"))
                art_name = new_prov.get("artifact", "")
                if art_name and (slug_dir / art_name).exists():
                    # Link the new provenance back to the snapshot it
                    # replaced (nothing to record on a first-ever fetch).
                    if prev_sha:
                        new_prov["previous-sha256"] = prev_sha
                        prov_path.write_text(
                            json.dumps(new_prov, indent=2,
                                       ensure_ascii=False) + "\n",
                            encoding="utf-8",
                        )
                        log(f"refresh: recorded previous-sha256 "
                            f"{prev_sha[:12]}")
                    succeeded = True
            except Exception:  # noqa: BLE001
                succeeded = False
    finally:
        # Runs on every exit path — normal return, exception, SystemExit
        # from cmd_fetch, KeyboardInterrupt. We always end with either a
        # complete new snapshot or the prior one restored, never neither.
        if succeeded:
            # Commit: discard the staged-aside copies.
            if backup is not None:
                shutil.rmtree(backup)
            if index_backup is not None:
                index_backup.unlink()
            log(f"refresh: {slug} re-snapshotted")
        else:
            # Roll back: drop whatever the failed fetch left, then put the
            # old directory and the old index back.
            if slug_dir.exists():
                shutil.rmtree(slug_dir)
            if backup is not None:
                backup.rename(slug_dir)
            if index_backup is not None:
                shutil.move(str(index_backup), str(INDEX_OUT))
            elif not index_existed:
                # There was no index before; remove one cmd_fetch may
                # have written.
                INDEX_OUT.unlink(missing_ok=True)
            err(f"refresh: re-snapshot of {slug} failed; the prior "
                f"snapshot has been restored.")
    return 0 if succeeded else 1
# ---------------------------------------------------------------------------
# wayback subcommand
# ---------------------------------------------------------------------------
def wayback_save(url: str) -> None:
    """Ask the Wayback Machine's Save Page Now endpoint to capture `url`.

    Best-effort: the response body is ignored and any failure is only
    logged. The resulting capture URL is read back separately through the
    availability API (which also surfaces a pre-existing capture).
    """
    request = urllib.request.Request(
        "https://web.archive.org/save/" + url,
        headers={"User-Agent": USER_AGENT},
    )
    try:
        urllib.request.urlopen(request, timeout=WAYBACK_TIMEOUT).close()
    except Exception as exc:  # noqa: BLE001
        log(f"wayback: save request for {url} did not complete ({exc})")
def wayback_lookup(url: str) -> str | None:
    """Look up the Wayback Machine capture the availability API reports
    as closest for `url`.

    Returns the capture URL, or None when no available capture is
    reported or the API request fails (the failure is printed as an
    error).
    """
    endpoint = ("https://archive.org/wayback/available?url="
                + quote(url, safe=""))
    request = urllib.request.Request(endpoint,
                                     headers={"User-Agent": USER_AGENT})
    try:
        with urllib.request.urlopen(request, timeout=TIMEOUT) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except Exception as exc:  # noqa: BLE001
        err(f"wayback: availability lookup failed for {url} ({exc})")
        return None
    closest = (payload.get("archived_snapshots") or {}).get("closest") or {}
    if not closest.get("available"):
        return None
    return closest.get("url") or None
def cmd_wayback() -> int:
    """Backfill Wayback Machine capture URLs into PROVENANCE.json files.

    For each manifest entry that has a URL, is not listed in removed.yaml
    (compared by normalised URL), already has a PROVENANCE.json on disk,
    and has no `wayback` value recorded yet: request a capture, look the
    capture URL up, and write it into the sidecar when one is available.
    Entries whose capture is not available yet are only counted as
    pending, so a later run picks them up again.

    Removed URLs are skipped so a deliberate takedown is not re-published
    to a third-party archive while its manifest line is still present.

    Returns:
        Always 0.
    """
    entries = load_yaml_list(MANIFEST)
    taken_down = set()
    for removed in load_yaml_list(REMOVED):
        if removed.get("url"):
            taken_down.add(normalize_url(removed["url"]))

    backfilled = 0
    pending = 0
    for entry in entries:
        url = entry.get("url")
        if not url:
            continue
        if normalize_url(url) in taken_down:
            continue

        slug = entry_slug(entry)
        sidecar = ARCHIVE_DIR / slug / "PROVENANCE.json"
        if not sidecar.exists():
            # Nothing fetched for this entry yet — `fetch` has to run first.
            continue
        provenance = json.loads(sidecar.read_text(encoding="utf-8"))
        if provenance.get("wayback"):
            # A capture is already on record.
            continue

        log(f"wayback: submitting {url}")
        wayback_save(url)
        capture = wayback_lookup(url)
        if not capture:
            log(f"{slug}: no Wayback capture available yet — retried next run")
            pending += 1
            continue

        provenance["wayback"] = capture
        serialized = json.dumps(provenance, indent=2, ensure_ascii=False) + "\n"
        sidecar.write_text(serialized, encoding="utf-8")
        log(f"{slug}: wayback -> {capture}")
        backfilled += 1

    log(f"wayback: {backfilled} backfilled, {pending} pending")
    return 0
# ---------------------------------------------------------------------------
# check subcommand — link-rot detection
# ---------------------------------------------------------------------------
def moved_meaningfully(orig: str, final: str) -> bool:
    """Report whether `final` is a real relocation of `orig`.

    Both URLs are reduced to a comparison form before being compared:
    the fragment is dropped, `http` is folded into `https`, the scheme and
    host are lower-cased (both are case-insensitive per RFC 3986), and
    trailing slashes are stripped. Path and query stay case-sensitive.

    Args:
        orig: the URL that was requested.
        final: the URL the request landed on after redirects.

    Returns:
        True when the comparison forms differ, i.e. the change is more
        than benign canonicalisation; False otherwise.
    """
    from urllib.parse import urlsplit, urlunsplit

    def norm(u: str) -> str:
        u = u.split("#", 1)[0]
        try:
            parts = urlsplit(u)
        except ValueError:
            # Unparseable (e.g. a malformed IPv6 literal): fall back to the
            # purely textual fold rather than raising.
            if u.startswith("http://"):
                u = "https://" + u[len("http://"):]
            return u.rstrip("/")
        scheme = parts.scheme.lower()
        if scheme == "http":
            scheme = "https"
        # Lower-case only the host[:port]; any userinfo before the last
        # "@" is case-sensitive and is left untouched.
        userinfo, at, host = parts.netloc.rpartition("@")
        netloc = userinfo + at + host.lower()
        rebuilt = urlunsplit((scheme, netloc, parts.path, parts.query, ""))
        return rebuilt.rstrip("/")

    return norm(orig) != norm(final)
def probe_url(url: str) -> tuple[str, str | None]:
    """Probe `url` for reachability and classify the outcome.

    A HEAD request is tried first. If it is rejected with 403, 405 or 501,
    or fails with a non-HTTP error, the probe is repeated as a GET carrying
    `Range: bytes=0-0`. Any other HTTP error, or any failure of the GET
    attempt, is a failure.

    Returns:
        A `(result, new_url)` pair. `result` is 'ok', 'moved' or 'fail';
        `new_url` is the landing URL when the result is 'moved' and None
        otherwise.
    """
    head_retry_codes = (403, 405, 501)
    for method in ("HEAD", "GET"):
        is_head = method == "HEAD"
        headers = {"User-Agent": USER_AGENT}
        if not is_head:
            headers["Range"] = "bytes=0-0"
        request = urllib.request.Request(url, method=method, headers=headers)
        try:
            with urllib.request.urlopen(request, timeout=TIMEOUT) as response:
                landed = response.geturl()
                if moved_meaningfully(url, landed):
                    return ("moved", landed)
                return ("ok", None)
        except urllib.error.HTTPError as exc:
            # Only a HEAD refusal earns a second attempt; everything else
            # is a definite 4xx/5xx.
            if not (is_head and exc.code in head_retry_codes):
                return ("fail", None)
        except Exception:  # noqa: BLE001 — network failure
            if not is_head:
                return ("fail", None)
        # Reaching here means the HEAD attempt was inconclusive: try GET.
    return ("fail", None)
def next_state(prev: dict, result: str, new_url: str | None,
today: datetime.date) -> dict:
"""Fold a probe result into an entry's state with asymmetric
hysteresis. Recovery is immediate: one 'ok' returns straight to
'live'. Rotting is slow: 'rotted' needs ROT_FAILS consecutive failures
spanning at least ROT_DAYS days; below that the status is the
inconclusive 'error'."""
iso = today.isoformat()
prev_status = prev.get("status", "live")
prev_cf = prev.get("consecutive-failures", 0)
prev_since = prev.get("status-since", iso)
if result == "ok":
return {"status": "live", "checked": iso,
"consecutive-failures": 0,
"status-since": prev_since if prev_status == "live" else iso}
if result == "moved":
rec = {"status": "moved", "checked": iso,
"consecutive-failures": 0,
"status-since": prev_since if prev_status == "moved" else iso}
if new_url:
rec["new-url"] = new_url
return rec
# result == "fail" — increment the streak; 'status-since' marks its start.
cf = prev_cf + 1
streak_since = prev_since if prev_status in ("error", "rotted") else iso
span = (today - datetime.date.fromisoformat(streak_since)).days
status = "rotted" if (cf >= ROT_FAILS and span >= ROT_DAYS) else "error"
return {"status": status, "checked": iso,
"consecutive-failures": cf, "status-since": streak_since}
def cmd_check() -> int:
    """Probe every manifest URL and rewrite the link-rot state file.

    The previous state is read from STATE_OUT, each eligible manifest URL
    is probed, and the result is folded into that URL's prior record with
    `next_state`. The file is then rewritten from scratch, so records for
    URLs no longer in the manifest are dropped. URLs listed in
    removed.yaml (compared by normalised URL) are skipped, so a
    deliberately removed work is not probed.

    An unreadable or malformed state file does not abort the run: it is
    reported via `err` and the check proceeds from an empty state, which
    restarts every failure streak.

    Returns:
        Always 0 — a failed probe is the signal being recorded, not an
        error of this command.
    """
    manifest = load_yaml_list(MANIFEST)
    removed_norms = {normalize_url(r["url"])
                     for r in load_yaml_list(REMOVED) if r.get("url")}

    old: dict = {}
    if STATE_OUT.exists():
        try:
            loaded = json.loads(STATE_OUT.read_text(encoding="utf-8"))
        except Exception as exc:  # noqa: BLE001
            # Losing the state resets all hysteresis, so say so rather
            # than discarding it silently.
            err(f"check: could not read {STATE_OUT} ({exc}); "
                f"starting from an empty state")
        else:
            if isinstance(loaded, dict):
                old = loaded
            else:
                err(f"check: {STATE_OUT} does not hold a JSON object; "
                    f"starting from an empty state")

    today = datetime.date.today()
    state: dict[str, dict] = {}
    tally = {"live": 0, "moved": 0, "error": 0, "rotted": 0}
    for entry in manifest:
        url = entry.get("url")
        if not url or normalize_url(url) in removed_norms:
            continue
        result, new_url = probe_url(url)
        prev = old.get(url)
        if not isinstance(prev, dict):
            # No prior record, or a hand-edited one of the wrong shape.
            prev = {}
        rec = next_state(prev, result, new_url, today)
        state[url] = rec
        tally[rec["status"]] = tally.get(rec["status"], 0) + 1
        note = f" -> {new_url}" if new_url else ""
        log(f"check: {url} [{rec['status']}]{note}")

    STATE_OUT.parent.mkdir(parents=True, exist_ok=True)
    STATE_OUT.write_text(
        json.dumps(state, indent=2, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    log(f"check: {tally['live']} live, {tally['moved']} moved, "
        f"{tally['error']} error, {tally['rotted']} rotted "
        f"-> {STATE_OUT.relative_to(REPO_ROOT)}")
    return 0
# ---------------------------------------------------------------------------
# gc subcommand
# ---------------------------------------------------------------------------
def cmd_gc(ignore_orphans: bool) -> int:
    """Delete archive directories whose slug is listed in removed.yaml.

    Every sub-directory of ARCHIVE_DIR is examined in sorted order. One
    whose name is a slug in removed.yaml is deleted recursively. One that
    is in neither removed.yaml nor the manifest is an orphan: it is left
    on disk and reported.

    Args:
        ignore_orphans: when true, orphans are still reported but do not
            affect the return code.

    Returns:
        0 when there is no archive directory, when no orphans were found,
        or when `ignore_orphans` is set; 1 otherwise.
    """
    manifest = load_yaml_list(MANIFEST)
    removed = load_yaml_list(REMOVED)
    known_slugs = {entry_slug(e) for e in manifest if e.get("url")}
    evicted_slugs = {r["slug"] for r in removed if r.get("slug")}

    if not ARCHIVE_DIR.exists():
        log("no archive/ directory — nothing to GC")
        return 0

    deleted = 0
    orphans: list[str] = []
    for path in sorted(ARCHIVE_DIR.iterdir()):
        if not path.is_dir():
            continue
        slug = path.name
        if slug in evicted_slugs:
            shutil.rmtree(path)
            log(f"gc: removed archive/{slug}/ (in removed.yaml)")
            deleted += 1
            continue
        if slug not in known_slugs:
            orphans.append(slug)

    for slug in orphans:
        err(f"gc: archive/{slug}/ is not in manifest.yaml and not in "
            f"removed.yaml — left intact. If you meant to evict it, add it "
            f"to removed.yaml first; if it is stale (a branch switch, a "
            f"rename), delete the directory by hand.")
    log(f"gc: {deleted} director{'y' if deleted == 1 else 'ies'} removed")

    if ignore_orphans or not orphans:
        return 0
    err(f"gc: {len(orphans)} orphan(s) present — "
        f"resolve them or re-run with --ignore-orphans")
    return 1
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(argv: list[str]) -> int:
    """Dispatch to the subcommand named by the first argument.

    Args:
        argv: command-line arguments without the program name. An empty
            list selects `fetch`.

    Returns:
        The selected subcommand's return value, or 2 when the subcommand
        name is not recognised.
    """
    cmd = argv[0] if argv else "fetch"
    rest = argv[1:]
    # Each handler is a zero-argument callable so that subcommands taking
    # arguments and those taking none can share one table.
    handlers = {
        "fetch": lambda: cmd_fetch(),
        "refresh": lambda: cmd_refresh(rest),
        "wayback": lambda: cmd_wayback(),
        "check": lambda: cmd_check(),
        "gc": lambda: cmd_gc(ignore_orphans="--ignore-orphans" in rest),
    }
    handler = handlers.get(cmd)
    if handler is None:
        err(f"unknown subcommand {cmd!r} "
            f"(expected: fetch | refresh | wayback | check | gc)")
        return 2
    return handler()
# Script entry point: run the selected subcommand and use its return value
# as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))