46155bbe91
Ship the new simulation & verification stack under simulation/:
- mmio_regions.py — address → region classifier (DDRCTL, DDRPHY,
OTP, SRAM, …). Shared by every other tool so trace output is
scannable without memorising the memory map.
- sim_tripwire.py — Bin-style per-access capture. Records
(seq, insn_tick, pc, addr, size, rw, val, region, fn_name) per
MMIO access. PCResolver bisects the vendor funs table parsed
from ddr_conservative_asm.s.
- tripwire_diff.py — PC-bucketed difflib.SequenceMatcher diff of
two tripwire CSVs. Buckets by fn_name so bitflip-induced control
flow divergence doesn't cascade noise.
- training_sim.py — DDR training simulator with --mode pass and
--mode bitflip (flip first N reads per training status, exercise
retry paths). BITFLIP_ONLY env var narrows to a single addr for
the sweep.
- bitflip_sweep.py — Flip each of 23 training-status addresses
one-at-a-time and tabulate retry convergence. Surfaces which
function(s) react to a transient fault by writing different
downstream register values.
Plus:
- mmio_diff.py updated: region-tagged divergence output,
--show-regions histogram, --tripwire-out-{vendor,rebuilt} CSV
capture, --capture-stack-writes for stack-allocated buffer diffs.
- debug_probes/tp_slot_{probe,writes}.py — ad-hoc Unicorn probes
for chasing a single-slot divergence in an SRAM buffer. Kept as
reference examples of how to extend the tripwire toolchain.
The stack found 6 silicon-hostile bugs in the rebuilt blob that
mmio_diff's write-sequence gate was structurally blind to, including
three ld-unresolved-symbol NULL derefs (case-mismatched externs,
missing DATA_SYMS) and one C-early-return-skips-shared-tail bug
where vendor's asm fell through to the tail via `b` after a `ret`.
190 lines
6.4 KiB
Python
190 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
"""tripwire_diff.py — PC-bucketed sequence diff of two tripwire CSVs.
|
|
|
|
Per Janet (2026-04-21): cross-index diff is destroyed the moment control
|
|
flow diverges (bitflip mode guarantees this). Group records by fn_name,
|
|
diff per bucket with difflib.SequenceMatcher. Long edit-distance buckets
|
|
get tagged SUSPECT and emit a raw side-by-side sub-sequence for human
|
|
triage — we do not try to auto-resolve.
|
|
|
|
Usage:
|
|
tripwire_diff.py vendor.csv rebuilt.csv [--suspect-threshold 0.9]
|
|
[--show-identical] [--limit-per-bucket N]
|
|
|
|
Key for each record inside a bucket (tunable): region + addr + rw + val + size.
|
|
PC is excluded because codegen reg-alloc can shift individual load/store
|
|
PCs within a function without changing behavior. `seq` and `tick` are
|
|
excluded because they drift with any upstream path difference.
|
|
"""
|
|
import argparse
|
|
import difflib
|
|
import sys
|
|
from collections import defaultdict
|
|
|
|
from sim_tripwire import load_csv
|
|
|
|
|
|
def bucket_key(rec):
    """Build the hashable comparison key for one record inside a fn_name bucket.

    The key is the tuple (region, addr, rw, val, size). Fields not listed
    here (pc, seq, ...) do not take part in the comparison, so records that
    differ only in those fields produce equal keys.
    """
    key_fields = ("region", "addr", "rw", "val", "size")
    return tuple(rec[field] for field in key_fields)
|
|
|
|
|
|
def bucket_by_fn(records):
    """Group records by their "fn" field, keeping input order within each group.

    Returns a defaultdict(list) mapping each "fn" value to the list of
    records that carry it.
    """
    grouped = defaultdict(list)
    for record in records:
        fn_name = record["fn"]
        grouped[fn_name].append(record)
    return grouped
