#!/usr/bin/env python3 """bitflip_sweep.py — flip each training-status address one-at-a-time and summarise how the rebuild's retry logic responds. For every training-status address (DDRPHY training + DDRCTL per-ch status), run training_sim twice: (a) --mode pass baseline (b) --mode bitflip with `is_training_status` restricted to just that one address Compare the two tripwire CSVs per run. Report: - how many records diverged - whether mmio writes still converge to the same final sequence - one-line summary: "retry fired? final state same? # write-value divergences?" Output a table row per address so you can scan for any address whose retry loop doesn't converge. """ import argparse import csv import os import subprocess import sys import tempfile BENCH = os.path.dirname(os.path.abspath(__file__)) TRAINING_TARGETS = [ ("DDRPHY:TR", 0xFE0C0000, 0x080, "MicroReset"), ("DDRPHY:TR", 0xFE0C0000, 0x090, "MicroContMux"), ("DDRPHY:TR", 0xFE0C0000, 0x0B4, "TrainingDone(b18)"), ("DDRPHY:TR", 0xFE0C0000, 0x3CC, "TrainingStep(b0)"), ("DDRPHY:TR", 0xFE0C0000, 0x514, "TrainingDone"), ("DDRPHY:TR", 0xFE0C0000, 0x684, "CalBusy"), ("DDRPHY:TR", 0xFE0C0000, 0xA24, "DfiStatus"), ] # Per-channel DDRCTL status addresses: expand for all 4 channels. DDRCTL_CHANNEL_BASES = (0xF7000000, 0xF8000000, 0xF9000000, 0xFA000000) DDRCTL_STATUS_OFFSETS = [ ("DDRCTL:SW", 0x10014, "STAT"), ("DDRCTL:MR", 0x10090, "MRSTAT"), ("DDRCTL:SW", 0x10C84, "DFISTAT"), ("DDRCTL:SW", 0x10514, "SWSTAT"), ] for ch_i, base in enumerate(DDRCTL_CHANNEL_BASES): for region, off, name in DDRCTL_STATUS_OFFSETS: TRAINING_TARGETS.append((region, base, off, f"{name} ch{ch_i}")) def run_sim(blob_path, flip_offset, flip_mask, out_csv, max_insn=500_000): """Run training_sim with a single-address bitflip. Uses env var BITFLIP_ONLY to narrow the is_training_status predicate in the simulator. If offset is None, runs plain pass-mode.""" env = os.environ.copy() if flip_offset is not None: env["BITFLIP_ONLY"] = f"{flip_offset:#x}" mode_args = ["--mode", "bitflip", "--flip-count", "1", "--flip-mask", f"{flip_mask:#x}"] else: env.pop("BITFLIP_ONLY", None) mode_args = ["--mode", "pass"] cmd = ["python3", os.path.join(BENCH, "training_sim.py"), blob_path, *mode_args, "--max-insn", str(max_insn), "--tripwire-out", out_csv] r = subprocess.run(cmd, capture_output=True, text=True, env=env) return r.returncode == 0 def load_csv(path): out = [] with open(path, newline="") as f: r = csv.DictReader(f) for row in r: row["seq"] = int(row["seq"]) row["tick"] = int(row["tick"]) row["pc"] = int(row["pc"], 16) row["addr"] = int(row["addr"], 16) row["val"] = int(row["val"], 16) out.append(row) return out def summarise(pass_csv, flip_csv, addr): """Diff by (addr, rw, val, size) key inside per-fn buckets, not by index — if retry causes a shift, index-by-index gets noisy. """ from collections import defaultdict p = load_csv(pass_csv) f = load_csv(flip_csv) def bucket(records): b = defaultdict(list) for r in records: b[r["fn"]].append((r["addr"], r["rw"], r["val"], r["size"], r)) return b pb = bucket(p) fb = bucket(f) all_fns = set(pb) | set(fb) wr_div_rows = [] # (fn, pass_row_or_None, flip_row_or_None) rd_div_count = 0 for fn in all_fns: pkeys = [(a, rw, v, s) for (a, rw, v, s, _) in pb.get(fn, [])] fkeys = [(a, rw, v, s) for (a, rw, v, s, _) in fb.get(fn, [])] if pkeys == fkeys: continue # SequenceMatcher alignment per-bucket import difflib sm = difflib.SequenceMatcher(a=pkeys, b=fkeys, autojunk=False) for tag, i1, i2, j1, j2 in sm.get_opcodes(): if tag == "equal": continue # Characterise this edit as "read delta" vs "write delta" p_rows = pb.get(fn, [])[i1:i2] f_rows = fb.get(fn, [])[j1:j2] for (_, _, _, _, row) in p_rows: if row["rw"] == "wr": wr_div_rows.append((fn, row, None)) else: rd_div_count += 1 for (_, _, _, _, row) in f_rows: if row["rw"] == "wr": wr_div_rows.append((fn, None, row)) else: rd_div_count += 1 return { "total_records_pass": len(p), "total_records_flip": len(f), "read_divergences": rd_div_count, "write_divergence_rows": wr_div_rows, } def main(): ap = argparse.ArgumentParser() ap.add_argument("blob", help="path to the DDR TPL blob to drive") ap.add_argument("--out-dir", default="/tmp/bitflip-sweep") args = ap.parse_args() os.makedirs(args.out_dir, exist_ok=True) # Baseline pass-mode run baseline = os.path.join(args.out_dir, "pass.csv") print(f"# baseline pass run -> {baseline}") ok = run_sim(args.blob, None, 0, baseline) if not ok: print("baseline run failed", file=sys.stderr); return 1 header = (f"{'address':<12} {'region':<11} {'name':<18} " f"{'rd_div':>6} writes_diverged_in") print() print(header) print("-" * len(header)) all_wr_details = [] for region, base, off, name in TRAINING_TARGETS: addr = base + off tag = f"0x{addr:08x}" flip_csv = os.path.join(args.out_dir, f"flip_{addr:08x}.csv") ok = run_sim(args.blob, addr, 0xFFFFFFFF, flip_csv) if not ok: print(f"{tag} {region} {name} -- sim failed") continue s = summarise(baseline, flip_csv, addr) wr_fns = sorted({row[0] for row in s["write_divergence_rows"]}) preview = ",".join(wr_fns[:4]) if len(wr_fns) > 4: preview += f" +{len(wr_fns)-4}" print(f"{tag:<12} {region:<11} {name:<18} " f"{s['read_divergences']:>6} {preview}") for fn, pr, fr in s["write_divergence_rows"]: all_wr_details.append((addr, name, fn, pr, fr)) if all_wr_details: print("\n## Write-divergence details (retry path changed register values)") for addr, name, fn, pr, fr in all_wr_details[:60]: pv = f"pass: addr=0x{pr['addr']:x} val=0x{pr['val']:x}" if pr else "pass: (missing)" fv = f"flip: addr=0x{fr['addr']:x} val=0x{fr['val']:x}" if fr else "flip: (missing)" print(f" [{name:<14}] {fn:<22} {pv} | {fv}") if len(all_wr_details) > 60: print(f" ... +{len(all_wr_details)-60} more") return 0 if __name__ == "__main__": sys.exit(main())