From 46155bbe9174be2e6bf9ace90e7b9c3d9c141e65 Mon Sep 17 00:00:00 2001 From: Markus Fritsche Date: Wed, 22 Apr 2026 05:55:28 +0200 Subject: [PATCH] simulation: tripwire + PC-bucketed diff + bitflip sweep MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ship the new simulation & verification stack under simulation/: - mmio_regions.py — address → region classifier (DDRCTL, DDRPHY, OTP, SRAM, …). Shared by every other tool so trace output is scannable without memorising the memory map. - sim_tripwire.py — Bin-style per-access capture. Records (seq, insn_tick, pc, addr, size, rw, val, region, fn_name) per MMIO access. PCResolver bisects the vendor funs table parsed from ddr_conservative_asm.s. - tripwire_diff.py — PC-bucketed difflib.SequenceMatcher diff of two tripwire CSVs. Buckets by fn_name so bitflip-induced control flow divergence doesn't cascade noise. - training_sim.py — DDR training simulator with --mode pass and --mode bitflip (flip first N reads per training status, exercise retry paths). BITFLIP_ONLY env var narrows to a single addr for the sweep. - bitflip_sweep.py — Flip each of 23 training-status addresses one-at-a-time and tabulate retry convergence. Surfaces which function(s) react to a transient fault by writing different downstream register values. Plus: - mmio_diff.py updated: region-tagged divergence output, --show-regions histogram, --tripwire-out-{vendor,rebuilt} CSV capture, --capture-stack-writes for stack-allocated buffer diffs. - debug_probes/tp_slot_{probe,writes}.py — ad-hoc Unicorn probes for chasing a single-slot divergence in an SRAM buffer. Kept as reference examples of how to extend the tripwire toolchain. 
The stack found 6 silicon-hostile bugs in the rebuilt blob that mmio_diff's write-sequence gate was structurally blind to, including three ld-unresolved-symbol NULL derefs (case-mismatched externs, missing DATA_SYMS) and one C-early-return-skips-shared-tail bug where vendor's asm fell through to the tail via `b` after a `ret`. --- README.md | 41 +++- debug_probes/tp_slot_probe.py | 152 ++++++++++++++ debug_probes/tp_slot_writes.py | 118 +++++++++++ mmio_diff.py | 279 ++++++++++++++++++++++++++ simulation/README.md | 197 +++++++++++++++++++ simulation/bitflip_sweep.py | 188 ++++++++++++++++++ simulation/mmio_regions.py | 121 ++++++++++++ simulation/sim_tripwire.py | 165 ++++++++++++++++ simulation/training_sim.py | 348 +++++++++++++++++++++++++++++++++ simulation/tripwire_diff.py | 189 ++++++++++++++++++ 10 files changed, 1796 insertions(+), 2 deletions(-) create mode 100644 debug_probes/tp_slot_probe.py create mode 100644 debug_probes/tp_slot_writes.py create mode 100644 mmio_diff.py create mode 100644 simulation/README.md create mode 100644 simulation/bitflip_sweep.py create mode 100644 simulation/mmio_regions.py create mode 100644 simulation/sim_tripwire.py create mode 100644 simulation/training_sim.py create mode 100644 simulation/tripwire_diff.py diff --git a/README.md b/README.md index c450759..f71470d 100644 --- a/README.md +++ b/README.md @@ -14,8 +14,27 @@ n## Prerequisites -Decompilation, analysis, and patching of the closed-source Rockchip RK3588 -DDR initialization binary blobs. +Decompilation, analysis, patching, and **pre-silicon simulation** of the +closed-source Rockchip RK3588 DDR initialization binary blobs. + +The project has three layers: +1. **Static RE** — Ghidra-exported decompiled C + disassembly + + annotated register map (`ddr_annotated.c`, `rk3588_ddr.h`). +2. **Patch + flash** — `patch_prod.py` rewrites specific poll loops + in the vendor blob to work around known hangs. Validated under + Unicorn via `blob_emu.py` before flash. +3. 
**Matching-decomp rebuild + simulation** — the goal is a buildable + working DDR blob (not bit-identical reproduction). Per-function C + ports are spliced into the vendor blob; `mmio_diff.py` gates + behavioral equivalence by MMIO write sequence. The `simulation/` + subdir adds read-side tripwire capture, PC-bucketed diff, and + per-address bitflip fault injection for retry-path validation. + +> "Markus' insistence on simulation before flashing paid off. Big +> time. Again." — 2026-04-21. Tripwire + PC-bucket diff caught three +> silent NULL-derefs hidden behind `mmio_diff=3173/3173` green. +> `ld --unresolved-symbols=ignore-all` had zeroed undefined +> DATA_SYMS externs; silicon would have bricked. ## Quick Start @@ -73,8 +92,26 @@ gcc -O2 -o ddr_emu ddr_emu2.c -lunicorn -lm | File | Description | |------|-------------| | `ddr_emu2.c` | Unicorn-based C emulator with MMIO stubs | +| `blob_emu.py` | Python Unicorn harness — runs a blob, stubs MMIO, captures UART TX | +| `mmio_diff.py` | Runs vendor + rebuilt, diffs MMIO write sequences; region-tagged output | | Ghidra project | On oppenheimer (CT131): `/opt/work/ghidra_project/` | +### Simulation & Verification Stack (`simulation/`) +| File | Description | +|------|-------------| +| `simulation/mmio_regions.py` | Address → region classifier (`DDRCTL`, `DDRPHY`, `OTP`, `SRAM`, …) | +| `simulation/sim_tripwire.py` | Bin-style per-access capture with PC → fn name resolver | +| `simulation/tripwire_diff.py` | PC-bucketed SequenceMatcher diff of two tripwire CSVs | +| `simulation/training_sim.py` | DDR-training simulator: `pass` and `bitflip-first-pass` modes | +| `simulation/bitflip_sweep.py` | Flip each training-status addr one-at-a-time, report retry convergence | +| `simulation/README.md` | Synopsis + usage for the above | + +### Debug Probes (`debug_probes/`) +| File | Description | +|------|-------------| +| `debug_probes/tp_slot_probe.py` | Snapshot the `tp` timing buffer at fn_5540's read site | +| 
`debug_probes/tp_slot_writes.py` | List every write to a specific `tp` slot, both vendor and rebuilt | + ### Ghidra Export Scripts | File | Description | |------|-------------| diff --git a/debug_probes/tp_slot_probe.py b/debug_probes/tp_slot_probe.py new file mode 100644 index 0000000..644afe3 --- /dev/null +++ b/debug_probes/tp_slot_probe.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +"""tp_slot_probe.py — snapshot tp[0x4f] and tp[0x55] at the exact PC +where fn_5540 reads them, and dump tp[base..base+0x2ac] for diff. + +Runs on vendor.bin and rebuilt.bin side-by-side. The PCs for the two +ldrs differ between vendor and rebuilt codegen, so we probe multiple +candidate PCs by looking for `ldr w, [x19, #0x154]` and +`ldr w, [x19, #0x13c]` equivalents. +""" +import argparse +import os +import sys +from unicorn import * +from unicorn.arm64_const import * + +sys.path.insert(0, os.path.join( + os.path.dirname(os.path.abspath(__file__)), '..')) +from mmio_diff import (SRAM_BASE, BLOB_BASE, STACK_BASE, RET_STUB, + MMIO, XREG, stub_value, reset_stub_state) + +# Search window: PCs between first fn_5540 MMIO write (match 194 = 0x6978) +# and the diverging write (0x69c8/0x6a74). Capture x19 + read the two +# slots as soon as we see an instruction pattern `ldr w?, [x19, #0x154]` +# or `ldr w?, [x19, #0x13c]` (encoding 0xb94_154.. / 0xb94_13c..). +# Simpler: hook every PC in fn_5540 [0x5540..0x6040) and, on each, if +# the insn looks like such an ldr, snapshot x19 and memory contents. + + +def match_ldr_w_imm(ins, imm): + """Match `ldr w?, [x?, #imm]` — any base reg. 
Returns rn or None.""" + if (ins >> 22) != 0b1011100101: + return None + imm12 = (ins >> 10) & 0xFFF + if imm12 * 4 != imm: + return None + return (ins >> 5) & 0x1F + + +def run(blob_path, max_insn=500_000): + reset_stub_state() + blob = open(blob_path, 'rb').read() + uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM) + uc.mem_map(SRAM_BASE, 0x100000, UC_PROT_ALL) + uc.mem_write(BLOB_BASE, blob) + uc.mem_map(STACK_BASE, 0x100000, UC_PROT_ALL) + uc.mem_map(RET_STUB, 0x1000, UC_PROT_ALL) + uc.mem_write(RET_STUB, b'\x00\x00\x20\xd4') + for b, s in MMIO: + uc.mem_map(b, s, UC_PROT_ALL) + + state = {'count': 0, 'prev_pc': 0, 'same_pc': 0, + 'snap': None, 'tp_dump': None} + + def hook_code(uc, addr, size, ud): + state['count'] += 1 + if addr == state.get('prev_pc'): + state['same_pc'] += 1 + if state['same_pc'] > 10000: uc.emu_stop() + else: + state['same_pc'] = 0; state['prev_pc'] = addr + if state['count'] >= max_insn: uc.emu_stop() + # Whole-blob search — the rebuilt's ldr site may be anywhere in + # fn_5540 post-recompile. 
+ if not (0xff001000 <= addr < 0xff020000): + return + try: + ins = int.from_bytes(uc.mem_read(addr, 4), 'little') + except UcError: + return + # Look for ldr w, [x?, #0x13c] — this is the tp[0x4f] load + rn = match_ldr_w_imm(ins, 0x13c) + if rn is not None and state['snap'] is None: + base = uc.reg_read(XREG[rn]) + try: + tp4f = int.from_bytes(uc.mem_read(base + 0x13c, 4), 'little') + tp55 = int.from_bytes(uc.mem_read(base + 0x154, 4), 'little') + dump = uc.mem_read(base, 0x2ac) + except UcError: + return + state['snap'] = (addr, base, rn, tp4f, tp55) + state['tp_dump'] = bytes(dump) + + def hook_mmio_read(uc, typ, addr, size, val, ud): + v = stub_value(addr) & ((1 << size*8) - 1) + uc.mem_write(addr, v.to_bytes(size, 'little')) + + def hook_mmio_write(uc, typ, addr, size, val, ud): + pass + + def hook_unmapped(uc, typ, addr, size, val, ud): + page = addr & ~0xFFFF + try: uc.mem_map(page, 0x10000, UC_PROT_ALL) + except UcError: pass + if typ == UC_MEM_READ_UNMAPPED: + v = stub_value(addr) & ((1 << size*8) - 1) + uc.mem_write(addr, v.to_bytes(size, 'little')) + return True + + uc.hook_add(UC_HOOK_CODE, hook_code) + for b, s in MMIO: + uc.hook_add(UC_HOOK_MEM_READ, hook_mmio_read, begin=b, end=b + s) + uc.hook_add(UC_HOOK_MEM_WRITE, hook_mmio_write, begin=b, end=b + s) + uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped) + + uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + 0xF0000) + uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40) + + pc = BLOB_BASE + remaining = max_insn + while remaining > 0: + try: + uc.emu_start(pc, RET_STUB, count=remaining); break + except UcError as e: + pc = uc.reg_read(UC_ARM64_REG_PC) + try: insn = int.from_bytes(uc.mem_read(pc, 4), 'little') + except UcError: break + if (insn >> 20) == 0xD53: + rt = insn & 0x1F + if rt < 31: uc.reg_write(XREG[rt], 0) + pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue + if (insn >> 20) in (0xD51, 0xD50): + pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue + break + + return 
state + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument('blob') + args = ap.parse_args() + state = run(args.blob) + if state['snap'] is None: + print(f'NO snapshot captured — did not find ldr w, [x19, #0x13c] in fn_5540 range') + return + pc, tp_base, rn, tp4f, tp55 = state['snap'] + print(f'fn_5540 read site: pc=0x{pc:x} (via x{rn})') + print(f' tp_base=0x{tp_base:x}') + print(f' tp[0x4f] (+0x13c) = 0x{tp4f:08x}') + print(f' tp[0x55] (+0x154) = 0x{tp55:08x}') + print(f' computed write val = 0x{(tp55 | (tp4f << 16)) & 0xFFFFFFFF:08x}') + import sys as _sys + print(' tp dump (32-byte-line, u32):') + dump = state['tp_dump'] + for off in range(0, min(0x2ac, len(dump)), 32): + import struct + words = struct.unpack_from('<8I', dump, off) + print(f' +0x{off:03x}: ' + ' '.join(f'{w:08x}' for w in words)) + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/debug_probes/tp_slot_writes.py b/debug_probes/tp_slot_writes.py new file mode 100644 index 0000000..e5e0427 --- /dev/null +++ b/debug_probes/tp_slot_writes.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +"""tp_slot_writes.py — list every write to the tp buffer's +0x13c slot +(tp[0x4f]) during the run, on both vendor and rebuilt. Since tp lives +in SRAM at 0xff0164f8 (discovered via tp_slot_probe), we just hook +SRAM writes to that exact address. 
+""" +import argparse +import os +import sys +from unicorn import * +from unicorn.arm64_const import * + +sys.path.insert(0, os.path.join( + os.path.dirname(os.path.abspath(__file__)), '..')) +from mmio_diff import (SRAM_BASE, BLOB_BASE, STACK_BASE, RET_STUB, + MMIO, XREG, stub_value, reset_stub_state) + +TP_BASE = 0xff0164f8 +TP_SLOT_4F = TP_BASE + 0x13c +TP_SLOT_55 = TP_BASE + 0x154 + + +def run(blob_path, max_insn=500_000): + reset_stub_state() + blob = open(blob_path, 'rb').read() + uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM) + uc.mem_map(SRAM_BASE, 0x100000, UC_PROT_ALL) + uc.mem_write(BLOB_BASE, blob) + uc.mem_map(STACK_BASE, 0x100000, UC_PROT_ALL) + uc.mem_map(RET_STUB, 0x1000, UC_PROT_ALL) + uc.mem_write(RET_STUB, b'\x00\x00\x20\xd4') + for b, s in MMIO: + uc.mem_map(b, s, UC_PROT_ALL) + + state = {'count': 0, 'prev_pc': 0, 'same_pc': 0, 'writes_4f': [], + 'writes_55': []} + + def hook_code(uc, addr, size, ud): + state['count'] += 1 + if addr == state.get('prev_pc'): + state['same_pc'] += 1 + if state['same_pc'] > 10000: uc.emu_stop() + else: + state['same_pc'] = 0; state['prev_pc'] = addr + if state['count'] >= max_insn: uc.emu_stop() + + def hook_mmio_read(uc, typ, addr, size, val, ud): + v = stub_value(addr) & ((1 << size*8) - 1) + uc.mem_write(addr, v.to_bytes(size, 'little')) + + def hook_mmio_write(uc, typ, addr, size, val, ud): + pass + + def hook_sram_write(uc, typ, addr, size, val, ud): + if addr == TP_SLOT_4F or (addr <= TP_SLOT_4F < addr + size): + pc = uc.reg_read(UC_ARM64_REG_PC) + state['writes_4f'].append((state['count'], pc, addr, size, val)) + if addr == TP_SLOT_55 or (addr <= TP_SLOT_55 < addr + size): + pc = uc.reg_read(UC_ARM64_REG_PC) + state['writes_55'].append((state['count'], pc, addr, size, val)) + + def hook_unmapped(uc, typ, addr, size, val, ud): + page = addr & ~0xFFFF + try: uc.mem_map(page, 0x10000, UC_PROT_ALL) + except UcError: pass + if typ == UC_MEM_READ_UNMAPPED: + v = stub_value(addr) & ((1 << size*8) - 1) + uc.mem_write(addr, 
v.to_bytes(size, 'little')) + return True + + uc.hook_add(UC_HOOK_CODE, hook_code) + for b, s in MMIO: + uc.hook_add(UC_HOOK_MEM_READ, hook_mmio_read, begin=b, end=b + s) + uc.hook_add(UC_HOOK_MEM_WRITE, hook_mmio_write, begin=b, end=b + s) + # Hook SRAM writes in the whole blob+data region where tp lives. + uc.hook_add(UC_HOOK_MEM_WRITE, hook_sram_write, + begin=SRAM_BASE, end=SRAM_BASE + 0x100000) + uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped) + + uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + 0xF0000) + uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40) + + pc = BLOB_BASE + remaining = max_insn + while remaining > 0: + try: + uc.emu_start(pc, RET_STUB, count=remaining); break + except UcError as e: + pc = uc.reg_read(UC_ARM64_REG_PC) + try: insn = int.from_bytes(uc.mem_read(pc, 4), 'little') + except UcError: break + if (insn >> 20) == 0xD53: + rt = insn & 0x1F + if rt < 31: uc.reg_write(XREG[rt], 0) + pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue + if (insn >> 20) in (0xD51, 0xD50): + pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue + break + return state + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument('blob') + args = ap.parse_args() + state = run(args.blob) + print(f'=== writes to tp[0x4f] (@{TP_SLOT_4F:#x}) — {len(state["writes_4f"])} total ===') + for tick, pc, addr, size, val in state['writes_4f']: + off = pc - 0xff001000 + print(f' tick={tick:7d} pc=0x{pc:x} (blob+0x{off:05x}) addr=0x{addr:x} sz={size} val=0x{val:x}') + print(f'=== writes to tp[0x55] (@{TP_SLOT_55:#x}) — {len(state["writes_55"])} total ===') + for tick, pc, addr, size, val in state['writes_55']: + off = pc - 0xff001000 + print(f' tick={tick:7d} pc=0x{pc:x} (blob+0x{off:05x}) addr=0x{addr:x} sz={size} val=0x{val:x}') + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/mmio_diff.py b/mmio_diff.py new file mode 100644 index 0000000..fe130a3 --- /dev/null +++ b/mmio_diff.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python3 
+"""mmio_diff.py — log MMIO writes from vendor + rebuilt, diff sequences. + +MMIO writes are the externally observable behavior. If the rebuilt writes +the same address/value/order as vendor, it is behaviorally equivalent — +register-level differences from compiler reg-alloc are irrelevant. + +First divergent write identifies the function that generates wrong output. + +Usage: + mmio_diff.py vendor.bin rebuilt.bin [--max N] +""" +import argparse, sys, os +from unicorn import * +from unicorn.arm64_const import * + +# Region classifier — stamps each address with a human-readable tag +# (DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...). Keeps diff output scannable +# by readers who don't have the address map memorised. +try: + from mmio_regions import classify as _classify_region +except ImportError: + def _classify_region(addr): return "?" + +# Tripwire module — full PC-resolved access capture for CSV emit. +try: + from sim_tripwire import Capture as _TripwireCapture +except ImportError: + _TripwireCapture = None + +SRAM_BASE = 0xFF000000 +BLOB_BASE = 0xFF001000 +STACK_BASE = 0x00400000 +RET_STUB = 0x00800000 + +MMIO = [ + (0xFD580000, 0x00020000), (0xFD5F0000, 0x00010000), + (0xFD7C0000, 0x00040000), (0xFD800000, 0x00010000), + (0xFD8C0000, 0x00010000), + (0xFE010000, 0x00020000), (0xFE030000, 0x00010000), + (0xFE050000, 0x00010000), (0xFE0C0000, 0x00040000), + (0xFE400000, 0x00010000), (0xFECC0000, 0x00010000), + (0xFEB50000, 0x00010000), (0xFF100000, 0x00010000), + # DDR per-channel bases: ch0-ch3. ddrctl = ch+0x10000; MRCTRL0 at + # ch+0x10080 (bit 31 = mr_wr trigger, hw auto-clears on completion); + # MRSTAT at ch+0x10090 (bit 0 = busy). Stubs return 0 so polls exit + # immediately. Vendor prod.bin NOPs the polls; rebuilt keeps them. 
+ (0xF7000000, 0x00040000), (0xF8000000, 0x00040000), + (0xF9000000, 0x00040000), (0xFA000000, 0x00040000), +] +ABS_STUB = {0xFE0500E0:0, 0xFE050054:1, 0xFE0500E4:0, 0xFEB50014:0x60, 0xFEB5007C:2} +REGION_OFF = [ + (0xFE0C0000, 0xFE100000, 0xFFF, 0xA24, 0x00000002), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x684, 0x00000000), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x090, 0x00000000), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x080, 0x00000000), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x514, 0x00000000), + # DDRPHY +0x3cc bit 0 = training-step done (fn_8b40 post-fn_27e0 poll). + (0xFE0C0000, 0xFE100000, 0xFFF, 0x3CC, 0x00000001), + # DDRPHY +0x0b4 bit 18 = phy-training done (fn_8b40 final-pass poll). + (0xFE0C0000, 0xFE100000, 0xFFF, 0x0B4, 0x00040000), + # DDR per-channel DFISTAT (ch+0x10c84): bit 0 = dfi_init_complete. + # fn_27e0 commits DDRCTL then polls tbz bit 0 → must return 1 to exit. + (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10C84, 0x00000001), + # DDR per-channel MRSTAT (ch+0x10090): bit 0 = mr_wr_busy (want 0 to + # exit busy-poll), bit 16 = mr_rd_done (want 1 to exit mr_read done-poll). + (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10090, 0x00010000), + # DDRCTL STAT (ch+0x10014): bits [2:0] operating_mode. fn_8b40 polls + # `(STAT & 7) == 1` after init — "normal" state. + (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10014, 0x00000001), +] +REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)] + +XREG = [getattr(__import__("unicorn.arm64_const", fromlist=["X"]), + f"UC_ARM64_REG_X{i}") for i in range(31)] + + +_SWSTAT_TOGGLE_COUNT = {} + + +def reset_stub_state(): + _SWSTAT_TOGGLE_COUNT.clear() + + +def stub_value(addr): + if addr in ABS_STUB: return ABS_STUB[addr] + for rbase, rend, mask, off_val, rv in REGION_OFF: + if rbase <= addr < rend and (addr & mask) == off_val: return rv + for rbase, rend, rv in REGION_CONST: + if rbase <= addr < rend: return rv + # SWSTAT-like toggle: per-channel ch+0x10514 alternates 0/1 per read. 
+ # fn_29f4 has two back-to-back polls at this reg with OPPOSITE polarity + # (first waits CLEAR, second waits SET). Real HW reflects SWCTL writes; + # the toggle gives each poll one "correct" iteration to exit. + if 0xF7000000 <= addr < 0xFB000000 and (addr & 0xFFFFFF) == 0x10514: + n = _SWSTAT_TOGGLE_COUNT.get(addr, 0) + _SWSTAT_TOGGLE_COUNT[addr] = n + 1 + return 1 if (n & 1) else 0 + return 0 + + +def run_and_log_writes(blob_path, max_insn, tripwire=None, + capture_stack_writes=False): + """Run blob under Unicorn, return list of (write_idx, pc, addr, size, val). + + If `tripwire` is a sim_tripwire.Capture, every MMIO read and write + is also appended to it for CSV emit. If `capture_stack_writes` is + True, writes to the emulator-scratch stack region (0x00400000.. + 0x00500000) are also appended — useful for bisecting divergences + in stack-allocated buffers like fn_de40's param_2[]. + """ + reset_stub_state() + blob = open(blob_path, "rb").read() + uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM) + uc.mem_map(SRAM_BASE, 0x100000, UC_PROT_ALL) + uc.mem_write(BLOB_BASE, blob) + uc.mem_map(STACK_BASE, 0x100000, UC_PROT_ALL) + uc.mem_map(RET_STUB, 0x1000, UC_PROT_ALL) + uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4") + for b, s in MMIO: uc.mem_map(b, s, UC_PROT_ALL) + + writes = [] + state = {"count": 0, "last_pc": 0, "same_pc": 0} + + def hook_code(uc, addr, size, ud): + state["count"] += 1 + state["last_pc"] = addr + if addr == state.get("prev_pc"): + state["same_pc"] += 1 + if state["same_pc"] > 10000: uc.emu_stop() + else: + state["same_pc"] = 0; state["prev_pc"] = addr + if state["count"] >= max_insn: uc.emu_stop() + + def hook_mmio_read(uc, typ, addr, size, val, ud): + v = stub_value(addr) & ((1 << size*8) - 1) + uc.mem_write(addr, v.to_bytes(size, "little")) + if tripwire is not None: + pc = uc.reg_read(UC_ARM64_REG_PC) + tripwire.rd(pc, addr, size, v, state["count"]) + + def hook_mmio_write(uc, typ, addr, size, val, ud): + pc = uc.reg_read(UC_ARM64_REG_PC) + 
writes.append((len(writes), pc, addr, size, val)) + if tripwire is not None: + tripwire.wr(pc, addr, size, val, state["count"]) + + def hook_unmapped(uc, typ, addr, size, val, ud): + page = addr & ~0xFFFF + try: uc.mem_map(page, 0x10000, UC_PROT_ALL) + except UcError: pass + if typ == UC_MEM_READ_UNMAPPED: + v = stub_value(addr) & ((1 << size*8) - 1) + uc.mem_write(addr, v.to_bytes(size, "little")) + if tripwire is not None: + pc = uc.reg_read(UC_ARM64_REG_PC) + tripwire.rd(pc, addr, size, v, state["count"]) + elif typ == UC_MEM_WRITE_UNMAPPED: + pc = uc.reg_read(UC_ARM64_REG_PC) + writes.append((len(writes), pc, addr, size, val)) + if tripwire is not None: + tripwire.wr(pc, addr, size, val, state["count"]) + return True + + uc.hook_add(UC_HOOK_CODE, hook_code) + for b, s in MMIO: + uc.hook_add(UC_HOOK_MEM_READ, hook_mmio_read, begin=b, end=b + s) + uc.hook_add(UC_HOOK_MEM_WRITE, hook_mmio_write, begin=b, end=b + s) + uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped) + + if capture_stack_writes and tripwire is not None: + def hook_stack_write(uc, typ, addr, size, val, ud): + pc = uc.reg_read(UC_ARM64_REG_PC) + tripwire.wr(pc, addr, size, val, state["count"]) + uc.hook_add(UC_HOOK_MEM_WRITE, hook_stack_write, + begin=STACK_BASE, end=STACK_BASE + 0x100000) + + uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + 0xF0000) + uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40) + + pc = BLOB_BASE; remaining = max_insn + while remaining > 0: + try: + uc.emu_start(pc, RET_STUB, count=remaining); break + except UcError as e: + pc = uc.reg_read(UC_ARM64_REG_PC) + try: insn = int.from_bytes(uc.mem_read(pc, 4), "little") + except UcError: break + if (insn >> 20) == 0xD53: + rt = insn & 0x1F + if rt < 31: uc.reg_write(XREG[rt], 0) + pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue + if (insn >> 20) in (0xD51, 0xD50): + pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue + break + return writes + + +def main(): + ap = argparse.ArgumentParser() + 
ap.add_argument("vendor"); ap.add_argument("rebuilt") + ap.add_argument("--max", type=int, default=500000) + ap.add_argument("--ignore-pc", action="store_true", + help="ignore PC when comparing (only addr+val)") + ap.add_argument("--show-regions", action="store_true", + help="print region histogram of vendor writes on success") + ap.add_argument("--tripwire-out-vendor", default=None, metavar="CSV", + help="write PC-resolved access trace of the vendor run to CSV") + ap.add_argument("--tripwire-out-rebuilt", default=None, metavar="CSV", + help="write PC-resolved access trace of the rebuilt run to CSV") + ap.add_argument("--capture-stack-writes", action="store_true", + help="also capture writes to the emulator stack " + "(0x00400000..0x00500000) in tripwire CSVs") + args = ap.parse_args() + + print(f"# MMIO-write diff {args.vendor} vs {args.rebuilt}") + tw_v = _TripwireCapture() if (args.tripwire_out_vendor and _TripwireCapture) else None + tw_r = _TripwireCapture() if (args.tripwire_out_rebuilt and _TripwireCapture) else None + vw = run_and_log_writes(args.vendor, args.max, tripwire=tw_v, + capture_stack_writes=args.capture_stack_writes) + rw = run_and_log_writes(args.rebuilt, args.max, tripwire=tw_r, + capture_stack_writes=args.capture_stack_writes) + if tw_v is not None: + tw_v.emit_csv(args.tripwire_out_vendor) + print(f"# tripwire(vendor): {len(tw_v.records)} records -> " + f"{args.tripwire_out_vendor}") + if tw_r is not None: + tw_r.emit_csv(args.tripwire_out_rebuilt) + print(f"# tripwire(rebuilt): {len(tw_r.records)} records -> " + f"{args.tripwire_out_rebuilt}") + print(f"vendor writes: {len(vw)} rebuilt writes: {len(rw)}") + n = min(len(vw), len(rw)) + for i in range(n): + _, vp, va, vs, vv = vw[i] + _, rp, ra, rs, rv = rw[i] + key_v = (va, vs, vv) if args.ignore_pc else (vp, va, vs, vv) + key_r = (ra, rs, rv) if args.ignore_pc else (rp, ra, rs, rv) + if key_v != key_r: + print(f"[write {i}] DIVERGE") + print(f" vendor: pc=0x{vp:x} 
[{_classify_region(va):10s}] " + f"addr=0x{va:x} sz={vs} val=0x{vv:x}") + print(f" rebuilt: pc=0x{rp:x} [{_classify_region(ra):10s}] " + f"addr=0x{ra:x} sz={rs} val=0x{rv:x}") + # show context: last 3 matching writes + for j in range(max(0, i-3), i): + _, p, a, s, v = vw[j] + print(f" match [{j}]: pc=0x{p:x} [{_classify_region(a):10s}] " + f"addr=0x{a:x} val=0x{v:x}") + return 1 + if len(vw) != len(rw): + print(f"[diverge @ end] length mismatch: vendor={len(vw)} rebuilt={len(rw)}") + # Region histogram of the longer side's tail — tells you which + # subsystem our rebuild hasn't reached yet, or which one it + # reached that vendor doesn't. + longer = rw if len(rw) > len(vw) else vw + side = "rebuilt" if len(rw) > len(vw) else "vendor" + hist = {} + for _, _, a, _, _ in longer[n:]: + r = _classify_region(a) + hist[r] = hist.get(r, 0) + 1 + if hist: + print(f" extra-{side} region histogram:") + for r, c in sorted(hist.items(), key=lambda x: -x[1]): + print(f" {r:12s} {c}") + return 1 + print(f"[OK] all {n} MMIO writes match") + if args.show_regions: + hist = {} + for _, _, a, _, _ in vw: + r = _classify_region(a) + hist[r] = hist.get(r, 0) + 1 + print("# region histogram (vendor write counts):") + for r, c in sorted(hist.items(), key=lambda x: -x[1]): + print(f" {r:12s} {c}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/simulation/README.md b/simulation/README.md new file mode 100644 index 0000000..0cbfa12 --- /dev/null +++ b/simulation/README.md @@ -0,0 +1,197 @@ +# RK3588 DDR TPL — Simulation & Verification Stack + +A set of Unicorn-based tools for pre-silicon simulation, behavioral +diffing, and fault-injection of Rockchip RK3588 DDR TPL blobs (vendor +or rebuilt). + +Built to hunt silicon-corruption bugs that `mmio_diff.py`'s +write-sequence comparison cannot see — NULL derefs, read-side +divergences, retry-path diffs. 
+ +## Synopsis + +| Tool | One-line | +|---|---| +| `mmio_regions.py` | Address → region classifier (`DDRCTL`, `DDRPHY`, `OTP`, `SRAM`, …) | +| `sim_tripwire.py` | Bin-style per-access capture (PC, tick, addr, region, resolved fn name) | +| `tripwire_diff.py` | PC-bucketed `SequenceMatcher` diff of two tripwire CSVs | +| `training_sim.py` | DDR-training simulator with `pass` and `bitflip` modes | +| `bitflip_sweep.py` | Flip each training-status address one at a time, report retry convergence | + +The simulator **DOES NOT** need silicon. It runs vendor or rebuilt TPL +blobs under Unicorn with an MMIO stub that returns "pass" values for +all training-status polls, captures every access, and lets you diff +runs behaviorally. + +## Quick start + +Assuming your TPL blob is at `../rk3588_ddr_v1.19_prod.bin` (a copy of +the vendor blob shipped at SPI offset `0x8000` on boards with RKBIN +v1.19) and the rebuilt blob at `/tmp/rebuilt.bin`: + +```bash +# Run once in "pass" mode and capture tripwire to CSV +python3 training_sim.py ../rk3588_ddr_v1.19_prod.bin \ + --mode pass --tripwire-out /tmp/tw-pass.csv + +# Run again with the first read of every training status flipped +python3 training_sim.py ../rk3588_ddr_v1.19_prod.bin \ + --mode bitflip --flip-count 1 --flip-mask 0xFFFFFFFF \ + --tripwire-out /tmp/tw-flip.csv + +# Diff the two runs by function bucket +python3 tripwire_diff.py /tmp/tw-pass.csv /tmp/tw-flip.csv + +# Sweep every training-status address one-at-a-time and tabulate +# whether the retry loop reconverges cleanly +python3 bitflip_sweep.py ../rk3588_ddr_v1.19_prod.bin +``` + +For vendor-vs-rebuilt verification (needs `../mmio_diff.py` in the +parent dir): + +```bash +python3 ../mmio_diff.py --ignore-pc \ + ../rk3588_ddr_v1.19_prod.bin /tmp/rebuilt.bin \ + --tripwire-out-vendor /tmp/tw-v.csv \ + --tripwire-out-rebuilt /tmp/tw-r.csv \ + --show-regions + +python3 tripwire_diff.py /tmp/tw-v.csv /tmp/tw-r.csv +``` + +## Architecture + +### 
`mmio_regions.py` — address classifier + +Pure lookup table. `classify(addr)` returns a short tag for each +RK3588 peripheral window. Used by every other tool so trace output is +scannable without memorising the memory map. + +Region tags: `DDRCTL`, `DDRCTL:SW` (STAT/PWRCTL/SWCTL/SWSTAT), +`DDRCTL:MR` (mode-register ops), `DDRPHY`, `DDRPHY:TR` (training +status offsets `0x080/090/0B4/3CC/514/684/A24`), `DDR_CRU`, `DDR_MEM`, +`SRAM`, `PMU_SRAM`, `GRF`, `BUS_GRF`, `SGRF`, `CRU`, `SCRU`, `PMU`, +`FW_DDR`, `OTP`, `UART`, `STACK`, `OTHER`. + +### `sim_tripwire.py` — per-access capture + +`Capture` class with `rd(pc, addr, size, val, tick)` and `wr(...)` +that record one row per access: + + (seq_idx, insn_tick, pc, addr, size, rw, val, region, fn_name) + +`fn_name` comes from `PCResolver`, which bisects the vendor funs +table parsed from `../ddr_conservative_asm.s` (115 `FUN_xxxx @ offset` +headers; Ghidra export). Set `RK_DDR_ASM` env var to override the +default asm path. + +`emit_csv(path)` writes out; `load_csv(path)` re-hydrates. Both +`training_sim.py` and `mmio_diff.py` (in parent dir) accept a +tripwire capture object and record into it. + +### `tripwire_diff.py` — PC-bucketed diff + +For each unique `fn_name` in either capture, collect records, key +them by `(region, addr, rw, val, size)`, diff via `difflib. +SequenceMatcher`. `quick_ratio()` short-circuits buckets that share +almost nothing. + +Outputs three tiers: +- **OK**: byte-identical key sequences (suppressed unless + `--show-identical`). +- **minor-diff**: ratio ≥ `--suspect-threshold` (default 0.9). +- **SUSPECT**: ratio below threshold, printed first with the raw + edit script. + +Why PC-bucket and not index-by-index? Under bitflip mode the control +flow diverges at the flip point, which destroys index alignment. +Grouping by function localises divergences so one buggy bucket +doesn't cascade noise into unrelated ones. 
+ +### `training_sim.py` — DDR training simulator + +Two modes: + +- `--mode pass` — every training-status read returns its "done/OK/ + trained" stub value every time. Equivalent to `mmio_diff`'s base + harness. +- `--mode bitflip --flip-count N --flip-mask MASK` — the first `N` + reads of each training-status address return `stub_value ^ mask` + (default mask `0xFFFFFFFF` → "not done"). Subsequent reads revert. + Exercises the retry / error-recovery paths. + +Training-status addresses are defined inside `is_training_status()`; +override single-address via the `BITFLIP_ONLY=0xADDR` env var +(used by `bitflip_sweep.py`). + +Region-tagged access histogram + UART TX dump on every run. + +### `bitflip_sweep.py` — per-address retry convergence + +Flips each training-status register one-at-a-time and summarises: + +- how many records diverged from the pass-mode baseline +- whether any MMIO write value changed (= retry path took a + different branch) +- which function(s) wrote the divergent values + +Output is a single table row per address. A clean "write_divergence" +column means retry paths converge deterministically. A non-zero +count names the function whose retry wrote a different register +value — which is often vendor-intended retry behavior, sometimes +a port bug. + +Currently sweeps 23 addresses (7 DDRPHY training + 4 DDRCTL status +× 4 channels). + +## Record shape + diff bucketing (for tool authors) + +Per-access record fields: + + seq monotonic index within the capture + tick Unicorn instruction count at the access + pc access-site PC (absolute) + addr MMIO/stack/SRAM address + size 1/2/4/8 + rw 'rd' or 'wr' + val value read or written (hex) + region mmio_regions.classify(addr) tag + fn PCResolver result: FUN_xxxxxxxx from the funs table + +Diff key inside each fn bucket: `(region, addr, rw, val, size)`. 
+Explicitly excludes `pc` (codegen reg-alloc shifts individual load/ +store PCs within a function without changing behavior), `seq`, and +`tick` (these drift with any upstream path difference). + +## Known limitations + +- The Unicorn simulator exits early on sustained same-PC loops + (>10 000 iterations) to avoid deadlocks. Real silicon polling that + would eventually succeed is modelled via the stub returning the + success value; if your use case needs a different success-delay + profile, edit `stub_value` / `is_training_status`. +- `sim_tripwire.PCResolver` attributes every PC to the *largest + FUN_-entry address ≤ PC*. Unported code paths still resolve to a + reasonable fn_name. Ports not in the `// ============ FUN_xxxx @` + convention won't match. +- `mmio_diff.py`'s `--capture-stack-writes` flag catches writes to + Unicorn's scratch stack `0x00400000..0x00500000` — but the vendor + firmware sometimes uses SRAM-resident scratch buffers (e.g. the + `tp` timing buffer at `0xff0164f8`) instead of the call-stack. For + those, add a dedicated hook in the probe (see `../debug_probes/ + tp_slot_writes.py` for an example). + +## Dependencies + +- Python 3.8+ +- `unicorn-engine` (AArch64 CPU emulator) +- `difflib` (stdlib) + +```bash +pip install unicorn +``` + +## License + +GPL-2.0-or-later, matching the port candidates' SPDX headers. diff --git a/simulation/bitflip_sweep.py b/simulation/bitflip_sweep.py new file mode 100644 index 0000000..4f997f0 --- /dev/null +++ b/simulation/bitflip_sweep.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +"""bitflip_sweep.py — flip each training-status address one-at-a-time +and summarise how the rebuild's retry logic responds. + +For every training-status address (DDRPHY training + DDRCTL per-ch +status), run training_sim twice: + (a) --mode pass baseline + (b) --mode bitflip with `is_training_status` restricted to just that + one address + +Compare the two tripwire CSVs per run. 
Report: + - how many records diverged + - whether mmio writes still converge to the same final sequence + - one-line summary: "retry fired? final state same? # write-value + divergences?" + +Output a table row per address so you can scan for any address whose +retry loop doesn't converge. +""" +import argparse +import csv +import os +import subprocess +import sys +import tempfile + +BENCH = os.path.dirname(os.path.abspath(__file__)) + +TRAINING_TARGETS = [ + ("DDRPHY:TR", 0xFE0C0000, 0x080, "MicroReset"), + ("DDRPHY:TR", 0xFE0C0000, 0x090, "MicroContMux"), + ("DDRPHY:TR", 0xFE0C0000, 0x0B4, "TrainingDone(b18)"), + ("DDRPHY:TR", 0xFE0C0000, 0x3CC, "TrainingStep(b0)"), + ("DDRPHY:TR", 0xFE0C0000, 0x514, "TrainingDone"), + ("DDRPHY:TR", 0xFE0C0000, 0x684, "CalBusy"), + ("DDRPHY:TR", 0xFE0C0000, 0xA24, "DfiStatus"), +] +# Per-channel DDRCTL status addresses: expand for all 4 channels. +DDRCTL_CHANNEL_BASES = (0xF7000000, 0xF8000000, 0xF9000000, 0xFA000000) +DDRCTL_STATUS_OFFSETS = [ + ("DDRCTL:SW", 0x10014, "STAT"), + ("DDRCTL:MR", 0x10090, "MRSTAT"), + ("DDRCTL:SW", 0x10C84, "DFISTAT"), + ("DDRCTL:SW", 0x10514, "SWSTAT"), +] +for ch_i, base in enumerate(DDRCTL_CHANNEL_BASES): + for region, off, name in DDRCTL_STATUS_OFFSETS: + TRAINING_TARGETS.append((region, base, off, f"{name} ch{ch_i}")) + + +def run_sim(blob_path, flip_offset, flip_mask, out_csv, max_insn=500_000): + """Run training_sim with a single-address bitflip. Uses env var + BITFLIP_ONLY to narrow the is_training_status predicate in the + simulator. 
If offset is None, runs plain pass-mode.""" + env = os.environ.copy() + if flip_offset is not None: + env["BITFLIP_ONLY"] = f"{flip_offset:#x}" + mode_args = ["--mode", "bitflip", "--flip-count", "1", + "--flip-mask", f"{flip_mask:#x}"] + else: + env.pop("BITFLIP_ONLY", None) + mode_args = ["--mode", "pass"] + cmd = ["python3", os.path.join(BENCH, "training_sim.py"), + blob_path, *mode_args, "--max-insn", str(max_insn), + "--tripwire-out", out_csv] + r = subprocess.run(cmd, capture_output=True, text=True, env=env) + return r.returncode == 0 + + +def load_csv(path): + out = [] + with open(path, newline="") as f: + r = csv.DictReader(f) + for row in r: + row["seq"] = int(row["seq"]) + row["tick"] = int(row["tick"]) + row["pc"] = int(row["pc"], 16) + row["addr"] = int(row["addr"], 16) + row["val"] = int(row["val"], 16) + out.append(row) + return out + + +def summarise(pass_csv, flip_csv, addr): + """Diff by (addr, rw, val, size) key inside per-fn buckets, not by + index — if retry causes a shift, index-by-index gets noisy. 
+ """ + from collections import defaultdict + p = load_csv(pass_csv) + f = load_csv(flip_csv) + + def bucket(records): + b = defaultdict(list) + for r in records: + b[r["fn"]].append((r["addr"], r["rw"], r["val"], r["size"], r)) + return b + + pb = bucket(p) + fb = bucket(f) + all_fns = set(pb) | set(fb) + + wr_div_rows = [] # (fn, pass_row_or_None, flip_row_or_None) + rd_div_count = 0 + + for fn in all_fns: + pkeys = [(a, rw, v, s) for (a, rw, v, s, _) in pb.get(fn, [])] + fkeys = [(a, rw, v, s) for (a, rw, v, s, _) in fb.get(fn, [])] + if pkeys == fkeys: + continue + # SequenceMatcher alignment per-bucket + import difflib + sm = difflib.SequenceMatcher(a=pkeys, b=fkeys, autojunk=False) + for tag, i1, i2, j1, j2 in sm.get_opcodes(): + if tag == "equal": + continue + # Characterise this edit as "read delta" vs "write delta" + p_rows = pb.get(fn, [])[i1:i2] + f_rows = fb.get(fn, [])[j1:j2] + for (_, _, _, _, row) in p_rows: + if row["rw"] == "wr": + wr_div_rows.append((fn, row, None)) + else: + rd_div_count += 1 + for (_, _, _, _, row) in f_rows: + if row["rw"] == "wr": + wr_div_rows.append((fn, None, row)) + else: + rd_div_count += 1 + return { + "total_records_pass": len(p), + "total_records_flip": len(f), + "read_divergences": rd_div_count, + "write_divergence_rows": wr_div_rows, + } + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("blob", help="path to the DDR TPL blob to drive") + ap.add_argument("--out-dir", default="/tmp/bitflip-sweep") + args = ap.parse_args() + + os.makedirs(args.out_dir, exist_ok=True) + + # Baseline pass-mode run + baseline = os.path.join(args.out_dir, "pass.csv") + print(f"# baseline pass run -> {baseline}") + ok = run_sim(args.blob, None, 0, baseline) + if not ok: + print("baseline run failed", file=sys.stderr); return 1 + + header = (f"{'address':<12} {'region':<11} {'name':<18} " + f"{'rd_div':>6} writes_diverged_in") + print() + print(header) + print("-" * len(header)) + all_wr_details = [] + for region, base, off, 
name in TRAINING_TARGETS: + addr = base + off + tag = f"0x{addr:08x}" + flip_csv = os.path.join(args.out_dir, f"flip_{addr:08x}.csv") + ok = run_sim(args.blob, addr, 0xFFFFFFFF, flip_csv) + if not ok: + print(f"{tag} {region} {name} -- sim failed") + continue + s = summarise(baseline, flip_csv, addr) + wr_fns = sorted({row[0] for row in s["write_divergence_rows"]}) + preview = ",".join(wr_fns[:4]) + if len(wr_fns) > 4: + preview += f" +{len(wr_fns)-4}" + print(f"{tag:<12} {region:<11} {name:<18} " + f"{s['read_divergences']:>6} {preview}") + for fn, pr, fr in s["write_divergence_rows"]: + all_wr_details.append((addr, name, fn, pr, fr)) + + if all_wr_details: + print("\n## Write-divergence details (retry path changed register values)") + for addr, name, fn, pr, fr in all_wr_details[:60]: + pv = f"pass: addr=0x{pr['addr']:x} val=0x{pr['val']:x}" if pr else "pass: (missing)" + fv = f"flip: addr=0x{fr['addr']:x} val=0x{fr['val']:x}" if fr else "flip: (missing)" + print(f" [{name:<14}] {fn:<22} {pv} | {fv}") + if len(all_wr_details) > 60: + print(f" ... +{len(all_wr_details)-60} more") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/simulation/mmio_regions.py b/simulation/mmio_regions.py new file mode 100644 index 0000000..00172ce --- /dev/null +++ b/simulation/mmio_regions.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +"""mmio_regions.py — address → region classifier for RK3588 DDR TPL. + +Used by mmio_diff.py, blob_emu.py, call_trace.py, training_sim.py to +stamp each access with a short human-readable tag so trace output is +scannable without memorising hex ranges. 
+ +Categories (tag, short description): + DDRCTL uMCTL2 controller (per-channel ch+0x10000 window, or + global 0xFE010000) + DDRCTL:SW STAT/SWCTL/SWSTAT/PWRCTL subregion — frequently polled + DDRCTL:MR mode-register ops (MRCTRL0/MRSTAT subregion) + DDRPHY DDR PHY at 0xFE0C0000 (32 KB) + DDRPHY:TR training status subregion (+0x080/090/0B4/3CC/514/684/A24) + DDR_MEM actual DRAM content — 0x00000000..0x80000000 once trained + SRAM boot SRAM 0xFF000000..0xFF100000 (blob + globals) + CRU clock and reset 0xFD7C0000 + DDR_CRU DDR PHY clock/reset (ch0..3) 0xFD800000..0xFD8C0000 + SCRU secure clock/reset 0xFD8C0000 + PMU_SRAM PMU SRAM 0xFF100000..0xFF110000 + GRF general register file 0xFD580000 + BUS_GRF bus-side GRF 0xFD5F0000 + SGRF secure GRF 0xFE050000 + PMU power-mgmt unit 0xFE400000 + FW_DDR DDR firewall 0xFE030000 + OTP OTP_NS — one-time-programmable controller 0xFECC0000 + UART debug UART 0xFEB50000 + OTHER unmapped / unclassified +""" + +# Per-channel DDRCTL window repeats at these bases. Offset within +# a channel identifies the real sub-register. +DDRCTL_CHANNELS = (0xF7000000, 0xF8000000, 0xF9000000, 0xFA000000) +DDRCTL_GLOBAL = 0xFE010000 +DDRCTL_WIN = 0x20000 +DDRCTL_SUB = 0x10000 # ch+0x10000 lands inside ctrl space + +# Training-status registers (the ones mmio_diff's REGION_OFF stubs). +DDRPHY_TRAINING_OFFSETS = {0x080, 0x090, 0x0B4, 0x3CC, 0x514, 0x684, 0xA24} + +# DDRCTL sub-categories — offsets within the 0x10000 sub-window. +DDRCTL_SW_OFFSETS = {0x10014, 0x10180, 0x10C80, 0x10C84} +DDRCTL_MR_OFFSETS = {0x10080, 0x10090} + + +def classify(addr: int) -> str: + """Return short region tag for an absolute address.""" + # Emulator-only scratch stack (0x00400000..0x00500000) — not a real + # silicon region but tagged distinctly so tripwire can diff stack + # writes (e.g. param_2[] buffers fn_de40 fills). 
+ if 0x00400000 <= addr < 0x00500000: + return "STACK" + # DDR memory (post-training) + if addr < 0x80000000: + return "DDR_MEM" + # Per-channel DDRCTL windows + for base in DDRCTL_CHANNELS: + if base <= addr < base + 0x40000: + off = addr - base + if off in DDRCTL_SW_OFFSETS: + return "DDRCTL:SW" + if off in DDRCTL_MR_OFFSETS: + return "DDRCTL:MR" + return "DDRCTL" + # Global DDRCTL (less common in TPL) + if DDRCTL_GLOBAL <= addr < DDRCTL_GLOBAL + DDRCTL_WIN: + return "DDRCTL" + # DDRPHY 0xFE0C0000..0xFE100000 (256 KB per 4 ports; training at base) + if 0xFE0C0000 <= addr < 0xFE100000: + off = addr & 0xFFF + if off in DDRPHY_TRAINING_OFFSETS: + return "DDRPHY:TR" + return "DDRPHY" + # Clock/reset + if 0xFD7C0000 <= addr < 0xFD800000: return "CRU" + if 0xFD800000 <= addr < 0xFD8C0000: return "DDR_CRU" + if 0xFD8C0000 <= addr < 0xFD8D0000: return "SCRU" + # Register files + if 0xFD580000 <= addr < 0xFD5A0000: return "GRF" + if 0xFD5F0000 <= addr < 0xFD600000: return "BUS_GRF" + if 0xFE050000 <= addr < 0xFE060000: return "SGRF" + # PMU / firewall / scrambler + if 0xFE400000 <= addr < 0xFE410000: return "PMU" + if 0xFE030000 <= addr < 0xFE040000: return "FW_DDR" + if 0xFECC0000 <= addr < 0xFECD0000: return "OTP" + # Debug UART + if 0xFEB50000 <= addr < 0xFEB60000: return "UART" + # Boot SRAM (blob + globals) and PMU SRAM + if 0xFF000000 <= addr < 0xFF100000: return "SRAM" + if 0xFF100000 <= addr < 0xFF110000: return "PMU_SRAM" + return "OTHER" + + +def classify_rw(addr: int, is_write: bool) -> str: + """Direction-aware tag: 'DDRCTL:SW wr' vs 'DDRCTL:SW rd'.""" + return f"{classify(addr):10s} {'wr' if is_write else 'rd'}" + + +if __name__ == "__main__": + import sys + # Smoke-test: classify a few known addresses + tests = [ + (0xFE0C0A24, "DDRPHY:TR"), # DfiStatus + (0xFE0C0000, "DDRPHY"), # generic PHY reg + (0xF7010C80, "DDRCTL:SW"), # SWCTL ch0 + (0xF7010080, "DDRCTL:MR"), # MRCTRL0 ch0 + (0xF7010500, "DDRCTL"), # other DDRCTL + (0xFD7C0000, "CRU"), + (0xFD800000, 
"DDR_CRU"), + (0xFF001000, "SRAM"), + (0xFF100000, "PMU_SRAM"), + (0xFEB50000, "UART"), + (0x00100000, "DDR_MEM"), + ] + fails = 0 + for addr, want in tests: + got = classify(addr) + ok = "OK" if got == want else "FAIL" + if got != want: fails += 1 + print(f" {ok} 0x{addr:08x} -> {got:12s} (want {want})") + sys.exit(1 if fails else 0) diff --git a/simulation/sim_tripwire.py b/simulation/sim_tripwire.py new file mode 100644 index 0000000..37227aa --- /dev/null +++ b/simulation/sim_tripwire.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +"""sim_tripwire.py — Bin-style MMIO tracer adapted for Unicorn-sim. + +Records every MMIO access (read + write) with enough context to diff +two simulator runs at sequence level. Modeled on the Bin project's +RAM-ring tripwire primitive, minus the DDR reservation — we're not on +silicon, we're in Python, so we just append to a list. + +Record shape (per Janet, 2026-04-21): + (seq_idx, insn_tick, pc, addr, size, rw, val, region_tag, fn_name) + +Usage from a simulator harness: + + import sim_tripwire + cap = sim_tripwire.Capture(asm_path) # loads funs table + # inside UC_HOOK_MEM_READ: + cap.rd(pc, addr, size, val, insn_tick) + # inside UC_HOOK_MEM_WRITE: + cap.wr(pc, addr, size, val, insn_tick) + cap.emit_csv("/tmp/vendor-trace.csv") + +The companion tool `tripwire_diff.py` reads two CSVs and does a +PC-bucketed diff (group by fn_name, diff per bucket with difflib). +""" +import bisect +import csv +import os +import re + +try: + from mmio_regions import classify as _classify +except ImportError: + def _classify(addr): return "?" + + +# Default location of the vendor disassembly that carries the funs +# table. Defaults to ../ddr_conservative_asm.s relative to this file +# (repo layout); override via env var or constructor arg. 
+DEFAULT_ASM = os.environ.get( + "RK_DDR_ASM", + os.path.join(os.path.dirname(os.path.abspath(__file__)), + "..", "ddr_conservative_asm.s")) +BLOB_BASE = 0xFF001000 # where the TPL blob lives in SRAM + + +def parse_fun_table(asm_path): + """Parse `// ============ FUN_ @ ============` headers. + + Returns list of (abs_addr, fun_name) sorted by abs_addr so we can + do O(log N) PC → nearest-below lookups. + """ + pat = re.compile(r'// ============ (FUN_[0-9a-fA-F]+) @ ([0-9a-fA-F]+) ============') + out = [] + with open(asm_path) as f: + for ln in f: + m = pat.match(ln) + if not m: + continue + name = m.group(1) + off = int(m.group(2), 16) + out.append((BLOB_BASE + off, name)) + out.sort() + return out + + +class PCResolver: + """PC → nearest containing FUN_ name. + + Uses the vendor funs table (parse_fun_table) as ground truth for + function entries. A PC resolves to the FUN_ whose entry is the + largest ≤ PC. Accuracy depends on the asm covering all functions + — missing entries produce attribution to the previous function. + """ + + def __init__(self, asm_path=DEFAULT_ASM): + self.table = parse_fun_table(asm_path) if os.path.exists(asm_path) else [] + self._keys = [addr for addr, _ in self.table] + self._names = [name for _, name in self.table] + + def resolve(self, pc): + if not self.table: + return "?" + # Find rightmost entry with addr <= pc + i = bisect.bisect_right(self._keys, pc) - 1 + if i < 0: + return "" + return self._names[i] + + +class Capture: + """Append-only tripwire capture. + + Records are (seq_idx, insn_tick, pc, addr, size, rw, val, region, fn). + Keep this lean — writing this list is in the hot Unicorn callback + path. + """ + + def __init__(self, asm_path=DEFAULT_ASM, resolve=True): + self.records = [] + self._pcr = PCResolver(asm_path) if resolve else None + + def _append(self, pc, addr, size, rw, val, insn_tick): + seq = len(self.records) + region = _classify(addr) + fn = self._pcr.resolve(pc) if self._pcr else "?" 
+ self.records.append( + (seq, insn_tick, pc, addr, size, rw, val, region, fn)) + + def rd(self, pc, addr, size, val, insn_tick): + self._append(pc, addr, size, "rd", val, insn_tick) + + def wr(self, pc, addr, size, val, insn_tick): + self._append(pc, addr, size, "wr", val, insn_tick) + + def emit_csv(self, path): + with open(path, "w", newline="") as f: + w = csv.writer(f) + w.writerow(("seq", "tick", "pc", "addr", "size", + "rw", "val", "region", "fn")) + for seq, tick, pc, addr, size, rw, val, region, fn in self.records: + w.writerow((seq, tick, f"0x{pc:x}", f"0x{addr:x}", + size, rw, f"0x{val:x}", region, fn)) + + def summary(self): + """Return (n_total, n_rd, n_wr, per_fn_counter, per_region_counter).""" + from collections import Counter + fn = Counter() + region = Counter() + n_rd = n_wr = 0 + for _, _, _, _, _, rw, _, reg, fname in self.records: + (n_rd if rw == "rd" else n_wr) # no-op; tracked below + if rw == "rd": n_rd += 1 + else: n_wr += 1 + fn[fname] += 1 + region[reg] += 1 + return len(self.records), n_rd, n_wr, fn, region + + +def load_csv(path): + """Read a CSV emitted by emit_csv. 
Returns list of dict records.""" + out = [] + with open(path, newline="") as f: + r = csv.DictReader(f) + for row in r: + row["seq"] = int(row["seq"]) + row["tick"] = int(row["tick"]) + row["pc"] = int(row["pc"], 16) + row["addr"] = int(row["addr"], 16) + row["size"] = int(row["size"]) + row["val"] = int(row["val"], 16) + out.append(row) + return out + + +if __name__ == "__main__": + # Smoke: parse funs table and show first 5 entries + t = parse_fun_table(DEFAULT_ASM) + print(f"loaded {len(t)} fn entries from {DEFAULT_ASM}") + for addr, name in t[:5]: + print(f" 0x{addr:08x} {name}") + pcr = PCResolver() + # BLOB_BASE + offset of known functions + for off in (0x4, 0x40, 0x3c48, 0xfcc4, 0xde40, 0xf170): + pc = BLOB_BASE + off + print(f" resolve 0x{pc:x} (BLOB+0x{off:x}) -> {pcr.resolve(pc)}") diff --git a/simulation/training_sim.py b/simulation/training_sim.py new file mode 100644 index 0000000..1e803b4 --- /dev/null +++ b/simulation/training_sim.py @@ -0,0 +1,348 @@ +#!/usr/bin/env python3 +"""training_sim.py — DDR training simulator for the RK3588 TPL blob. + +Simulates a DRAM machine that answers the PHY's training handshakes +deterministically, without needing silicon. Two modes: + + --mode pass Every status/poll returns "done/OK/trained". + First-iteration behavior, fastest path through + training. This is the existing mmio_diff default. + + --mode bitflip For N iterations the status register returns a + bit-flipped (wrong) value, forcing the code + through its retry / error-recovery path. After + N bad reads, the value snaps back to the "pass" + word. Default N = 1: classic "first-pass fails, + retry succeeds" PHY behavior. + +Human-readable trace: every MMIO access is tagged with its region +(DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...) so you can scan the log +without memorising the address map. 
+ +Usage: + training_sim.py [--mode pass|bitflip] [--flip-count N] + [--max-insn N] [--verbose] [--limit-trace N] +""" +import argparse, sys, os +from unicorn import * +from unicorn.arm64_const import * + +# Local modules +from mmio_regions import classify +from sim_tripwire import Capture as _TripwireCapture + +SRAM_BASE = 0xFF000000 +SRAM_SIZE = 0x00100000 +BLOB_BASE = 0xFF001000 +STACK_BASE = 0x00400000 +STACK_SIZE = 0x00100000 +RET_STUB = 0x00800000 +RET_SIZE = 0x00001000 + +MMIO = [ + (0xFD580000, 0x00020000), (0xFD5F0000, 0x00010000), + (0xFD7C0000, 0x00040000), (0xFD800000, 0x00010000), + (0xFD8C0000, 0x00010000), + (0xFE010000, 0x00020000), (0xFE030000, 0x00010000), + (0xFE050000, 0x00010000), (0xFE0C0000, 0x00040000), + (0xFE400000, 0x00010000), (0xFECC0000, 0x00010000), + (0xFEB50000, 0x00010000), (0xFF100000, 0x00010000), + (0xF7000000, 0x00040000), (0xF8000000, 0x00040000), + (0xF9000000, 0x00040000), (0xFA000000, 0x00040000), +] + +# Per-address pass values — copied from mmio_diff.ABS_STUB. +ABS_PASS = { + 0xFE0500E0: 0x00000000, + 0xFE050054: 0x00000001, + 0xFE0500E4: 0x00000000, + 0xFEB50014: 0x00000060, + 0xFEB5007C: 0x00000002, +} + +# DDRPHY training-status stubs. Tuple: (base, end, mask, offset, pass_value). +# Copied from mmio_diff.REGION_OFF. 
+REGION_OFF = [ + (0xFE0C0000, 0xFE100000, 0xFFF, 0xA24, 0x00000002), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x684, 0x00000000), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x090, 0x00000000), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x080, 0x00000000), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x514, 0x00000000), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x3CC, 0x00000001), + (0xFE0C0000, 0xFE100000, 0xFFF, 0x0B4, 0x00040000), + (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10C84, 0x00000001), + (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10090, 0x00010000), + (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10014, 0x00000001), +] +REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)] + +# Addresses that are *training status* — these are the ones bitflip mode +# perturbs. Anything else keeps the pass value even in bitflip mode so +# the test is focused on training retry paths, not boot-infrastructure +# noise. +def is_training_status(addr): + # Env-var override for bitflip_sweep.py: when set to an address, + # only that exact address is considered "training status" and thus + # bitflippable. Lets us flip one register at a time. + only = os.environ.get("BITFLIP_ONLY") + if only: + return addr == int(only, 0) + if 0xFE0C0000 <= addr < 0xFE100000: + off = addr & 0xFFF + return off in (0x080, 0x090, 0x0B4, 0x3CC, 0x514, 0x684, 0xA24) + if 0xF7000000 <= addr < 0xFB000000: + off = addr & 0xFFFFFF + return off in (0x10014, 0x10090, 0x10C84, 0x10514) + return False + + +def pass_value(addr): + """Return the 'all-good' stub value for a status address.""" + if addr in ABS_PASS: return ABS_PASS[addr] + for rbase, rend, mask, off_val, rv in REGION_OFF: + if rbase <= addr < rend and (addr & mask) == off_val: + return rv + for rbase, rend, rv in REGION_CONST: + if rbase <= addr < rend: + return rv + # SWSTAT-like toggle: ch+0x10514 alternates per read (preserves + # fn_29f4 two-poll-opposite-polarity expectation). 
+ return None # caller applies its own fallback + + +class TrainingSim: + def __init__(self, mode, flip_count, flip_mask, limit_trace, verbose): + self.mode = mode # "pass" or "bitflip" + self.flip_count = flip_count # how many first reads return flipped + self.flip_mask = flip_mask # XOR mask for bitflip + self.limit_trace = limit_trace + self.verbose = verbose + # Per-address read counters (used both for bitflip semantics + # and for the SWSTAT toggle that the existing harness needs). + self._reads = {} + self._swstat_toggle = {} + # Traces + self.access_log = [] # (kind, pc, addr, size, val, region) + self.training_log = [] # (n, pc, addr, stub_val, flipped?) + + def read_value(self, addr, size): + count = self._reads.get(addr, 0) + self._reads[addr] = count + 1 + + # Fast path — pass value if defined + pv = pass_value(addr) + if pv is None: + # SWSTAT-like toggle at ch+0x10514 + if 0xF7000000 <= addr < 0xFB000000 and (addr & 0xFFFFFF) == 0x10514: + n = self._swstat_toggle.get(addr, 0) + self._swstat_toggle[addr] = n + 1 + pv = 1 if (n & 1) else 0 + else: + pv = 0 + + # Apply bitflip to training-status only, first N reads per addr + flipped = False + if self.mode == "bitflip" and is_training_status(addr): + if count < self.flip_count: + pv ^= self.flip_mask + flipped = True + self.training_log.append((count, addr, pv, True)) + else: + self.training_log.append((count, addr, pv, False)) + + return pv & ((1 << (size * 8)) - 1) + + def log(self, kind, pc, addr, size, val): + if len(self.access_log) < self.limit_trace: + self.access_log.append((kind, pc, addr, size, val, classify(addr))) + + +def run(blob_path, sim, max_insn, tripwire=None): + blob = open(blob_path, "rb").read() + uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM) + uc.mem_map(SRAM_BASE, SRAM_SIZE, UC_PROT_ALL) + uc.mem_write(BLOB_BASE, blob) + uc.mem_map(STACK_BASE, STACK_SIZE, UC_PROT_ALL) + uc.mem_map(RET_STUB, RET_SIZE, UC_PROT_ALL) + uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4") # brk #0 + + for base, sz in 
MMIO: + uc.mem_map(base, sz, UC_PROT_ALL) + + state = {"count": 0, "last_pc": 0, "same_pc": 0, "max_pc": 0, "writes": 0} + + def hook_code(uc, addr, size, ud): + state["count"] += 1 + if addr == state["last_pc"]: + state["same_pc"] += 1 + if state["same_pc"] > 10000: + uc.emu_stop() + else: + state["same_pc"] = 0 + state["last_pc"] = addr + if addr > state["max_pc"]: + state["max_pc"] = addr + if state["count"] >= max_insn: + uc.emu_stop() + + def hook_read(uc, typ, addr, size, val, ud): + v = sim.read_value(addr, size) + uc.mem_write(addr, v.to_bytes(size, "little")) + pc = uc.reg_read(UC_ARM64_REG_PC) + sim.log("rd", pc, addr, size, v) + if tripwire is not None: + tripwire.rd(pc, addr, size, v, state["count"]) + + uart_buf = bytearray() + def hook_write(uc, typ, addr, size, val, ud): + pc = uc.reg_read(UC_ARM64_REG_PC) + state["writes"] += 1 + sim.log("wr", pc, addr, size, val) + if tripwire is not None: + tripwire.wr(pc, addr, size, val, state["count"]) + if addr == 0xFEB50000: + c = val & 0xFF + uart_buf.append(c) + + def hook_unmapped(uc, typ, addr, size, val, ud): + page = addr & ~0xFFFF + try: + uc.mem_map(page, 0x10000, UC_PROT_ALL) + except UcError: + pass + if typ == UC_MEM_READ_UNMAPPED: + v = sim.read_value(addr, size) + uc.mem_write(addr, v.to_bytes(size, "little")) + pc = uc.reg_read(UC_ARM64_REG_PC) + sim.log("rd", pc, addr, size, v) + if tripwire is not None: + tripwire.rd(pc, addr, size, v, state["count"]) + elif typ == UC_MEM_WRITE_UNMAPPED: + pc = uc.reg_read(UC_ARM64_REG_PC) + state["writes"] += 1 + sim.log("wr", pc, addr, size, val) + if tripwire is not None: + tripwire.wr(pc, addr, size, val, state["count"]) + return True + + uc.hook_add(UC_HOOK_CODE, hook_code) + for base, sz in MMIO: + uc.hook_add(UC_HOOK_MEM_READ, hook_read, begin=base, end=base + sz) + uc.hook_add(UC_HOOK_MEM_WRITE, hook_write, begin=base, end=base + sz) + uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped) + + uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + STACK_SIZE - 16) + 
uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40) + + XREG = [getattr(__import__("unicorn.arm64_const", fromlist=["X"]), + f"UC_ARM64_REG_X{i}") for i in range(31)] + pc = BLOB_BASE + remaining = max_insn + while remaining > 0: + try: + uc.emu_start(pc, RET_STUB, count=remaining) + break + except UcError as e: + pc = uc.reg_read(UC_ARM64_REG_PC) + try: + insn = int.from_bytes(uc.mem_read(pc, 4), "little") + except UcError: + break + if (insn >> 20) == 0xD53: + rt = insn & 0x1F + if rt < 31: + uc.reg_write(XREG[rt], 0) + pc += 4 + uc.reg_write(UC_ARM64_REG_PC, pc) + remaining -= 1 + continue + if (insn >> 20) in (0xD51, 0xD50): + pc += 4 + uc.reg_write(UC_ARM64_REG_PC, pc) + remaining -= 1 + continue + break + + return state, uart_buf + + +def print_summary(sim, state, uart_buf, region_hist): + print(f"# training_sim mode={sim.mode} flip_count={sim.flip_count} " + f"flip_mask=0x{sim.flip_mask:x}") + print(f"insns: {state['count']} writes: {state['writes']} " + f"max_pc: 0x{state['max_pc']:x}") + print() + print("# region histogram (access count by region):") + for region, (rd, wr) in sorted(region_hist.items(), + key=lambda x: -(x[1][0] + x[1][1])): + print(f" {region:12s} rd={rd:6d} wr={wr:6d}") + if sim.training_log: + print() + print(f"# training-status reads ({len(sim.training_log)}):") + for count, addr, val, flipped in sim.training_log[:20]: + tag = "FLIP" if flipped else " " + print(f" [{count}] {tag} {classify(addr):10s} " + f"0x{addr:08x} -> 0x{val:08x}") + if len(sim.training_log) > 20: + print(f" ... 
+{len(sim.training_log)-20} more") + if uart_buf: + print() + print(f"# UART TX ({len(uart_buf)} bytes):") + print(uart_buf.decode('utf-8', errors='replace')) + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("blob") + ap.add_argument("--mode", choices=("pass", "bitflip"), default="pass", + help="pass = always answer training positively; " + "bitflip = flip returned bits for N reads, then pass") + ap.add_argument("--flip-count", type=int, default=1, + help="N: how many flipped reads per status address before " + "reverting to pass (default 1)") + ap.add_argument("--flip-mask", default="0xFFFFFFFF", + help="XOR mask applied to training status (default: " + "invert all bits, which usually reads as 'not done')") + ap.add_argument("--max-insn", type=int, default=500_000) + ap.add_argument("--limit-trace", type=int, default=200, + help="cap on per-access trace rows stored (no I/O cost)") + ap.add_argument("--verbose", action="store_true", + help="print full per-access trace (may be very long)") + ap.add_argument("--tripwire-out", default=None, metavar="CSV", + help="write full PC-resolved access trace to this CSV") + args = ap.parse_args() + + sim = TrainingSim( + mode=args.mode, + flip_count=args.flip_count, + flip_mask=int(args.flip_mask, 0), + limit_trace=args.limit_trace if not args.verbose else 10**9, + verbose=args.verbose, + ) + tripwire = _TripwireCapture() if args.tripwire_out else None + state, uart_buf = run(args.blob, sim, args.max_insn, tripwire=tripwire) + if tripwire is not None: + tripwire.emit_csv(args.tripwire_out) + print(f"# tripwire: {len(tripwire.records)} records -> " + f"{args.tripwire_out}") + + # Region histogram from the trace (capped by limit_trace). 
+ region_hist = {} + for kind, pc, addr, size, val, region in sim.access_log: + rd, wr = region_hist.get(region, (0, 0)) + if kind == "rd": + region_hist[region] = (rd + 1, wr) + else: + region_hist[region] = (rd, wr + 1) + + if args.verbose: + for kind, pc, addr, size, val, region in sim.access_log: + print(f" PC=0x{pc:08x} [{region:10s}] {kind} " + f"0x{addr:08x} sz={size} val=0x{val:x}") + + print_summary(sim, state, uart_buf, region_hist) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/simulation/tripwire_diff.py b/simulation/tripwire_diff.py new file mode 100644 index 0000000..bfe7f3b --- /dev/null +++ b/simulation/tripwire_diff.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +"""tripwire_diff.py — PC-bucketed sequence diff of two tripwire CSVs. + +Per Janet (2026-04-21): cross-index diff is destroyed the moment control +flow diverges (bitflip mode guarantees this). Group records by fn_name, +diff per bucket with difflib.SequenceMatcher. Long edit-distance buckets +get tagged SUSPECT and emit a raw side-by-side sub-sequence for human +triage — we do not try to auto-resolve. + +Usage: + tripwire_diff.py vendor.csv rebuilt.csv [--suspect-threshold 0.9] + [--show-identical] [--limit-per-bucket N] + +Key for each record inside a bucket (tunable): region + addr + rw + val. +PC is excluded because codegen reg-alloc can shift individual load/store +PCs within a function without changing behavior. `seq` and `tick` are +excluded because they drift with any upstream path difference. 
+"""
+import argparse
+import difflib
+import sys
+from collections import defaultdict
+
+from sim_tripwire import load_csv
+
+
+def bucket_key(rec):
+    """Return the tuple that identifies *rec* when diffing inside one bucket.
+
+    Only region, addr, rw, val and size take part.  ``seq`` and ``pc`` are
+    left out, so two accesses that differ only in those fields compare equal.
+    """
+    return (rec["region"], rec["addr"], rec["rw"], rec["val"], rec["size"])
+
+
+def bucket_by_fn(records):
+    """Group *records* by their ``"fn"`` field, keeping input order per group.
+
+    Returns a ``defaultdict(list)`` mapping each fn value to its records.
+    """
+    buckets = defaultdict(list)
+    for r in records:
+        buckets[r["fn"]].append(r)
+    return buckets
+
+
+def ratio(seq_a, seq_b):
+    """Return a cheap-first similarity score for two sequences.
+
+    Two empty sequences score 1.0, exactly one empty sequence scores 0.0 and
+    equal sequences score 1.0 without building a matcher.  Otherwise
+    ``quick_ratio()`` is tried first; when it is below 0.5 that value is
+    returned as-is, so the result is then an upper bound on the true ratio,
+    not the ratio itself.  Only more similar inputs pay for the O(n²)
+    ``SequenceMatcher.ratio()``.
+    """
+    if not seq_a and not seq_b:
+        return 1.0
+    if not seq_a or not seq_b:
+        return 0.0
+    if seq_a == seq_b:
+        return 1.0
+    sm = difflib.SequenceMatcher(a=seq_a, b=seq_b, autojunk=False)
+    # quick_ratio() is an upper bound on ratio() that ignores element
+    # order — useful as an early reject when buckets share little.
+    qr = sm.quick_ratio()
+    if qr < 0.5:
+        return qr
+    return sm.ratio()
+
+
+def _sm_cache(va, vb):
+    """Return ``(keys_a, keys_b, matcher)`` for two record lists.
+
+    The keys are ``bucket_key`` tuples.  ``matcher`` is ``None`` when both key
+    lists are equal; otherwise it is a ``SequenceMatcher`` (autojunk off) that
+    the caller can reuse for both the ratio and the opcodes.
+    """
+    ka = [bucket_key(r) for r in va]
+    kb = [bucket_key(r) for r in vb]
+    if ka == kb:
+        return ka, kb, None
+    sm = difflib.SequenceMatcher(a=ka, b=kb, autojunk=False)
+    return ka, kb, sm
+
+
+def fmt_rec(rec):
+    """Format one record as a single report line fragment.
+
+    ``addr``, ``val`` and ``pc`` are rendered in hex and therefore must be
+    integers; ``region`` is padded to 11 columns and must be a string.
+    """
+    return (f"{rec['region']:11s} {rec['rw']} 0x{rec['addr']:08x} "
+            f"sz={rec['size']} val=0x{rec['val']:x} (pc=0x{rec['pc']:x})")
+
+
+def _changed_lines(sm, va, vb):
+    """Yield one '-'/'+' line per differing record, in opcode order.
+
+    For every non-equal opcode the vendor-side records come first ('-'),
+    then the rebuilt-side records ('+').
+    """
+    for tag, i1, i2, j1, j2 in sm.get_opcodes():
+        if tag == "equal":
+            continue
+        for rec in va[i1:i2]:
+            yield f" - [V#{rec['seq']:5d}] {fmt_rec(rec)}"
+        for rec in vb[j1:j2]:
+            yield f" + [R#{rec['seq']:5d}] {fmt_rec(rec)}"
+
+
+def _edit_script(sm, va, vb, limit, truncation_note):
+    """Return at most *limit* differing-record lines for one bucket.
+
+    *truncation_note* is appended only when at least one differing record
+    was actually left out, so a bucket with exactly *limit* differences is
+    not reported as truncated.
+    """
+    lines = []
+    for text in _changed_lines(sm, va, vb):
+        if len(lines) >= limit:
+            lines.append(truncation_note)
+            break
+        lines.append(text)
+    return lines
+
+
+def diff_bucket(name, va, vb, limit, show_identical):
+    """Diff one function bucket and return ``(status, report)``.
+
+    name: label printed at the start of the report.
+    va, vb: vendor and rebuilt records of the bucket.
+    limit: maximum number of '-'/'+' record lines in the report.
+    show_identical: when false, an identical bucket yields ``report=None``.
+
+    ``status`` is ``"OK "`` for identical buckets, otherwise the exact
+    similarity ratio formatted to three decimals plus a trailing space.
+    ``report`` is a newline-joined string: a header line followed by the
+    edit script.
+    """
+    _, _, sm = _sm_cache(va, vb)
+    if sm is None:
+        status = "OK "
+        if not show_identical:
+            return status, None
+        return status, (f"{name:22s} OK {len(va)} records match "
+                        "(showing on --show-identical)")
+    # Build the edit script first: get_opcodes() leaves the matching blocks
+    # cached on the matcher, so the exact ratio() below needs no second
+    # alignment pass and no separate upper-bound estimate.
+    body = _edit_script(sm, va, vb, limit,
+                        f" ... (truncated at {limit} per bucket)")
+    status = f"{sm.ratio():.3f} "
+    header = (f"{name:22s} {status} "
+              f"vendor={len(va):5d} rebuilt={len(vb):5d}")
+    return status, "\n".join([header] + body)
+
+
+def main(argv=None):
+    """Diff two tripwire CSVs bucket by bucket and print a triage report.
+
+    argv: argument list to parse; ``None`` (the default) lets argparse read
+    ``sys.argv[1:]``.
+
+    Buckets whose exact similarity ratio is below ``--suspect-threshold``
+    are listed first, most dissimilar first.  The remaining reports
+    (identical buckets when ``--show-identical`` is given, and minor-diff
+    buckets) follow in function-name order.
+
+    Returns 1 when at least one suspect bucket was found, else 0.
+    """
+    ap = argparse.ArgumentParser()
+    ap.add_argument("vendor")
+    ap.add_argument("rebuilt")
+    ap.add_argument("--suspect-threshold", type=float, default=0.9,
+                    help="buckets with ratio below this get SUSPECT tag")
+    ap.add_argument("--show-identical", action="store_true")
+    ap.add_argument("--limit-per-bucket", type=int, default=20,
+                    help="max insert/delete lines per bucket in the report")
+    args = ap.parse_args(argv)
+
+    vrecs = load_csv(args.vendor)
+    rrecs = load_csv(args.rebuilt)
+    print(f"# vendor: {len(vrecs):6d} records ({args.vendor})")
+    print(f"# rebuilt: {len(rrecs):6d} records ({args.rebuilt})")
+
+    vb = bucket_by_fn(vrecs)
+    rb = bucket_by_fn(rrecs)
+    fns = sorted(set(vb) | set(rb))
+
+    print(f"# buckets: {len(fns)} functions touched across either side")
+    print()
+    ok = minor = 0
+    reports = []
+    suspects = []
+    for fn in fns:
+        # .get() keeps the defaultdicts from growing empty buckets.
+        va = vb.get(fn, [])
+        rs = rb.get(fn, [])
+        _, _, sm = _sm_cache(va, rs)
+        if sm is None:
+            ok += 1
+            if args.show_identical:
+                reports.append(f"{fn:22s} OK {len(va)} records")
+            continue
+        lines = [f"{fn:22s} vendor={len(va):5d} rebuilt={len(rs):5d}"]
+        lines += _edit_script(sm, va, rs, args.limit_per_bucket,
+                              f" ... (truncated at {args.limit_per_bucket})")
+        rep = "\n".join(lines)
+        # Exact ratio, not the quick_ratio() upper bound: the opcodes above
+        # already paid for the alignment, and an upper bound could lift a
+        # bucket over a threshold its true ratio does not reach.
+        r = sm.ratio()
+        if r < args.suspect_threshold:
+            suspects.append((r, fn, rep))
+        else:
+            minor += 1
+            reports.append(f"[ ] r={r:.3f} " + rep)
+
+    print(f"# OK: {ok} minor-diff: {minor} "
+          f"SUSPECT(<{args.suspect_threshold}): {len(suspects)}")
+    print()
+    if suspects:
+        # Tuples sort by ratio first, so the worst bucket comes first.
+        suspects.sort()
+        print(f"## SUSPECT BUCKETS ({len(suspects)}) — human triage required")
+        print()
+        for _, fn, rep in suspects:
+            print(rep)
+            print()
+    # Suspects never enter `reports`, so everything in it is printed as-is.
+    for line in reports:
+        print(line)
+
+    return 1 if suspects else 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())