|
|
|
|
|
|
def ratio(seq_a, seq_b):
    """Similarity of two sequences in [0, 1], skipping the full matcher when possible.

    Two empty sequences, or two equal ones, score 1.0; exactly one empty
    sequence scores 0.0. Otherwise quick_ratio() -- an upper bound on
    ratio() -- is tried first and returned as-is when it is below 0.5, so
    only reasonably similar pairs pay for the expensive ratio() call.
    """
    a_empty = not seq_a
    b_empty = not seq_b
    if a_empty or b_empty:
        return 1.0 if (a_empty and b_empty) else 0.0
    if seq_a == seq_b:
        return 1.0

    matcher = difflib.SequenceMatcher(a=seq_a, b=seq_b, autojunk=False)
    # The upper bound is good enough as an early reject for buckets that
    # share little or nothing.
    upper_bound = matcher.quick_ratio()
    return upper_bound if upper_bound < 0.5 else matcher.ratio()
|
|
|
|
|
|
def _sm_cache(va, vb):
    """Key both record lists and prepare one matcher the caller can reuse.

    Returns (keys_a, keys_b, matcher). matcher is None when the two key
    lists are identical (nothing to diff); otherwise it is a single
    SequenceMatcher over the key lists, so the caller can take both the
    ratio and the opcodes from the same object.
    """
    keys_a = list(map(bucket_key, va))
    keys_b = list(map(bucket_key, vb))
    matcher = None
    if keys_a != keys_b:
        matcher = difflib.SequenceMatcher(a=keys_a, b=keys_b, autojunk=False)
    return keys_a, keys_b, matcher
|
|
|
|
|
|
def fmt_rec(rec):
    """Render one record as a single report line.

    Columns, separated by single spaces: region (padded to 11 characters),
    rw, zero-padded hex addr, size, hex val, and the pc in parentheses.
    """
    columns = [
        f"{rec['region']:11s}",
        f"{rec['rw']}",
        f"0x{rec['addr']:08x}",
        f"sz={rec['size']}",
        f"val=0x{rec['val']:x}",
        f"(pc=0x{rec['pc']:x})",
    ]
    return " ".join(columns)
|
|
|
|
|
|
def diff_bucket(name, va, vb, limit, show_identical):
    """Diff one fn_name bucket and render its report.

    Args:
        name: bucket label (function name) printed in the header.
        va: vendor-side records of the bucket.
        vb: rebuilt-side records of the bucket.
        limit: maximum number of -/+ record lines to emit.
        show_identical: when true, an identical bucket still gets a one-line
            report instead of None.

    Returns:
        (status, report). status is "OK " for an identical bucket, otherwise
        the similarity ratio formatted to three decimals. report is the text
        to print, or None for an identical bucket that is not being shown.
    """
    ka = [bucket_key(r) for r in va]
    kb = [bucket_key(r) for r in vb]

    # Identical buckets are settled before any ratio/matcher work is done.
    if ka == kb:
        status = "OK "
        if not show_identical:
            return status, None
        return status, (f"{name:22s} OK {len(va)} records match "
                        "(showing on --show-identical)")

    status = f"{ratio(ka, kb):.3f} "
    lines = [f"{name:22s} {status} "
             f"vendor={len(va):5d} rebuilt={len(vb):5d}"]

    # Flatten every non-equal opcode into (marker, side, record) entries:
    # deletions (vendor side) first, then insertions (rebuilt side), in
    # opcode order -- the order in which they are reported.
    sm = difflib.SequenceMatcher(a=ka, b=kb, autojunk=False)
    edits = []
    for tag, i1, i2, j1, j2 in sm.get_opcodes():
        if tag == "equal":
            continue
        edits.extend(("-", "V", rec) for rec in va[i1:i2])
        edits.extend(("+", "R", rec) for rec in vb[j1:j2])

    cap = max(limit, 0)
    for marker, side, rec in edits[:cap]:
        lines.append(f" {marker} [{side}#{rec['seq']:5d}] {fmt_rec(rec)}")
    # Announce truncation only when edits were really left out; a bucket
    # with exactly `limit` edit lines is complete and must not claim
    # otherwise.
    if len(edits) > cap:
        lines.append(f" ... (truncated at {limit} per bucket)")
    return status, "\n".join(lines)
