#!/usr/bin/env python3 """Statistical analysis of pqc-bench results. Parses .out files via the Go aggregator, then computes a three-way decomposition of where speedup originates: refo0 → refnv compiler optimisation (O3, no vectorisation) refnv → ref compiler auto-vectorisation ref → avx2 hand-written SIMD Usage: # Run aggregator inline: python3 analysis/analyze.py --data data/raw/kyber # Or pre-generate the raw JSON once, then reuse it: go run ./analysis/cmd/aggregate --raw --out /tmp/bench.json data/raw/kyber python3 analysis/analyze.py --json /tmp/bench.json # Write JSON output for figure generation: python3 analysis/analyze.py --data data/raw/kyber --out analysis/results.json """ import argparse import json import subprocess import sys from pathlib import Path import numpy as np from scipy import stats as scipy_stats # --------------------------------------------------------------------------- # Data loading # --------------------------------------------------------------------------- REPO_ROOT = Path(__file__).resolve().parent.parent def load_json(path: str) -> list[dict]: with open(path) as f: return json.load(f) def run_aggregator(data_dir: str) -> list[dict]: """Run the Go aggregator and return parsed records.""" cmd = ["go", "run", "./cmd/aggregate", "--raw", data_dir] result = subprocess.run(cmd, capture_output=True, text=True, cwd=REPO_ROOT / "analysis") if result.returncode != 0: print(f"aggregator failed:\n{result.stderr}", file=sys.stderr) sys.exit(1) return json.loads(result.stdout) # --------------------------------------------------------------------------- # Statistics # --------------------------------------------------------------------------- def cliffs_delta_from_u(u: float, m: int, n: int) -> float: """Cliff's delta derived from Mann-Whitney U statistic. U = number of pairs (faster_i, baseline_j) where faster_i < baseline_j. delta = (2U - m*n) / (m*n) ∈ [-1, +1] Positive → faster dominates baseline. """ return (2 * u - m * n) / (m * n) def bootstrap_speedup_ci( baseline: np.ndarray, faster: np.ndarray, n_boot: int = 5_000, ci: float = 0.95, rng: np.random.Generator | None = None, ) -> tuple[float, float]: """95% bootstrap CI for speedup = median(baseline) / median(faster). Resamples both arrays independently using vectorised indexing; returns (lo, hi). """ if rng is None: rng = np.random.default_rng(42) m, n = len(baseline), len(faster) # Draw all indices at once: shape (n_boot, m) and (n_boot, n) bi = rng.integers(0, m, size=(n_boot, m)) fi = rng.integers(0, n, size=(n_boot, n)) b_samples = baseline[bi] # (n_boot, m) f_samples = faster[fi] # (n_boot, n) # Median along axis=1 for each bootstrap replicate ratios = np.median(b_samples, axis=1) / np.median(f_samples, axis=1) alpha = (1 - ci) / 2 return float(np.percentile(ratios, alpha * 100)), float(np.percentile(ratios, (1 - alpha) * 100)) def compare(baseline: np.ndarray, faster: np.ndarray, rng: np.random.Generator) -> dict: """Full pairwise comparison: speedup + CI + Mann-Whitney + Cliff's delta.""" speedup = float(np.median(baseline)) / float(np.median(faster)) ci_lo, ci_hi = bootstrap_speedup_ci(baseline, faster, rng=rng) # One-sided Mann-Whitney: is faster < baseline in cycle counts? m, n = len(faster), len(baseline) u_stat, p_val = scipy_stats.mannwhitneyu(faster, baseline, alternative="less") # Cliff's delta derived from U — O(n log n), same cost as Mann-Whitney delta = cliffs_delta_from_u(float(u_stat), m, n) return { "speedup": speedup, "ci95": [ci_lo, ci_hi], "mannwhitney_p": float(p_val), "cliffs_delta": delta, "n_baseline": n, "n_faster": m, } # --------------------------------------------------------------------------- # Analysis # --------------------------------------------------------------------------- VARIANTS = ("refo0", "refnv", "ref", "avx2") # Canonical operation order for display OP_ORDER = [ "NTT", "INVNTT", "basemul", "frommsg", "gen_a", "poly_getnoise_eta1", "poly_getnoise_eta2", "keygen", "enc", "dec", ] def analyze(records: list[dict]) -> list[dict]: # Build lookup: (algorithm, variant, operation) → raw array raw: dict[tuple[str, str, str], np.ndarray] = {} for r in records: if r.get("raw"): raw[(r["algorithm"], r["variant"], r["operation"])] = np.array(r["raw"], dtype=np.float64) # Collect all (algorithm, operation) pairs present across all variants alg_ops = sorted( {(alg, op) for alg, var, op in raw}, key=lambda x: (x[0], _op_rank(x[1])), ) rng = np.random.default_rng(42) results = [] for alg, op in alg_ops: arrays = {v: raw[(alg, v, op)] for v in VARIANTS if (alg, v, op) in raw} if len(arrays) < 2: continue row: dict = { "algorithm": alg, "operation": op, "medians": {v: float(np.median(a)) for v, a in arrays.items()}, "n_obs": {v: len(a) for v, a in arrays.items()}, "comparisons": {}, } comps = row["comparisons"] # Three-way decomposition (each step requires both variants present) if "refo0" in arrays and "refnv" in arrays: comps["refo0_to_refnv"] = compare(arrays["refo0"], arrays["refnv"], rng) if "refnv" in arrays and "ref" in arrays: comps["refnv_to_ref"] = compare(arrays["refnv"], arrays["ref"], rng) if "ref" in arrays and "avx2" in arrays: comps["ref_to_avx2"] = compare(arrays["ref"], arrays["avx2"], rng) # Totals if "refo0" in arrays and "ref" in arrays: comps["refo0_to_ref"] = compare(arrays["refo0"], arrays["ref"], rng) if "refo0" in arrays and "avx2" in arrays: comps["refo0_to_avx2"] = compare(arrays["refo0"], arrays["avx2"], rng) results.append(row) return results def _op_rank(op: str) -> int: try: return OP_ORDER.index(op) except ValueError: return len(OP_ORDER) # --------------------------------------------------------------------------- # Display # --------------------------------------------------------------------------- def _fmt_speedup(comp: dict | None) -> str: if comp is None: return " — " r = comp["speedup"] lo, hi = comp["ci95"] return f"{r:5.2f}x [{lo:.2f},{hi:.2f}]" def _fmt_delta(comp: dict | None) -> str: if comp is None: return " —" return f"{comp['cliffs_delta']:+.3f}" def _fmt_p(comp: dict | None) -> str: if comp is None: return " —" p = comp["mannwhitney_p"] if p < 1e-300: return " <1e-300" if p < 1e-10: return f" {p:.1e}" return f" {p:.4f}" def print_table(results: list[dict]) -> None: algs = sorted({r["algorithm"] for r in results}) for alg in algs: rows = [r for r in results if r["algorithm"] == alg] rows.sort(key=lambda r: -r["comparisons"].get("ref_to_avx2", {}).get("speedup", 0)) print(f"\n{'─'*110}") print(f" {alg.upper()}") print(f"{'─'*110}") print( f" {'Operation':<24}" f" {'O3 (no-vec)':>18}" # refo0→refnv f" {'Auto-vec':>18}" # refnv→ref f" {'Hand SIMD':>18}" # ref→avx2 f" {'Total':>18}" # refo0→avx2 f" {'Cliff δ':>7}" f" {'p-value':>9}" ) print(f" {'':─<24} {'':─<18} {'':─<18} {'':─<18} {'':─<18} {'':─<7} {'':─<9}") for r in rows: c = r["comparisons"] print( f" {r['operation']:<24}" f" {_fmt_speedup(c.get('refo0_to_refnv')):>18}" f" {_fmt_speedup(c.get('refnv_to_ref')):>18}" f" {_fmt_speedup(c.get('ref_to_avx2')):>18}" f" {_fmt_speedup(c.get('refo0_to_avx2')):>18}" f" {_fmt_delta(c.get('ref_to_avx2')):>7}" f" {_fmt_p(c.get('ref_to_avx2')):>9}" ) print(f"\n{'─'*110}") print(" Speedup = median(baseline) / median(variant); CI: 95% bootstrap (5000 iterations)") print(" Cliff δ and p-value are for ref → avx2 comparison (H1: avx2 cycles < ref cycles)") # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser(description="Statistical analysis of pqc-bench results") src = parser.add_mutually_exclusive_group(required=True) src.add_argument("--data", metavar="DIR", help="data directory (runs Go aggregator)") src.add_argument("--json", metavar="FILE", help="pre-generated aggregate JSON with --raw") parser.add_argument("--out", metavar="FILE", help="write analysis JSON to this file") args = parser.parse_args() if args.json: records = load_json(args.json) print(f"Loaded {len(records)} groups from {args.json}.", file=sys.stderr) else: print("Running aggregator...", file=sys.stderr) records = run_aggregator(args.data) print(f"Loaded {len(records)} groups.", file=sys.stderr) results = analyze(records) print_table(results) if args.out: with open(args.out, "w") as f: json.dump(results, f, indent=2) print(f"\nWrote analysis JSON to {args.out}", file=sys.stderr) if __name__ == "__main__": main()