#!/usr/bin/env python3
"""sim_tripwire.py — Bin-style MMIO tracer adapted for Unicorn-sim.

Records every MMIO access (read + write) with enough context to diff
two simulator runs at sequence level. Modeled on the Bin project's
RAM-ring tripwire primitive, minus the DDR reservation — we're not on
silicon, we're in Python, so we just append to a list.

Record shape (per Janet, 2026-04-21):
    (seq_idx, insn_tick, pc, addr, size, rw, val, region_tag, fn_name)

Usage from a simulator harness:

    import sim_tripwire
    cap = sim_tripwire.Capture(asm_path)   # loads funs table
    # inside UC_HOOK_MEM_READ:
    cap.rd(pc, addr, size, val, insn_tick)
    # inside UC_HOOK_MEM_WRITE:
    cap.wr(pc, addr, size, val, insn_tick)
    cap.emit_csv("/tmp/vendor-trace.csv")

The companion tool `tripwire_diff.py` reads two CSVs and does a
PC-bucketed diff (group by fn_name, diff per bucket with difflib).
"""

import bisect
import csv
import os
import re
from collections import Counter

try:
    from mmio_regions import classify as _classify
except ImportError:
    def _classify(addr):
        """Fallback region classifier used when mmio_regions is absent."""
        return "?"


# Default location of the vendor disassembly that carries the funs
# table. Defaults to ../ddr_conservative_asm.s relative to this file
# (repo layout); override via env var or constructor arg.
DEFAULT_ASM = os.environ.get(
    "RK_DDR_ASM",
    os.path.join(os.path.dirname(os.path.abspath(__file__)),
                 "..", "ddr_conservative_asm.s"))

BLOB_BASE = 0xFF001000  # where the TPL blob lives in SRAM

# Function-header marker in the disassembly: a FUN_<hex> name followed by
# its hex offset. Compiled once at import so repeated parses reuse it.
_FUN_HEADER_RE = re.compile(
    r'// ============ (FUN_[0-9a-fA-F]+) @ ([0-9a-fA-F]+) ============')

# Column names written by Capture.emit_csv and read back by load_csv.
_CSV_HEADER = ("seq", "tick", "pc", "addr", "size", "rw", "val",
               "region", "fn")


def parse_fun_table(asm_path):
    """Parse `// ============ FUN_<hex> @ <offset> ============` headers.

    Only lines that start with the marker (column 0) are recognised.
    The hex offset is added to BLOB_BASE to form an absolute address.

    Args:
        asm_path: path to the disassembly text file.

    Returns:
        List of (abs_addr, fun_name) tuples sorted by abs_addr, so
        callers can do O(log N) PC -> nearest-below lookups.

    Raises:
        OSError: if asm_path cannot be opened.
    """
    entries = []
    # errors="replace": a stray non-UTF-8 byte elsewhere in the listing
    # must not abort the parse; the header lines themselves are ASCII.
    with open(asm_path, encoding="utf-8", errors="replace") as f:
        for line in f:
            m = _FUN_HEADER_RE.match(line)
            if m:
                entries.append((BLOB_BASE + int(m.group(2), 16), m.group(1)))
    entries.sort()
    return entries


class PCResolver:
    """PC -> nearest containing FUN_ name.

    Uses the vendor funs table (parse_fun_table) as ground truth for
    function entries. A PC resolves to the FUN_ whose entry is the
    largest <= PC. Accuracy depends on the asm covering all functions —
    missing entries produce attribution to the previous function.
    """

    def __init__(self, asm_path=DEFAULT_ASM):
        """Load the funs table from asm_path; empty table if it is missing."""
        self.table = parse_fun_table(asm_path) if os.path.exists(asm_path) else []
        # Parallel lists so bisect can search plain addresses.
        self._keys = [addr for addr, _ in self.table]
        self._names = [name for _, name in self.table]

    def resolve(self, pc):
        """Return the function name attributed to pc.

        Returns:
            "?" when no table was loaded, "" when pc lies below the
            first known function entry, otherwise the FUN_ name whose
            entry address is the largest one <= pc.
        """
        if not self.table:
            return "?"
        # Index of the rightmost entry with addr <= pc.
        i = bisect.bisect_right(self._keys, pc) - 1
        if i < 0:
            return ""
        return self._names[i]


