46155bbe91
Ship the new simulation & verification stack under simulation/:
- mmio_regions.py — address → region classifier (DDRCTL, DDRPHY,
OTP, SRAM, …). Shared by every other tool so trace output is
scannable without memorising the memory map.
- sim_tripwire.py — Bin-style per-access capture. Records
(seq, insn_tick, pc, addr, size, rw, val, region, fn_name) per
MMIO access. PCResolver bisects the vendor funs table parsed
from ddr_conservative_asm.s.
- tripwire_diff.py — PC-bucketed difflib.SequenceMatcher diff of
two tripwire CSVs. Buckets by fn_name so bitflip-induced control
flow divergence doesn't cascade noise.
- training_sim.py — DDR training simulator with --mode pass and
--mode bitflip (flip first N reads per training status, exercise
retry paths). BITFLIP_ONLY env var narrows to a single addr for
the sweep.
- bitflip_sweep.py — Flip each of 23 training-status addresses
one-at-a-time and tabulate retry convergence. Surfaces which
function(s) react to a transient fault by writing different
downstream register values.
Plus:
- mmio_diff.py updated: region-tagged divergence output,
--show-regions histogram, --tripwire-out-{vendor,rebuilt} CSV
capture, --capture-stack-writes for stack-allocated buffer diffs.
- debug_probes/tp_slot_{probe,writes}.py — ad-hoc Unicorn probes
for chasing a single-slot divergence in an SRAM buffer. Kept as
reference examples of how to extend the tripwire toolchain.
The stack found 6 silicon-hostile bugs in the rebuilt blob that
mmio_diff's write-sequence gate was structurally blind to, including
three ld-unresolved-symbol NULL derefs (case-mismatched externs,
missing DATA_SYMS) and one C-early-return-skips-shared-tail bug
where vendor's asm fell through to the tail via `b` after a `ret`.
166 lines
5.6 KiB
Python
166 lines
5.6 KiB
Python
#!/usr/bin/env python3
|
|
"""sim_tripwire.py — Bin-style MMIO tracer adapted for Unicorn-sim.
|
|
|
|
Records every MMIO access (read + write) with enough context to diff
|
|
two simulator runs at sequence level. Modeled on the Bin project's
|
|
RAM-ring tripwire primitive, minus the DDR reservation — we're not on
|
|
silicon, we're in Python, so we just append to a list.
|
|
|
|
Record shape (per Janet, 2026-04-21):
|
|
(seq_idx, insn_tick, pc, addr, size, rw, val, region_tag, fn_name)
|
|
|
|
Usage from a simulator harness:
|
|
|
|
import sim_tripwire
|
|
cap = sim_tripwire.Capture(asm_path) # loads funs table
|
|
# inside UC_HOOK_MEM_READ:
|
|
cap.rd(pc, addr, size, val, insn_tick)
|
|
# inside UC_HOOK_MEM_WRITE:
|
|
cap.wr(pc, addr, size, val, insn_tick)
|
|
cap.emit_csv("/tmp/vendor-trace.csv")
|
|
|
|
The companion tool `tripwire_diff.py` reads two CSVs and does a
|
|
PC-bucketed diff (group by fn_name, diff per bucket with difflib).
|
|
"""
|
|
import bisect
|
|
import csv
|
|
import os
|
|
import re
|
|
|
|
try:
|
|
from mmio_regions import classify as _classify
|
|
except ImportError:
|
|
def _classify(addr): return "?"
|
|
|
|
|
|
# Default location of the vendor disassembly that carries the funs
|
|
# table. Defaults to ../ddr_conservative_asm.s relative to this file
|
|
# (repo layout); override via env var or constructor arg.
|
|
DEFAULT_ASM = os.environ.get(
|
|
"RK_DDR_ASM",
|
|
os.path.join(os.path.dirname(os.path.abspath(__file__)),
|
|
"..", "ddr_conservative_asm.s"))
|
|
BLOB_BASE = 0xFF001000 # where the TPL blob lives in SRAM
|
|
|
|
|
|
def parse_fun_table(asm_path):
|
|
"""Parse `// ============ FUN_<hex> @ <offset> ============` headers.
|
|
|
|
Returns list of (abs_addr, fun_name) sorted by abs_addr so we can
|
|
do O(log N) PC → nearest-below lookups.
|
|
"""
|
|
pat = re.compile(r'// ============ (FUN_[0-9a-fA-F]+) @ ([0-9a-fA-F]+) ============')
|
|
out = []
|
|
with open(asm_path) as f:
|
|
for ln in f:
|
|
m = pat.match(ln)
|
|
if not m:
|
|
continue
|
|
name = m.group(1)
|
|
off = int(m.group(2), 16)
|
|
out.append((BLOB_BASE + off, name))
|
|
out.sort()
|
|
return out
|
|
|
|
|
|
class PCResolver:
|
|
"""PC → nearest containing FUN_ name.
|
|
|
|
Uses the vendor funs table (parse_fun_table) as ground truth for
|
|
function entries. A PC resolves to the FUN_ whose entry is the
|
|
largest ≤ PC. Accuracy depends on the asm covering all functions
|
|
— missing entries produce attribution to the previous function.
|
|
"""
|
|
|
|
def __init__(self, asm_path=DEFAULT_ASM):
|
|
self.table = parse_fun_table(asm_path) if os.path.exists(asm_path) else []
|
|
self._keys = [addr for addr, _ in self.table]
|
|
self._names = [name for _, name in self.table]
|
|
|
|
def resolve(self, pc):
|
|
if not self.table:
|
|
return "?"
|
|
# Find rightmost entry with addr <= pc
|
|
i = bisect.bisect_right(self._keys, pc) - 1
|
|
if i < 0:
|
|
return "<pre-blob>"
|
|
return self._names[i]
|
|
|
|
|
|
class Capture:
|
|
"""Append-only tripwire capture.
|
|
|
|
Records are (seq_idx, insn_tick, pc, addr, size, rw, val, region, fn).
|
|
Keep this lean — writing this list is in the hot Unicorn callback
|
|
path.
|
|
"""
|
|
|
|
def __init__(self, asm_path=DEFAULT_ASM, resolve=True):
|
|
self.records = []
|
|
self._pcr = PCResolver(asm_path) if resolve else None
|
|
|
|
def _append(self, pc, addr, size, rw, val, insn_tick):
|
|
seq = len(self.records)
|
|
region = _classify(addr)
|
|
fn = self._pcr.resolve(pc) if self._pcr else "?"
|
|
self.records.append(
|
|
(seq, insn_tick, pc, addr, size, rw, val, region, fn))
|
|
|
|
def rd(self, pc, addr, size, val, insn_tick):
|
|
self._append(pc, addr, size, "rd", val, insn_tick)
|
|
|
|
def wr(self, pc, addr, size, val, insn_tick):
|
|
self._append(pc, addr, size, "wr", val, insn_tick)
|
|
|
|
def emit_csv(self, path):
|
|
with open(path, "w", newline="") as f:
|
|
w = csv.writer(f)
|
|
w.writerow(("seq", "tick", "pc", "addr", "size",
|
|
"rw", "val", "region", "fn"))
|
|
for seq, tick, pc, addr, size, rw, val, region, fn in self.records:
|
|
w.writerow((seq, tick, f"0x{pc:x}", f"0x{addr:x}",
|
|
size, rw, f"0x{val:x}", region, fn))
|
|
|
|
def summary(self):
|
|
"""Return (n_total, n_rd, n_wr, per_fn_counter, per_region_counter)."""
|
|
from collections import Counter
|
|
fn = Counter()
|
|
region = Counter()
|
|
n_rd = n_wr = 0
|
|
for _, _, _, _, _, rw, _, reg, fname in self.records:
|
|
(n_rd if rw == "rd" else n_wr) # no-op; tracked below
|
|
if rw == "rd": n_rd += 1
|
|
else: n_wr += 1
|
|
fn[fname] += 1
|
|
region[reg] += 1
|
|
return len(self.records), n_rd, n_wr, fn, region
|
|
|
|
|
|
def load_csv(path):
|
|
"""Read a CSV emitted by emit_csv. Returns list of dict records."""
|
|
out = []
|
|
with open(path, newline="") as f:
|
|
r = csv.DictReader(f)
|
|
for row in r:
|
|
row["seq"] = int(row["seq"])
|
|
row["tick"] = int(row["tick"])
|
|
row["pc"] = int(row["pc"], 16)
|
|
row["addr"] = int(row["addr"], 16)
|
|
row["size"] = int(row["size"])
|
|
row["val"] = int(row["val"], 16)
|
|
out.append(row)
|
|
return out
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Smoke: parse funs table and show first 5 entries
|
|
t = parse_fun_table(DEFAULT_ASM)
|
|
print(f"loaded {len(t)} fn entries from {DEFAULT_ASM}")
|
|
for addr, name in t[:5]:
|
|
print(f" 0x{addr:08x} {name}")
|
|
pcr = PCResolver()
|
|
# BLOB_BASE + offset of known functions
|
|
for off in (0x4, 0x40, 0x3c48, 0xfcc4, 0xde40, 0xf170):
|
|
pc = BLOB_BASE + off
|
|
print(f" resolve 0x{pc:x} (BLOB+0x{off:x}) -> {pcr.resolve(pc)}")
|