From d22f48ae085c9c06c825abb8b59698fb5916614a Mon Sep 17 00:00:00 2001
From: Levi Neuwirth
Date: Sun, 3 May 2026 20:56:46 -0400
Subject: [PATCH] Robustness fixes

---
 crates/levcs-cli/src/repo_cmds.rs   | 106 +++++++--
 crates/levcs-cli/tests/merge.rs     |  96 +++++++++
 crates/levcs-identity/src/verify.rs | 324 ++++++++++++++++++++++++++--
 crates/levcs-instance/src/lib.rs    | 174 +++++++++++++--
 crates/levcs-instance/src/mirror.rs |  21 +-
 crates/levcs-protocol/src/pack.rs   |  84 +++++++-
 6 files changed, 749 insertions(+), 56 deletions(-)

diff --git a/crates/levcs-cli/src/repo_cmds.rs b/crates/levcs-cli/src/repo_cmds.rs
index 4d0e16b..79aae27 100644
--- a/crates/levcs-cli/src/repo_cmds.rs
+++ b/crates/levcs-cli/src/repo_cmds.rs
@@ -783,6 +783,16 @@ pub fn branch(args: BranchArgs) -> Result<()> {
         _ => bail!("branch tip is not a commit"),
     };
     repo.checkout_tree(tree_id, &repo.workdir)?;
+    // Refresh the index from the new tree. Without this the index
+    // keeps the previous branch's blob hashes — invisible to most
+    // workflows because the next `commit` rebuilds the index from
+    // the working tree, but visible to anything that compares
+    // index-vs-workdir (e.g., the merge command's dirty-tree
+    // precondition, which would otherwise false-positive on every
+    // branch switch).
+    let mut idx = Index::new();
+    rebuild_index_from_tree(&repo, tree_id, "", &mut idx)?;
+    repo.write_index(&idx)?;
         eprintln!("switched to branch {name}");
     }
     if let Some(name) = args.delete {
@@ -806,6 +816,51 @@ pub fn merge(args: MergeArgs) -> Result<()> {
     merge_run(args)
 }
 
+/// Return the list of tracked paths whose working-tree contents differ from
+/// the index, including paths that are tracked but missing from disk. Used
+/// as a precondition for any operation that overwrites the working tree
+/// (currently: `merge`, both fast-forward and three-way). Callers should
+/// refuse to proceed when the returned list is non-empty so users don't
+/// silently lose uncommitted work.
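+///
+/// Caller sketch (illustrative; it mirrors the precondition check added to
+/// `merge_run` below):
+///
+/// ```ignore
+/// let dirty = dirty_tracked_paths(&repo)?;
+/// if !dirty.is_empty() {
+///     bail!("uncommitted changes to tracked files: {}", dirty.join(", "));
+/// }
+/// ```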
+fn dirty_tracked_paths(repo: &Repository) -> Result<Vec<String>> {
+    let idx = repo.read_index()?;
+    let mut workdir_set: HashSet<String> = HashSet::new();
+    for path in repo.walk_workdir()? {
+        let rel = path
+            .strip_prefix(&repo.workdir)?
+            .to_string_lossy()
+            .replace('\\', "/");
+        workdir_set.insert(rel);
+    }
+    let mut dirty = Vec::new();
+    for entry in &idx.entries {
+        if !entry.flags.is_tracked() {
+            continue;
+        }
+        let abs = repo.workdir.join(&entry.path);
+        if !workdir_set.contains(&entry.path) {
+            // Tracked file removed from working tree without `levcs commit`
+            // — counts as dirty for merge purposes since the merge would
+            // resurrect it (or compute against stale on-disk state).
+            dirty.push(entry.path.clone());
+            continue;
+        }
+        let bytes = match fs::read(&abs) {
+            Ok(b) => b,
+            Err(_) => {
+                dirty.push(entry.path.clone());
+                continue;
+            }
+        };
+        let id = Blob::new(bytes).object_id();
+        if id != entry.blob_hash {
+            dirty.push(entry.path.clone());
+        }
+    }
+    dirty.sort();
+    Ok(dirty)
+}
+
 fn merge_run(args: MergeArgs) -> Result<()> {
     let branch_name = args
         .branch
     if repo.levcs_dir.join("MERGE_HEAD").exists() {
         bail!("a merge is already in progress; run `levcs merge --abort` to cancel");
     }
+    // Refuse to start a merge when tracked files have uncommitted changes —
+    // both the fast-forward and three-way paths overwrite the working
+    // tree, and silently clobbering local edits is the kind of bug that
+    // costs users hours of work. Mirrors git's `Your local changes to
+    // the following files would be overwritten by merge` precondition.
+    let dirty = dirty_tracked_paths(&repo)?;
+    if !dirty.is_empty() {
+        let listing = dirty
+            .iter()
+            .take(10)
+            .map(|p| format!("  {p}"))
+            .collect::<Vec<_>>()
+            .join("\n");
+        let more = if dirty.len() > 10 {
+            format!("\n  ... and {} more", dirty.len() - 10)
+        } else {
+            String::new()
+        };
+        bail!(
+            "uncommitted changes to tracked files would be overwritten by merge:\n{listing}{more}\n\
+             commit them (or revert to HEAD) before merging — see `levcs status`."
+        );
+    }
     let head = repo
         .refs
         .resolve_head()?
@@ -1001,6 +1079,21 @@ fn merge_run(args: MergeArgs) -> Result<()> {
         record.files.push(fr);
     }
 
+    // Repo-side policy ceiling: every handler reference in the record must
+    // be permitted by `.levcs/merge.toml` (§6.6). This used to run *after*
+    // we applied the merge to the working tree, which left the user's
+    // files clobbered when the policy check then bailed. Validate up
+    // front, before any disk write, so a rejected merge leaves the
+    // working tree exactly as we found it.
+    let allowed = load_merge_policy_allowed(&repo);
+    let bad = validate_record_against_policy(&record, &allowed);
+    if !bad.is_empty() {
+        bail!(
+            "merge produced records referencing handlers not in repository policy: {}",
+            bad.join(", ")
+        );
+    }
+
     // Apply to working tree.
     for (path, bytes) in &merged_files {
         let abs = repo.workdir.join(path);
@@ -1039,19 +1132,6 @@ fn merge_run(args: MergeArgs) -> Result<()> {
     }
     repo.write_index(&idx)?;
 
-    // Repo-side policy ceiling: every handler reference in the record must
-    // be permitted by `.levcs/merge.toml` (§6.6). Catch this before we
-    // touch the working tree's merge state so an aborted policy-violating
-    // merge leaves nothing to clean up.
-    let allowed = load_merge_policy_allowed(&repo);
-    let bad = validate_record_against_policy(&record, &allowed);
-    if !bad.is_empty() {
-        bail!(
-            "merge produced records referencing handlers not in repository policy: {}",
-            bad.join(", ")
-        );
-    }
-
     // Persist merge state and the in-progress merge-record.
     fs::write(repo.levcs_dir.join("MERGE_HEAD"), theirs_id.to_hex())?;
    fs::write(repo.levcs_dir.join("MERGE_BASE"), base_id.to_hex())?;
diff --git a/crates/levcs-cli/tests/merge.rs b/crates/levcs-cli/tests/merge.rs
index 0be62e3..4585ba7 100644
--- a/crates/levcs-cli/tests/merge.rs
+++ b/crates/levcs-cli/tests/merge.rs
@@ -417,6 +417,102 @@ fn merge_local_toml_promotion_is_rejected() {
     assert!(e.contains("promote"), "error must say 'promote': {e}");
 }
 
+/// Regression test for the dirty-tree merge precondition. Before this
+/// guard, `levcs merge` would silently overwrite uncommitted edits to
+/// tracked files — clobbering work the user hadn't yet committed.
+#[test]
+fn merge_refuses_when_workdir_has_uncommitted_changes() {
+    let (work, xdg) = init_repo();
+    std::fs::write(work.join("a.txt"), b"original\n").unwrap();
+    assert_eq!(run(&["track", "--all"], &work, &xdg).0, 0);
+    assert_eq!(run(&["commit", "-m", "base"], &work, &xdg).0, 0);
+
+    // Create feat branch with a divergent change so a real merge would run.
+    assert_eq!(run(&["branch", "--create", "feat"], &work, &xdg).0, 0);
+    assert_eq!(run(&["branch", "--switch", "feat"], &work, &xdg).0, 0);
+    std::fs::write(work.join("b.txt"), b"feat side\n").unwrap();
+    assert_eq!(run(&["track", "--all"], &work, &xdg).0, 0);
+    assert_eq!(run(&["commit", "-m", "feat add"], &work, &xdg).0, 0);
+
+    // Back on main, dirty `a.txt` *without committing*.
+ assert_eq!(run(&["branch", "--switch", "main"], &work, &xdg).0, 0); + std::fs::write(work.join("a.txt"), b"local-uncommitted-edit\n").unwrap(); + + // Merge must refuse and name the dirty file in the error. + let (code, _o, e) = run(&["merge", "feat"], &work, &xdg); + assert_ne!(code, 0, "merge must refuse on dirty tree: stderr={e}"); + assert!( + e.contains("uncommitted changes") && e.contains("a.txt"), + "error must explain refusal and name the file: {e}" + ); + + // Critically: the local edit must NOT have been overwritten, and no + // merge state should have been created. + let bytes = std::fs::read(work.join("a.txt")).unwrap(); + assert_eq!( + bytes, b"local-uncommitted-edit\n", + "user's uncommitted edit must be preserved when merge is refused" + ); + assert!(!work.join(".levcs/MERGE_HEAD").exists()); + assert!(!work.join(".levcs/merge-record").exists()); +} + +/// When the merge engine produces a record that violates the repo's +/// `[policy].allowed_handlers`, the merge must fail BEFORE touching the +/// working tree. Previously the policy check ran after `fs::write`, so +/// a rejected merge still left half-merged content on disk and stale +/// blob hashes in the index. +#[test] +fn policy_violation_rejects_merge_before_writing_working_tree() { + let (work, xdg) = init_repo(); + std::fs::write(work.join("a.txt"), b"original\n").unwrap(); + assert_eq!(run(&["track", "--all"], &work, &xdg).0, 0); + assert_eq!(run(&["commit", "-m", "base"], &work, &xdg).0, 0); + + assert_eq!(run(&["branch", "--create", "feat"], &work, &xdg).0, 0); + std::fs::write(work.join("a.txt"), b"main side\n").unwrap(); + assert_eq!(run(&["commit", "-m", "main"], &work, &xdg).0, 0); + assert_eq!(run(&["branch", "--switch", "feat"], &work, &xdg).0, 0); + std::fs::write(work.join("a.txt"), b"feat side\n").unwrap(); + assert_eq!(run(&["commit", "-m", "feat"], &work, &xdg).0, 0); + + // Switch to main and write a policy that forbids EVERYTHING — every + // file the engine touches will be flagged. (An empty allow-list with + // a non-empty handlers field on every record entry guarantees a + // mismatch; we want to verify that even though the engine produced + // a complete merge plan, the writes never landed.) + assert_eq!(run(&["branch", "--switch", "main"], &work, &xdg).0, 0); + let main_a = std::fs::read(work.join("a.txt")).unwrap(); + std::fs::write( + work.join(".levcs/merge.toml"), + b"[policy]\nallowed_handlers = [\"this-handler-does-not-exist\"]\n", + ) + .unwrap(); + + let (code, _, e) = run(&["merge", "feat"], &work, &xdg); + assert_ne!(code, 0, "merge must fail under restrictive policy"); + assert!( + e.contains("not in repository policy"), + "error must explain policy mismatch: {e}" + ); + + // Working tree must be untouched: `a.txt` still holds main's content, + // not a partial merge result. And no merge state was committed. 
+    let after = std::fs::read(work.join("a.txt")).unwrap();
+    assert_eq!(
+        after, main_a,
+        "policy-rejected merge must not modify the working tree"
+    );
+    assert!(
+        !work.join(".levcs/MERGE_HEAD").exists(),
+        "policy-rejected merge must not leave MERGE_HEAD behind"
+    );
+    assert!(
+        !work.join(".levcs/merge-record").exists(),
+        "policy-rejected merge must not persist a merge-record"
+    );
+}
+
 #[test]
 fn explain_dumps_merge_record() {
     let (work, xdg) = init_repo();
diff --git a/crates/levcs-identity/src/verify.rs b/crates/levcs-identity/src/verify.rs
index 78c046c..4d7c22e 100644
--- a/crates/levcs-identity/src/verify.rs
+++ b/crates/levcs-identity/src/verify.rs
@@ -11,6 +11,7 @@
 //! remote-fetching shim.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 
 use thiserror::Error;
 
@@ -186,27 +187,125 @@ fn verify_authority_step(
 /// Walk an authority back to genesis, verifying each step. Returns the body
 /// of the genesis authority on success.
+///
+/// Standalone calls allocate a fresh verifier each time. Callers that
+/// verify many tips in a loop (mirror sync, push handler iterating
+/// commits) should use `ChainVerifier` directly so the chain walk can be
+/// shared — without it, verifying N commits that all cite the same
+/// authority is O(N × chain_depth).
 pub fn verify_authority_chain<S: ObjectSource>(
     src: &S,
     start: ObjectId,
 ) -> Verification<AuthorityBody> {
-    let mut cur_id = start;
-    let mut cur_signed = read_signed(src, cur_id)?;
-    let mut cur_body = AuthorityBody::parse(&cur_signed.body)
-        .map_err(|e| VerifyError::Authority(e.to_string()))?;
-    while !cur_body.previous_authority.is_zero() {
-        let prev_id = cur_body.previous_authority;
-        let prev_signed = read_signed(src, prev_id)?;
-        let prev_body = AuthorityBody::parse(&prev_signed.body)
-            .map_err(|e| VerifyError::Authority(e.to_string()))?;
-        verify_authority_step(&cur_signed, &cur_body, &prev_body, prev_id)?;
-        cur_signed = prev_signed;
-        cur_body = prev_body;
-        cur_id = prev_id;
+    let mut v = ChainVerifier::new();
+    let body = v.verify_chain(src, start)?;
+    Ok((*body).clone())
+}
+
+/// Caches authority chains that have been fully verified back to genesis.
+/// Hand the same verifier to a sequence of `verify_chain` / `verify_commit`
+/// / `verify_release` calls and the per-call cost drops from O(chain
+/// depth) to O(1) once an ancestor has been seen.
+///
+/// The cache is keyed by authority id, which is a BLAKE3 of the signed
+/// object — collisions are infeasible — so a hit is sound: the underlying
+/// bytes are guaranteed identical to whatever produced the original
+/// success. Insertions are atomic per call: a partial walk that fails
+/// midway leaves the cache untouched.
+///
+/// Not internally synchronized. Callers that share a verifier across
+/// threads should wrap it in `Mutex<_>`.
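+///
+/// Thread-shared sketch (the `Mutex` wrapping is the caller's, as noted
+/// above; `store` and `tip` are assumed bindings):
+///
+/// ```ignore
+/// let verifier = std::sync::Mutex::new(ChainVerifier::new());
+/// let genesis = verifier.lock().unwrap().verify_chain(&store, tip)?;
+/// ```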
+#[derive(Default)]
+pub struct ChainVerifier {
+    /// Maps any id along a verified chain → the chain's genesis body.
+    /// The same `Arc<AuthorityBody>` is shared across every entry that
+    /// belongs to one chain, so memory cost scales with the number of
+    /// distinct chains, not with the number of authorities.
+    verified: HashMap<ObjectId, Arc<AuthorityBody>>,
+}
+
+impl ChainVerifier {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Verify the authority chain rooted at `start` back to genesis.
+    /// On a cache hit (any id along a previously-verified chain), returns
+    /// the cached genesis body in O(1). On a cache miss, walks the chain,
+    /// verifies each step, and on full success records every walked id
+    /// against a single shared genesis body.
+    pub fn verify_chain<S: ObjectSource>(
+        &mut self,
+        src: &S,
+        start: ObjectId,
+    ) -> Verification<Arc<AuthorityBody>> {
+        if let Some(g) = self.verified.get(&start) {
+            return Ok(g.clone());
+        }
+        // Walk back, accumulating the path. We don't insert anything into
+        // the cache until the entire walk succeeds — a partial walk that
+        // errors out must not leave half-trusted ids cached.
+        let mut walked: Vec<ObjectId> = Vec::new();
+        let mut cur_id = start;
+        let mut cur_signed = read_signed(src, cur_id)?;
+        let mut cur_body = AuthorityBody::parse(&cur_signed.body)
+            .map_err(|e| VerifyError::Authority(e.to_string()))?;
+        let genesis: Arc<AuthorityBody> = loop {
+            walked.push(cur_id);
+            if cur_body.previous_authority.is_zero() {
+                break Arc::new(verify_genesis(&cur_signed)?);
+            }
+            let prev_id = cur_body.previous_authority;
+            // Cache hit on the predecessor: we still need to verify the
+            // step from cur → prev (because the *step's* signature isn't
+            // covered by prev being known-good), but the rest of the
+            // chain back to genesis is already trusted.
+            if let Some(g) = self.verified.get(&prev_id).cloned() {
+                let prev_signed = read_signed(src, prev_id)?;
+                let prev_body = AuthorityBody::parse(&prev_signed.body)
+                    .map_err(|e| VerifyError::Authority(e.to_string()))?;
+                verify_authority_step(&cur_signed, &cur_body, &prev_body, prev_id)?;
+                break g;
+            }
+            let prev_signed = read_signed(src, prev_id)?;
+            let prev_body = AuthorityBody::parse(&prev_signed.body)
+                .map_err(|e| VerifyError::Authority(e.to_string()))?;
+            verify_authority_step(&cur_signed, &cur_body, &prev_body, prev_id)?;
+            cur_signed = prev_signed;
+            cur_body = prev_body;
+            cur_id = prev_id;
+        };
+        for id in walked {
+            self.verified.insert(id, genesis.clone());
+        }
+        Ok(genesis)
+    }
+
+    /// Cache-aware variant of `verify_commit`. Identical semantics; the
+    /// only behavioural difference is that authority chains visited in
+    /// prior calls don't get re-walked.
+    pub fn verify_commit<S: ObjectSource>(
+        &mut self,
+        src: &S,
+        commit_id: ObjectId,
+        target_ref: Option<&str>,
+    ) -> Verification<()> {
+        verify_commit_inner(src, commit_id, target_ref, self)
+    }
+
+    /// Cache-aware variant of `verify_release`.
+    pub fn verify_release<S: ObjectSource>(
+        &mut self,
+        src: &S,
+        release_id: ObjectId,
+    ) -> Verification<()> {
+        verify_release_inner(src, release_id, self)
+    }
+
+    #[cfg(test)]
+    pub(crate) fn cache_size(&self) -> usize {
+        self.verified.len()
     }
-    let _ = cur_id;
-    let body = verify_genesis(&cur_signed)?;
-    Ok(body)
 }
 
 /// Verify a successor authority object against `A_old`, given the signer key
@@ -303,10 +402,23 @@ pub fn role_for_commit(
 /// Full commit verification per §3.6 algorithm. `target_ref` is the ref the
 /// commit is being applied to (used for protected-branch role checks); pass
 /// `None` if not applicable (e.g., during walking).
+///
+/// Each call walks the authority chain from scratch. To share that work
+/// across many commits — e.g., during a mirror sync that verifies a
+/// branch's worth of tips — use `ChainVerifier::verify_commit` instead.
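+///
+/// Illustrative contrast (`src` and `tips` are assumed bindings):
+///
+/// ```ignore
+/// // One-shot: each call re-walks the chain from scratch.
+/// verify_commit(&src, tip, None)?;
+/// // Batched: the shared cache makes the second tip onwards cheap.
+/// let mut v = ChainVerifier::new();
+/// for tip in tips {
+///     v.verify_commit(&src, tip, None)?;
+/// }
+/// ```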
 pub fn verify_commit<S: ObjectSource>(
     src: &S,
     commit_id: ObjectId,
     target_ref: Option<&str>,
+) -> Verification<()> {
+    verify_commit_inner(src, commit_id, target_ref, &mut ChainVerifier::new())
+}
+
+fn verify_commit_inner<S: ObjectSource>(
+    src: &S,
+    commit_id: ObjectId,
+    target_ref: Option<&str>,
+    verifier: &mut ChainVerifier,
 ) -> Verification<()> {
     let bytes = src.read_raw(commit_id)?;
     let actual = blake3::hash(&bytes);
@@ -356,7 +468,7 @@ pub fn verify_commit<S: ObjectSource>(
     let auth_signed = read_signed(src, commit.authority)?;
     let auth_body = AuthorityBody::parse(&auth_signed.body)
         .map_err(|e| VerifyError::Authority(e.to_string()))?;
-    let _ = verify_authority_chain(src, commit.authority)?;
+    let _ = verifier.verify_chain(src, commit.authority)?;
     let member = auth_body
         .find_member(&pk)
         .ok_or_else(|| VerifyError::Commit {
@@ -425,6 +537,14 @@ pub fn verify_commit<S: ObjectSource>(
 /// against the authority body. A release with no listed members signing
 /// it is rejected.
 pub fn verify_release<S: ObjectSource>(src: &S, release_id: ObjectId) -> Verification<()> {
+    verify_release_inner(src, release_id, &mut ChainVerifier::new())
+}
+
+fn verify_release_inner<S: ObjectSource>(
+    src: &S,
+    release_id: ObjectId,
+    verifier: &mut ChainVerifier,
+) -> Verification<()> {
     let bytes = src.read_raw(release_id)?;
     let actual = blake3::hash(&bytes);
     if *actual.as_bytes() != release_id.0 {
@@ -455,7 +575,7 @@ pub fn verify_release<S: ObjectSource>(src: &S, release_id: ObjectId) -> Verific
         hash: release_id.to_hex(),
         kind: e.to_string(),
     })?;
-    let _ = verify_authority_chain(src, release.authority)?;
+    let _ = verifier.verify_chain(src, release.authority)?;
     let auth_signed = read_signed(src, release.authority)?;
     let auth_body = AuthorityBody::parse(&auth_signed.body)
         .map_err(|e| VerifyError::Authority(e.to_string()))?;
@@ -731,4 +851,172 @@ mod tests {
         signed.body[0] ^= 0xFF; // tamper
         assert!(verify_signed_object(&signed).is_err());
     }
+
+    /// `ObjectSource` wrapper that counts each `read_raw` call. Lets us
+    /// assert exactly how many bytes the verifier actually fetched, which
+    /// is the only direct way to observe a chain-cache hit.
+    struct CountingSource<'a> {
+        inner: &'a MemorySource,
+        reads: std::cell::Cell<usize>,
+    }
+    impl<'a> CountingSource<'a> {
+        fn new(inner: &'a MemorySource) -> Self {
+            Self {
+                inner,
+                reads: std::cell::Cell::new(0),
+            }
+        }
+        fn reads(&self) -> usize {
+            self.reads.get()
+        }
+    }
+    impl<'a> ObjectSource for CountingSource<'a> {
+        fn read_raw(&self, id: ObjectId) -> Verification<Vec<u8>> {
+            self.reads.set(self.reads.get() + 1);
+            self.inner.read_raw(id)
+        }
+    }
+
+    /// Build a chain of `length` authorities — genesis at index 0, each
+    /// later entry a properly-signed successor of the previous — backed
+    /// by `alice` (an Owner) for the whole walk. Returns the
+    /// `MemorySource` plus the list of authority ids in order.
+    fn build_chain(length: usize, alice: &SecretKey) -> (MemorySource, Vec<ObjectId>) {
+        assert!(length >= 1);
+        let alice_pk = alice.public();
+        let now = 1_700_000_000_000_000;
+        let mut genesis = AuthorityBody {
+            schema_version: 1,
+            repo_id: ObjectId([0u8; 32]),
+            previous_authority: ObjectId([0u8; 32]),
+            version: 1,
+            created_micros: now,
+            members: vec![crate::authority::MemberEntry {
+                key: alice_pk,
+                handle: "alice".into(),
+                role: Role::Owner,
+                added_micros: now,
+                added_by: alice_pk,
+            }],
+            policy: vec![crate::authority::PolicyEntry {
+                key: "public_read".into(),
+                value: vec![0x01],
+            }],
+        };
+        genesis.normalize().unwrap();
+        genesis.assign_genesis_repo_id().unwrap();
+        let genesis_signed = sign_authority(&genesis, alice).unwrap();
+        let genesis_id = ObjectId(*blake3::hash(&genesis_signed.serialize()).as_bytes());
+
+        let mut store: HashMap<ObjectId, Vec<u8>> = HashMap::new();
+        let mut ids = vec![genesis_id];
+        store.insert(genesis_id, genesis_signed.serialize());
+
+        let mut prev_id = genesis_id;
+        let mut prev_body = genesis;
+        for v in 2..=length as u32 {
+            let mut body = AuthorityBody {
+                schema_version: prev_body.schema_version,
+                repo_id: prev_body.repo_id,
+                previous_authority: prev_id,
+                version: v,
+                created_micros: now + v as i64 * 1_000_000,
+                members: prev_body.members.clone(),
+                policy: prev_body.policy.clone(),
+            };
+            body.normalize().unwrap();
+            let signed = sign_authority(&body, alice).unwrap();
+            let id = ObjectId(*blake3::hash(&signed.serialize()).as_bytes());
+            store.insert(id, signed.serialize());
+            ids.push(id);
+            prev_id = id;
+            prev_body = body;
+        }
+        (MemorySource(store), ids)
+    }
+
+    /// Verifying a chain of length N from the tip touches every
+    /// authority object exactly once. A second verification of the same
+    /// tip — using the same `ChainVerifier` — must perform zero reads.
+    /// This is the user-visible win: O(N²) → O(N) total work for N
+    /// commits citing the same chain.
+    #[test]
+    fn chain_verifier_caches_walked_authorities() {
+        let alice = SecretKey::generate();
+        let (mem, ids) = build_chain(5, &alice);
+        let src = CountingSource::new(&mem);
+        let mut verifier = ChainVerifier::new();
+
+        let _ = verifier.verify_chain(&src, *ids.last().unwrap()).unwrap();
+        let first_pass_reads = src.reads();
+        assert!(
+            first_pass_reads >= 5,
+            "first verification must read every authority at least once; got {first_pass_reads}"
+        );
+        // Every walked authority should now sit in the cache pointing
+        // at the same shared genesis body.
+        assert_eq!(verifier.cache_size(), ids.len());
+
+        let before = src.reads();
+        let _ = verifier.verify_chain(&src, *ids.last().unwrap()).unwrap();
+        assert_eq!(
+            src.reads(),
+            before,
+            "cached re-verification must perform zero reads"
+        );
+    }
+
+    /// Verifying an *ancestor* after the tip must also be a cache hit —
+    /// this is the practical case during a sync where one tip points at
+    /// `A_n` and a second tip points at `A_{n-1}`.
+    #[test]
+    fn chain_verifier_serves_ancestors_from_cache() {
+        let alice = SecretKey::generate();
+        let (mem, ids) = build_chain(4, &alice);
+        let src = CountingSource::new(&mem);
+        let mut verifier = ChainVerifier::new();
+        let _ = verifier.verify_chain(&src, *ids.last().unwrap()).unwrap();
+        let after_tip = src.reads();
+        // Ask for A_2 — should be a hit.
+        let _ = verifier.verify_chain(&src, ids[1]).unwrap();
+        assert_eq!(
+            src.reads(),
+            after_tip,
+            "ancestor verification must hit cache, not re-walk"
+        );
+    }
+
+    /// A failing verification must NOT pollute the cache. The test
+    /// rebuilds a chain, corrupts the bytes of the most recent
+    /// authority, and checks that (a) verification fails, (b) the
+    /// cache stays empty, and (c) a follow-up valid lookup still walks
+    /// the chain rather than serving a phantom hit.
+    #[test]
+    fn chain_verifier_does_not_cache_on_failure() {
+        let alice = SecretKey::generate();
+        let (mut mem, ids) = build_chain(3, &alice);
+        let tip_id = *ids.last().unwrap();
+        // Mutate the byte at offset 12 of the tip's signed bytes — that
+        // sits inside the authority body, so the recorded BLAKE3 hash
+        // will no longer match the id that names it. The verifier reads
+        // by id and re-hashes the bytes; mismatched bytes propagate as
+        // a `repo_id derivation invalid` or a parse-time error depending
+        // on the offset, but in either case the chain walk fails.
+        let bytes = mem.0.get(&tip_id).unwrap().clone();
+        let mut bad = bytes.clone();
+        bad[12] ^= 0xFF;
+        mem.0.insert(tip_id, bad);
+
+        let src = CountingSource::new(&mem);
+        let mut verifier = ChainVerifier::new();
+        assert!(
+            verifier.verify_chain(&src, tip_id).is_err(),
+            "corrupted tip must fail verification"
+        );
+        assert_eq!(
+            verifier.cache_size(),
+            0,
+            "failed verification must leave the cache empty"
+        );
+    }
 }
diff --git a/crates/levcs-instance/src/lib.rs b/crates/levcs-instance/src/lib.rs
index 1007ac4..cf2a2e0 100644
--- a/crates/levcs-instance/src/lib.rs
+++ b/crates/levcs-instance/src/lib.rs
@@ -39,7 +39,7 @@ use levcs_identity::verify::{
 };
 use levcs_merge::engine::check_handler_allowed;
 use levcs_merge::record::MergeRecord;
-use levcs_protocol::auth::{verify_request, AuthRequest, DEFAULT_CLOCK_SKEW};
+use levcs_protocol::auth::{verify_request, AuthRequest, DEFAULT_CLOCK_SKEW, NONCE_TTL_SECS};
 use levcs_protocol::wire::{InfoResponse, InstanceInfo, RefList};
 use levcs_protocol::Pack;
 
@@ -149,24 +149,66 @@ impl AppState {
     }
 }
 
+// Replay protection for §5.3 request nonces (`NonceCache` below).
+//
+// `verify_request` already rejects timestamps outside ±`DEFAULT_CLOCK_SKEW`,
+// so a nonce only needs to be remembered while its parent timestamp is
+// still within the skew window — anything older is rejected for skew
+// before the cache is even consulted. We use `NONCE_TTL_SECS` (the
+// protocol-level constant) as the retention horizon, which is wider than
+// the skew window so that a small clock difference between client and
+// server can't open a replay window between the two checks.
+//
+// The earlier implementation was a `HashSet` that called `clear()` once
+// it grew past a count cap. That was a real replay vulnerability: an
+// attacker who captured a recent signed request could replay it the
+// instant the cache wiped, regardless of how long the original was
+// supposed to remain "seen." The TTL approach below is bounded in
+// memory by the rate of accepted requests times the TTL — at typical
+// federation load that's a few thousand entries, kilobytes of state.
+/// How many inserts to accept before sweeping expired entries. Eviction
+/// is O(len), so amortizing keeps the per-call cost O(1) average. Stale
+/// entries that sit in the map a little longer cost nothing — any replay
+/// of them would be rejected upstream by the skew check anyway.
+const NONCE_EVICT_BATCH: usize = 1024;
+
 #[derive(Default)]
 pub struct NonceCache {
-    /// Maps nonce → expiry epoch (in micros).
-    seen: HashSet<[u8; 16]>,
+    /// `nonce → request timestamp (micros since epoch)`. We index by
+    /// timestamp rather than insertion time so a delayed request whose
+    /// own clock is slightly behind ours can't sneak past TTL eviction.
+    seen: HashMap<[u8; 16], i64>,
+    inserts_since_evict: usize,
 }
 
 impl NonceCache {
+    /// Check whether `nonce` (carried with `request_ts_micros`) has been
+    /// seen, and if not, record it. `now_micros` is the verifier's notion
+    /// of the current time, used to evict stale entries periodically.
+    /// Returns `true` if the nonce was *new* (request should proceed),
+    /// `false` if it was a replay.
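+    ///
+    /// Wiring sketch (it mirrors `verify_request_against` below; the
+    /// error plumbing is illustrative):
+    ///
+    /// ```ignore
+    /// let auth = verify_request(&req, key, ts, nonce, sig, now, DEFAULT_CLOCK_SKEW)?;
+    /// if !cache.check_and_insert(auth.nonce, auth.timestamp_micros, now) {
+    ///     return Err(err(StatusCode::UNAUTHORIZED, "replayed nonce"));
+    /// }
+    /// ```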
+    pub fn check_and_insert(
+        &mut self,
+        nonce: [u8; 16],
+        request_ts_micros: i64,
+        now_micros: i64,
+    ) -> bool {
-    pub fn check_and_insert(&mut self, nonce: [u8; 16]) -> bool {
-        if self.seen.contains(&nonce) {
-            false
-        } else {
-            self.seen.insert(nonce);
-            // Cap memory by clearing periodically.
-            if self.seen.len() > 100_000 {
-                self.seen.clear();
-            }
-            true
+        self.inserts_since_evict += 1;
+        if self.inserts_since_evict >= NONCE_EVICT_BATCH {
+            let cutoff = now_micros - NONCE_TTL_SECS * 1_000_000;
+            self.seen.retain(|_, ts| *ts >= cutoff);
+            self.inserts_since_evict = 0;
         }
+        if self.seen.contains_key(&nonce) {
+            return false;
+        }
+        self.seen.insert(nonce, request_ts_micros);
+        true
+    }
+
+    #[cfg(test)]
+    pub fn len(&self) -> usize {
+        self.seen.len()
     }
 }
 
@@ -201,6 +243,18 @@ struct ApiError(StatusCode, String);
 
 impl IntoResponse for ApiError {
     fn into_response(self) -> Response {
+        // Surface every error response in the server log before sending it
+        // to the client. Without this, 5xx and auth failures would vanish
+        // — the client sees the body but nothing reaches the operator.
+        // 5xx is a server-side bug worth `error!`; 4xx is the caller's
+        // problem (bad signature, malformed pack, conflict) and lands at
+        // `warn!` so it's still grep-able but doesn't blow up alerts.
+        let is_5xx = self.0.is_server_error();
+        if is_5xx {
+            tracing::error!(status = %self.0, error = %self.1, "request failed");
+        } else {
+            tracing::warn!(status = %self.0, error = %self.1, "request rejected");
+        }
         (self.0, self.1).into_response()
     }
 }
@@ -444,7 +498,7 @@ fn verify_request_against(
     let auth = verify_request(&req, key, ts, nonce, sig, now, DEFAULT_CLOCK_SKEW)
         .map_err(|e| err(StatusCode::UNAUTHORIZED, e.to_string()))?;
     let mut cache = s.nonce_cache.lock().unwrap();
-    if !cache.check_and_insert(auth.nonce) {
+    if !cache.check_and_insert(auth.nonce, auth.timestamp_micros, now) {
         return Err(err(StatusCode::UNAUTHORIZED, "replayed nonce"));
     }
     Ok(AuthCheck { key: auth.key })
@@ -875,3 +929,95 @@ fn find_merge_record(
 // Allow `verify_authority_chain` to use ObjectStore directly.
 #[allow(dead_code)]
 fn _vs(_: &dyn VerifySource) {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn micros_from_secs(s: i64) -> i64 {
+        s * 1_000_000
+    }
+
+    /// Re-inserting the same nonce within the TTL window must be rejected.
+    /// This is the core anti-replay invariant; before the TTL rewrite the
+    /// cache also satisfied this property, so a green test here is the
+    /// floor, not the ceiling.
+    #[test]
+    fn nonce_replay_within_ttl_is_rejected() {
+        let mut cache = NonceCache::default();
+        let nonce = [0x42u8; 16];
+        let ts = micros_from_secs(1_700_000_000);
+        let now = ts + micros_from_secs(1);
+        assert!(cache.check_and_insert(nonce, ts, now));
+        // Same nonce, slightly later "now": still within TTL, must reject.
+        assert!(!cache.check_and_insert(nonce, ts, now + micros_from_secs(60)));
+    }
+
+    /// Once a nonce ages past `NONCE_TTL_SECS` it must be evicted from
+    /// the cache; this release of old entries is precisely what bounds
+    /// memory growth. (`verify_request` will reject the timestamp for
+    /// skew long before the cache ever sees a stale request again, so
+    /// re-accepting the nonce bytes is safe.)
+    ///
+    /// We drive `NONCE_EVICT_BATCH` distinct inserts at a fresh timestamp
+    /// to trigger one full eviction pass, then assert the original
+    /// (now-stale) entry has been swept.
+    #[test]
+    fn nonce_evicted_after_ttl_expires() {
+        let mut cache = NonceCache::default();
+        let stale = [0x42u8; 16];
+        let stale_ts = micros_from_secs(1_700_000_000);
+        assert!(cache.check_and_insert(stale, stale_ts, stale_ts));
+        // Fast-forward "now" past the TTL window and force an eviction
+        // sweep by inserting a batch of fresh nonces.
+        let later = stale_ts + micros_from_secs(NONCE_TTL_SECS + 1);
+        for i in 0..(NONCE_EVICT_BATCH as u32) {
+            let mut n = [0u8; 16];
+            n[..4].copy_from_slice(&i.to_le_bytes());
+            n[15] = 0xFF; // disambiguate from `stale`
+            assert!(cache.check_and_insert(n, later, later));
+        }
+        // The stale entry is gone; same-nonce-bytes with a fresh
+        // timestamp are allowed.
+        assert!(cache.check_and_insert(stale, later, later));
+    }
+
+    /// Regression test for the original CVE-shaped bug: the previous
+    /// implementation called `seen.clear()` once it grew past 100k
+    /// entries, which dropped every recently-seen nonce in one step and
+    /// allowed any captured request still within the 5-minute clock-skew
+    /// window to be replayed.
+    ///
+    /// Here we (a) drive the cache through several eviction passes with
+    /// junk-but-fresh nonces, then (b) try to replay a still-fresh nonce
+    /// inserted at the start. With time-bounded eviction the replay
+    /// must be rejected, because the original entry's timestamp is still
+    /// inside the TTL window. With the old count-bounded `clear()`, the
+    /// replay would have been accepted and this test would fail.
+    /// The flood size is intentionally a small multiple of
+    /// `NONCE_EVICT_BATCH` — the property doesn't depend on the exact
+    /// count, just on triggering the eviction path.
+    #[test]
+    fn nonce_cache_does_not_drop_fresh_entries_under_load() {
+        let mut cache = NonceCache::default();
+        let base_ts = micros_from_secs(1_700_000_000);
+        let mut victim = [0u8; 16];
+        victim[..8].copy_from_slice(&u64::MAX.to_le_bytes());
+        // Insert the "captured" request first.
+        assert!(cache.check_and_insert(victim, base_ts, base_ts));
+        // Flood with NONCE_EVICT_BATCH * 4 fresh-but-distinct nonces, all
+        // dated within the same TTL window so eviction can't help us.
+        let flood: u32 = (NONCE_EVICT_BATCH as u32) * 4;
+        for i in 0..flood {
+            let mut n = [0u8; 16];
+            n[..4].copy_from_slice(&i.to_le_bytes());
+            let now = base_ts + (i as i64) * 1_000;
+            assert!(cache.check_and_insert(n, now, now));
+        }
+        let replay_now = base_ts + micros_from_secs(60);
+        assert!(
+            !cache.check_and_insert(victim, base_ts, replay_now),
+            "replay of fresh nonce must be rejected even when cache is large"
+        );
+    }
+}
diff --git a/crates/levcs-instance/src/mirror.rs b/crates/levcs-instance/src/mirror.rs
index 8396964..c3c44cd 100644
--- a/crates/levcs-instance/src/mirror.rs
+++ b/crates/levcs-instance/src/mirror.rs
@@ -17,7 +17,7 @@ use std::time::Duration;
 
 use levcs_client::{Client, ClientError};
 use levcs_core::{ObjectId, ObjectStore, Refs, Repository};
-use levcs_identity::verify::{verify_authority_chain, verify_commit, verify_release};
+use levcs_identity::verify::ChainVerifier;
 use thiserror::Error;
 
 use crate::{InstanceConfig, MirrorConfig};
@@ -122,22 +122,25 @@ pub fn sync_mirror(
         store.write_raw(&ent.bytes)?;
     }
 
+    // Share one chain-verification cache across every per-tip check below.
+    // Without this, each tip independently walks its authority chain back
+    // to genesis — repeating identical work for every tip that cites the
+    // same authority. With the cache the second tip onwards is O(1).
+    let mut verifier = ChainVerifier::new();
+
     // Verify the authority chain on the announced current authority.
-    // Every commit / release we're about to advance to must chain back to
-    // a member that is rooted in `genesis_authority` — the chain walk
-    // checks that, so doing it once here covers all the per-tip checks
-    // below. (verify_commit re-walks the chain internally; that is
-    // redundant but cheap and keeps the per-tip checks self-contained.)
+    // This populates the cache with the entire chain so the per-tip
+    // checks below get cache hits.
     if !info.current_authority.is_empty() {
         let cur_auth = parse_hash(&info.current_authority)?;
-        verify_authority_chain(&store, cur_auth)?;
+        verifier.verify_chain(&store, cur_auth)?;
     }
 
     // Per-branch verification — fully checks signature, author membership,
     // and authority chain. If verification fails on any tip we abort
     // before touching local refs, so a bad source can never poison us.
     for (name, id) in &want_branches {
-        verify_commit(&store, *id, Some(&format!("refs/branches/{name}")))?;
+        verifier.verify_commit(&store, *id, Some(&format!("refs/branches/{name}")))?;
     }
     // Per-release verification: check the signed object itself and its
     // authority. The release object's full schema check (predecessor /
     // the signing key is actually a member of the chain rooted in our
     // local genesis.
     for (_, id) in &want_releases {
-        verify_release(&store, *id)?;
+        verifier.verify_release(&store, *id)?;
     }
 
     // All checks passed — advance local refs. We do branches first, then
diff --git a/crates/levcs-protocol/src/pack.rs b/crates/levcs-protocol/src/pack.rs
index af5cfde..d45255a 100644
--- a/crates/levcs-protocol/src/pack.rs
+++ b/crates/levcs-protocol/src/pack.rs
@@ -52,6 +52,19 @@ pub const COMPRESSION_THRESHOLD: usize = 256;
 /// 3 is libzstd's default — a sensible balance of throughput and ratio.
 pub const COMPRESSION_LEVEL: i32 = 3;
 
+/// Default ceiling on a single object's *uncompressed* size when decoding
+/// a pack. The recorded `size` field is read straight off the wire and is
+/// then used as the destination capacity for decompression — a
+/// hostile peer can declare `size = 1 TiB` against a tiny zstd frame and
+/// trigger a multi-gigabyte allocation before any data has been
+/// validated. Capping `size` at decode time short-circuits that.
+///
+/// 256 MiB is generous for normal repository content (source files, even
+/// large binaries) while remaining well below practical RAM limits on a
+/// modest VPS. Callers that genuinely need to move larger blobs should
+/// use `Pack::decode_prefix_with_limit` and pick their own ceiling.
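+///
+/// Illustrative ceiling choices (`bytes` is an assumed wire buffer):
+///
+/// ```ignore
+/// // Default 256 MiB ceiling:
+/// let (pack, consumed) = Pack::decode_prefix(bytes)?;
+/// // Caller-chosen 1 GiB ceiling for oversized-blob deployments:
+/// let (pack, consumed) = Pack::decode_prefix_with_limit(bytes, 1 << 30)?;
+/// ```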
+pub const DEFAULT_MAX_OBJECT_BYTES: usize = 256 * 1024 * 1024;
+
 #[derive(Debug, Error)]
 pub enum PackError {
     #[error("malformed pack: {0}")]
     Malformed(String),
@@ -171,11 +184,22 @@ impl Pack {
         Ok(pack)
     }
 
-    /// Decode a pack from the start of `bytes`. Returns the pack and the
+    /// Decode a pack from the start of `bytes` using the default per-object
+    /// size ceiling (`DEFAULT_MAX_OBJECT_BYTES`). Returns the pack and the
     /// number of bytes consumed; trailing bytes are not an error. This is
     /// used by the push wire format, which appends a manifest after the
     /// pack.
     pub fn decode_prefix(bytes: &[u8]) -> Result<(Self, usize), PackError> {
+        Self::decode_prefix_with_limit(bytes, DEFAULT_MAX_OBJECT_BYTES)
+    }
+
+    /// Like `decode_prefix`, but lets the caller pick the per-object size
+    /// ceiling. Any entry whose recorded `size` exceeds `max_object_bytes`
+    /// is rejected before any allocation or decompression takes place.
+    pub fn decode_prefix_with_limit(
+        bytes: &[u8],
+        max_object_bytes: usize,
+    ) -> Result<(Self, usize), PackError> {
         if bytes.len() < 16 {
             return Err(PackError::Malformed("header truncated".into()));
         }
@@ -206,8 +230,20 @@ impl Pack {
         }
         let object_type = bytes[p];
         p += 1;
-        let size = LittleEndian::read_u64(&bytes[p..p + 8]) as usize;
+        let size_u64 = LittleEndian::read_u64(&bytes[p..p + 8]);
         p += 8;
+        // Reject implausibly-large `size` declarations *before* we
+        // touch the data. zstd's `decompress(_, size)` allocates the
+        // declared size up front, so leaving this unbounded is a
+        // memory-exhaustion vector. Because `max_object_bytes` never
+        // exceeds `usize::MAX`, this check also rejects any `size` that
+        // would truncate in the cast below on 32-bit targets.
+        if size_u64 > max_object_bytes as u64 {
+            return Err(PackError::Malformed(format!(
+                "entry size {size_u64} exceeds limit {max_object_bytes}"
+            )));
+        }
+        let size = size_u64 as usize;
         let flags = bytes[p];
         p += 1;
         let unknown = flags & !(FLAG_ZSTD | FLAG_DELTA);
@@ -555,6 +591,50 @@ mod tests {
         }
     }
 
+    #[test]
+    fn pack_rejects_oversized_object_declaration() {
+        // Hand-craft a pack whose single entry declares an uncompressed
+        // size of 1 TiB. The decoder must reject this before allocating
+        // anything, regardless of how much data actually follows on the
+        // wire — a hostile peer can pair this with a tiny zstd frame to
+        // trigger a multi-gigabyte allocation in `zstd::bulk::decompress`.
+        let mut bytes = Vec::new();
+        bytes.extend_from_slice(&PACK_MAGIC);
+        bytes.extend_from_slice(&PACK_VERSION.to_le_bytes());
+        bytes.extend_from_slice(&1u64.to_le_bytes());
+        bytes.push(1); // type
+        bytes.extend_from_slice(&(1u64 << 40).to_le_bytes()); // 1 TiB
+        bytes.push(0); // flags: raw
+        // (no body needed — the size check should fire before we look)
+        let err = Pack::decode(&bytes).unwrap_err();
+        match err {
+            PackError::Malformed(s) => assert!(
+                s.contains("exceeds limit"),
+                "error must mention size limit: {s}"
+            ),
+        }
+    }
+
+    #[test]
+    fn pack_decode_with_limit_admits_objects_under_caller_ceiling() {
+        // The custom-limit decoder should accept any entry up to its
+        // configured ceiling, even when smaller than the default. Build
+        // a 1 KiB raw entry, then decode with a 4 KiB limit.
+        let payload = vec![0xABu8; 1024];
+        let mut pk = Pack::new();
+        pk.push(1, payload.clone());
+        let encoded = pk.encode();
+        let (pk2, _) = Pack::decode_prefix_with_limit(&encoded, 4096).unwrap();
+        assert_eq!(pk2.entries.len(), 1);
+        assert_eq!(pk2.entries[0].bytes, payload);
+
+        // Same encoded pack, decoded with a 512-byte limit, must reject.
+        let err = Pack::decode_prefix_with_limit(&encoded, 512).unwrap_err();
+        match err {
+            PackError::Malformed(s) => assert!(s.contains("exceeds limit")),
+        }
+    }
+
     #[test]
     fn pack_rejects_delta_with_truncated_base_hash() {
         let mut bytes = Vec::new();