def _hex_value(val, size):
    """Format an access value as 0x-prefixed hex that load_csv can parse.

    A negative val would otherwise render as "0x-..." and fail to
    round-trip through int(text, 16). Negative values are therefore
    wrapped to the access width (size bytes; 64 bits if size is not
    positive).
    NOTE(review): presumably negatives arise when the emulator hands
    the value over as a signed 64-bit integer — confirm against the
    harness.
    """
    if val < 0:
        bits = 8 * size if size > 0 else 64
        val &= (1 << bits) - 1
    return f"0x{val:x}"


class Capture:
    """Append-only tripwire capture.

    Records are (seq_idx, insn_tick, pc, addr, size, rw, val, region, fn).
    Keep this lean — writing this list is in the hot Unicorn callback path.
    """

    def __init__(self, asm_path=DEFAULT_ASM, resolve=True):
        """Create an empty capture.

        Args:
            asm_path: disassembly used to build the PC -> function table.
            resolve: when False, skip loading the table; every record
                then carries "?" as its function name.
        """
        self.records = []
        self._pcr = PCResolver(asm_path) if resolve else None

    def _append(self, pc, addr, size, rw, val, insn_tick):
        """Append one access record; seq is its index in self.records."""
        fn = self._pcr.resolve(pc) if self._pcr else "?"
        self.records.append(
            (len(self.records), insn_tick, pc, addr, size, rw, val,
             _classify(addr), fn))

    def rd(self, pc, addr, size, val, insn_tick):
        """Record a read access."""
        self._append(pc, addr, size, "rd", val, insn_tick)

    def wr(self, pc, addr, size, val, insn_tick):
        """Record a write access."""
        self._append(pc, addr, size, "wr", val, insn_tick)

    def emit_csv(self, path):
        """Write all records to path as CSV with a header row.

        pc, addr and val are written as 0x-prefixed hex; the remaining
        columns are written as-is.
        """
        with open(path, "w", newline="") as f:
            w = csv.writer(f)
            w.writerow(_CSV_HEADER)
            for seq, tick, pc, addr, size, rw, val, region, fn in self.records:
                w.writerow((seq, tick, f"0x{pc:x}", f"0x{addr:x}", size, rw,
                            _hex_value(val, size), region, fn))

    def summary(self):
        """Return (n_total, n_rd, n_wr, per_fn_counter, per_region_counter)."""
        per_fn = Counter()
        per_region = Counter()
        n_rd = 0
        for _, _, _, _, _, rw, _, region, fn in self.records:
            if rw == "rd":
                n_rd += 1
            per_fn[fn] += 1
            per_region[region] += 1
        n_total = len(self.records)
        # Every record that is not "rd" counts as a write.
        return n_total, n_rd, n_total - n_rd, per_fn, per_region


def load_csv(path):
    """Read a CSV emitted by emit_csv. Returns list of dict records.

    The seq, tick and size columns are parsed as decimal ints; pc, addr
    and val as hex ints. rw, region and fn stay as strings.
    """
    out = []
    with open(path, newline="") as f:
        for row in csv.DictReader(f):
            for key in ("seq", "tick", "size"):
                row[key] = int(row[key])
            for key in ("pc", "addr", "val"):
                row[key] = int(row[key], 16)
            out.append(row)
    return out


def _smoke():
    """Smoke check: load the funs table and resolve a few sample PCs."""
    # Going through PCResolver means a missing asm file yields an empty
    # table instead of an unhandled exception from open().
    pcr = PCResolver()
    t = pcr.table
    print(f"loaded {len(t)} fn entries from {DEFAULT_ASM}")
    for addr, name in t[:5]:
        print(f"  0x{addr:08x}  {name}")
    # BLOB_BASE + offset of known functions
    for off in (0x4, 0x40, 0x3c48, 0xfcc4, 0xde40, 0xf170):
        pc = BLOB_BASE + off
        print(f"  resolve 0x{pc:x} (BLOB+0x{off:x}) -> {pcr.resolve(pc)}")


if __name__ == "__main__":
    _smoke()