|
|
|
|
|
|
def _edit_lines(sm, va, rs, limit):
    """Return the -/+ report lines for sm's non-equal opcodes, capped at limit."""
    edits = []
    for op_tag, i1, i2, j1, j2 in sm.get_opcodes():
        if op_tag == "equal":
            continue
        edits.extend(("-", "V", rec) for rec in va[i1:i2])
        edits.extend(("+", "R", rec) for rec in rs[j1:j2])

    cap = max(limit, 0)
    lines = [f" {marker} [{side}#{rec['seq']:5d}] {fmt_rec(rec)}"
             for marker, side, rec in edits[:cap]]
    # Only claim truncation when edits were actually left out.
    if len(edits) > cap:
        lines.append(f" ... (truncated at {limit})")
    return lines


def main():
    """CLI entry point: diff two tripwire CSVs bucket-by-bucket and print a report.

    Records of both files are grouped by function name; each bucket is
    classified as OK (identical keys), minor-diff, or SUSPECT (similarity
    ratio below --suspect-threshold). SUSPECT buckets are printed first,
    least similar first, followed by the remaining reports.

    Returns:
        0 when no bucket is SUSPECT, 1 otherwise (used as the exit status).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("vendor")
    ap.add_argument("rebuilt")
    ap.add_argument("--suspect-threshold", type=float, default=0.9,
                    help="buckets with ratio below this get SUSPECT tag")
    ap.add_argument("--show-identical", action="store_true")
    ap.add_argument("--limit-per-bucket", type=int, default=20,
                    help="max insert/delete lines per bucket in the report")
    args = ap.parse_args()

    vrecs = load_csv(args.vendor)
    rrecs = load_csv(args.rebuilt)
    print(f"# vendor: {len(vrecs):6d} records ({args.vendor})")
    print(f"# rebuilt: {len(rrecs):6d} records ({args.rebuilt})")

    vb = bucket_by_fn(vrecs)
    rb = bucket_by_fn(rrecs)
    fns = sorted(set(vb) | set(rb))

    print(f"# buckets: {len(fns)} functions touched across either side")
    print()

    # quick_ratio() is only an upper bound on ratio(). Using it in place of
    # the exact value is safe for classification only while it is itself
    # below the suspect threshold (then ratio() is below it too). Capping
    # the shortcut at the threshold keeps a low --suspect-threshold from
    # letting a truly SUSPECT bucket through as minor-diff.
    shortcut_below = min(0.5, args.suspect_threshold)

    ok = diff = 0
    reports = []
    suspects = []
    for fn in fns:
        va = vb.get(fn, [])
        rs = rb.get(fn, [])
        _, _, sm = _sm_cache(va, rs)
        if sm is None:
            # Identical key sequences on both sides.
            ok += 1
            if args.show_identical:
                reports.append(f"{fn:22s} OK {len(va)} records")
            continue

        qr = sm.quick_ratio()
        r = qr if qr < shortcut_below else sm.ratio()

        lines = [f"{fn:22s} vendor={len(va):5d} rebuilt={len(rs):5d}"]
        lines.extend(_edit_lines(sm, va, rs, args.limit_per_bucket))
        rep = "\n".join(lines)

        if r < args.suspect_threshold:
            # Carry the tag and ratio on the report itself so each suspect
            # block is self-describing during triage.
            suspects.append((r, fn, f"[SUSPECT] r={r:.3f} " + rep))
        else:
            diff += 1
            reports.append(f"[ ] r={r:.3f} " + rep)

    susp = len(suspects)
    print(f"# OK: {ok} minor-diff: {diff} SUSPECT(<{args.suspect_threshold}): {susp}")
    print()

    if suspects:
        # Least similar first; fn breaks ties between equal ratios.
        suspects.sort(key=lambda entry: entry[:2])
        print(f"## SUSPECT BUCKETS ({len(suspects)}) — human triage required")
        print()
        for _, _, rep in suspects:
            print(rep)
            print()

    # `reports` only ever holds OK and minor-diff entries, so all of them
    # are printed; filtering on the text "SUSPECT" could only hide a
    # legitimate report whose content happens to contain that word.
    for rep in reports:
        print(rep)

    return 1 if suspects else 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the process exit status.
    exit_status = main()
    sys.exit(exit_status)
|