simulation: tripwire + PC-bucketed diff + bitflip sweep

Ship the new simulation & verification stack under simulation/:

- mmio_regions.py — address → region classifier (DDRCTL, DDRPHY,
  OTP, SRAM, …). Shared by every other tool so trace output is
  scannable without memorising the memory map.
- sim_tripwire.py — Bin-style per-access capture. Records
  (seq, insn_tick, pc, addr, size, rw, val, region, fn_name) per
  MMIO access. PCResolver bisects the vendor funs table parsed
  from ddr_conservative_asm.s.
- tripwire_diff.py — PC-bucketed difflib.SequenceMatcher diff of
  two tripwire CSVs. Buckets by fn_name so bitflip-induced control
  flow divergence doesn't cascade noise.
- training_sim.py — DDR training simulator with --mode pass and
  --mode bitflip (flip first N reads per training status, exercise
  retry paths). BITFLIP_ONLY env var narrows to a single addr for
  the sweep.
- bitflip_sweep.py — Flip each of 23 training-status addresses
  one-at-a-time and tabulate retry convergence. Surfaces which
  function(s) react to a transient fault by writing different
  downstream register values.

Plus:

- mmio_diff.py updated: region-tagged divergence output,
  --show-regions histogram, --tripwire-out-{vendor,rebuilt} CSV
  capture, --capture-stack-writes for stack-allocated buffer diffs.
- debug_probes/tp_slot_{probe,writes}.py — ad-hoc Unicorn probes
  for chasing a single-slot divergence in an SRAM buffer. Kept as
  reference examples of how to extend the tripwire toolchain.

The stack found 6 silicon-hostile bugs in the rebuilt blob that
mmio_diff's write-sequence gate was structurally blind to, including
three ld-unresolved-symbol NULL derefs (case-mismatched externs,
missing DATA_SYMS) and one C-early-return-skips-shared-tail bug
where vendor's asm fell through to the tail via `b` after a `ret`.
This commit is contained in:
2026-04-22 05:55:28 +02:00
parent e20563e2ef
commit 46155bbe91
10 changed files with 1796 additions and 2 deletions
+39 -2
View File
@@ -14,8 +14,27 @@ n## Prerequisites
Decompilation, analysis, and patching of the closed-source Rockchip RK3588 Decompilation, analysis, patching, and **pre-silicon simulation** of the
DDR initialization binary blobs. closed-source Rockchip RK3588 DDR initialization binary blobs.
The project has three layers:
1. **Static RE** — Ghidra-exported decompiled C + disassembly +
annotated register map (`ddr_annotated.c`, `rk3588_ddr.h`).
2. **Patch + flash**`patch_prod.py` rewrites specific poll loops
in the vendor blob to work around known hangs. Validated under
Unicorn via `blob_emu.py` before flash.
3. **Matching-decomp rebuild + simulation** — the goal is a buildable
working DDR blob (not bit-identical reproduction). Per-function C
ports are spliced into the vendor blob; `mmio_diff.py` gates
behavioral equivalence by MMIO write sequence. The `simulation/`
subdir adds read-side tripwire capture, PC-bucketed diff, and
per-address bitflip fault injection for retry-path validation.
> "Markus' insistence on simulation before flashing paid off. Big
> time. Again." — 2026-04-21. Tripwire + PC-bucket diff caught three
> silent NULL-derefs hidden behind `mmio_diff=3173/3173` green.
> `ld --unresolved-symbols=ignore-all` had zeroed undefined
> DATA_SYMS externs; silicon would have bricked.
## Quick Start ## Quick Start
@@ -73,8 +92,26 @@ gcc -O2 -o ddr_emu ddr_emu2.c -lunicorn -lm
| File | Description | | File | Description |
|------|-------------| |------|-------------|
| `ddr_emu2.c` | Unicorn-based C emulator with MMIO stubs | | `ddr_emu2.c` | Unicorn-based C emulator with MMIO stubs |
| `blob_emu.py` | Python Unicorn harness — runs a blob, stubs MMIO, captures UART TX |
| `mmio_diff.py` | Runs vendor + rebuilt, diffs MMIO write sequences; region-tagged output |
| Ghidra project | On oppenheimer (CT131): `/opt/work/ghidra_project/` | | Ghidra project | On oppenheimer (CT131): `/opt/work/ghidra_project/` |
### Simulation & Verification Stack (`simulation/`)
| File | Description |
|------|-------------|
| `simulation/mmio_regions.py` | Address → region classifier (`DDRCTL`, `DDRPHY`, `OTP`, `SRAM`, …) |
| `simulation/sim_tripwire.py` | Bin-style per-access capture with PC → fn name resolver |
| `simulation/tripwire_diff.py` | PC-bucketed SequenceMatcher diff of two tripwire CSVs |
| `simulation/training_sim.py` | DDR-training simulator: `pass` and `bitflip-first-pass` modes |
| `simulation/bitflip_sweep.py` | Flip each training-status addr one-at-a-time, report retry convergence |
| `simulation/README.md` | Synopsis + usage for the above |
### Debug Probes (`debug_probes/`)
| File | Description |
|------|-------------|
| `debug_probes/tp_slot_probe.py` | Snapshot the `tp` timing buffer at fn_5540's read site |
| `debug_probes/tp_slot_writes.py` | List every write to a specific `tp` slot, both vendor and rebuilt |
### Ghidra Export Scripts ### Ghidra Export Scripts
| File | Description | | File | Description |
|------|-------------| |------|-------------|
+152
View File
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""tp_slot_probe.py — snapshot tp[0x4f] and tp[0x55] at the exact PC
where fn_5540 reads them, and dump tp[base..base+0x2ac] for diff.
Runs on vendor.bin and rebuilt.bin side-by-side. The PCs for the two
ldrs differ between vendor and rebuilt codegen, so we probe multiple
candidate PCs by looking for `ldr w, [x19, #0x154]` and
`ldr w, [x19, #0x13c]` equivalents.
"""
import argparse
import os
import sys
from unicorn import *
from unicorn.arm64_const import *
sys.path.insert(0, os.path.join(
os.path.dirname(os.path.abspath(__file__)), '..'))
from mmio_diff import (SRAM_BASE, BLOB_BASE, STACK_BASE, RET_STUB,
MMIO, XREG, stub_value, reset_stub_state)
# Search window: PCs between first fn_5540 MMIO write (match 194 = 0x6978)
# and the diverging write (0x69c8/0x6a74). Capture x19 + read the two
# slots as soon as we see an instruction pattern `ldr w?, [x19, #0x154]`
# or `ldr w?, [x19, #0x13c]` (encoding 0xb94_154.. / 0xb94_13c..).
# Simpler: hook every PC in fn_5540 [0x5540..0x6040) and, on each, if
# the insn looks like such an ldr, snapshot x19 and memory contents.
def match_ldr_w_imm(ins, imm):
"""Match `ldr w?, [x?, #imm]` — any base reg. Returns rn or None."""
if (ins >> 22) != 0b1011100101:
return None
imm12 = (ins >> 10) & 0xFFF
if imm12 * 4 != imm:
return None
return (ins >> 5) & 0x1F
def run(blob_path, max_insn=500_000):
reset_stub_state()
blob = open(blob_path, 'rb').read()
uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
uc.mem_map(SRAM_BASE, 0x100000, UC_PROT_ALL)
uc.mem_write(BLOB_BASE, blob)
uc.mem_map(STACK_BASE, 0x100000, UC_PROT_ALL)
uc.mem_map(RET_STUB, 0x1000, UC_PROT_ALL)
uc.mem_write(RET_STUB, b'\x00\x00\x20\xd4')
for b, s in MMIO:
uc.mem_map(b, s, UC_PROT_ALL)
state = {'count': 0, 'prev_pc': 0, 'same_pc': 0,
'snap': None, 'tp_dump': None}
def hook_code(uc, addr, size, ud):
state['count'] += 1
if addr == state.get('prev_pc'):
state['same_pc'] += 1
if state['same_pc'] > 10000: uc.emu_stop()
else:
state['same_pc'] = 0; state['prev_pc'] = addr
if state['count'] >= max_insn: uc.emu_stop()
# Whole-blob search — the rebuilt's ldr site may be anywhere in
# fn_5540 post-recompile.
if not (0xff001000 <= addr < 0xff020000):
return
try:
ins = int.from_bytes(uc.mem_read(addr, 4), 'little')
except UcError:
return
# Look for ldr w, [x?, #0x13c] — this is the tp[0x4f] load
rn = match_ldr_w_imm(ins, 0x13c)
if rn is not None and state['snap'] is None:
base = uc.reg_read(XREG[rn])
try:
tp4f = int.from_bytes(uc.mem_read(base + 0x13c, 4), 'little')
tp55 = int.from_bytes(uc.mem_read(base + 0x154, 4), 'little')
dump = uc.mem_read(base, 0x2ac)
except UcError:
return
state['snap'] = (addr, base, rn, tp4f, tp55)
state['tp_dump'] = bytes(dump)
def hook_mmio_read(uc, typ, addr, size, val, ud):
v = stub_value(addr) & ((1 << size*8) - 1)
uc.mem_write(addr, v.to_bytes(size, 'little'))
def hook_mmio_write(uc, typ, addr, size, val, ud):
pass
def hook_unmapped(uc, typ, addr, size, val, ud):
page = addr & ~0xFFFF
try: uc.mem_map(page, 0x10000, UC_PROT_ALL)
except UcError: pass
if typ == UC_MEM_READ_UNMAPPED:
v = stub_value(addr) & ((1 << size*8) - 1)
uc.mem_write(addr, v.to_bytes(size, 'little'))
return True
uc.hook_add(UC_HOOK_CODE, hook_code)
for b, s in MMIO:
uc.hook_add(UC_HOOK_MEM_READ, hook_mmio_read, begin=b, end=b + s)
uc.hook_add(UC_HOOK_MEM_WRITE, hook_mmio_write, begin=b, end=b + s)
uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped)
uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + 0xF0000)
uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40)
pc = BLOB_BASE
remaining = max_insn
while remaining > 0:
try:
uc.emu_start(pc, RET_STUB, count=remaining); break
except UcError as e:
pc = uc.reg_read(UC_ARM64_REG_PC)
try: insn = int.from_bytes(uc.mem_read(pc, 4), 'little')
except UcError: break
if (insn >> 20) == 0xD53:
rt = insn & 0x1F
if rt < 31: uc.reg_write(XREG[rt], 0)
pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue
if (insn >> 20) in (0xD51, 0xD50):
pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue
break
return state
def main():
ap = argparse.ArgumentParser()
ap.add_argument('blob')
args = ap.parse_args()
state = run(args.blob)
if state['snap'] is None:
print(f'NO snapshot captured — did not find ldr w, [x19, #0x13c] in fn_5540 range')
return
pc, tp_base, rn, tp4f, tp55 = state['snap']
print(f'fn_5540 read site: pc=0x{pc:x} (via x{rn})')
print(f' tp_base=0x{tp_base:x}')
print(f' tp[0x4f] (+0x13c) = 0x{tp4f:08x}')
print(f' tp[0x55] (+0x154) = 0x{tp55:08x}')
print(f' computed write val = 0x{(tp55 | (tp4f << 16)) & 0xFFFFFFFF:08x}')
import sys as _sys
print(' tp dump (32-byte-line, u32):')
dump = state['tp_dump']
for off in range(0, min(0x2ac, len(dump)), 32):
import struct
words = struct.unpack_from('<8I', dump, off)
print(f' +0x{off:03x}: ' + ' '.join(f'{w:08x}' for w in words))
if __name__ == '__main__':
sys.exit(main())
+118
View File
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""tp_slot_writes.py — list every write to the tp buffer's +0x13c slot
(tp[0x4f]) during the run, on both vendor and rebuilt. Since tp lives
in SRAM at 0xff0164f8 (discovered via tp_slot_probe), we just hook
SRAM writes to that exact address.
"""
import argparse
import os
import sys
from unicorn import *
from unicorn.arm64_const import *
sys.path.insert(0, os.path.join(
os.path.dirname(os.path.abspath(__file__)), '..'))
from mmio_diff import (SRAM_BASE, BLOB_BASE, STACK_BASE, RET_STUB,
MMIO, XREG, stub_value, reset_stub_state)
TP_BASE = 0xff0164f8
TP_SLOT_4F = TP_BASE + 0x13c
TP_SLOT_55 = TP_BASE + 0x154
def run(blob_path, max_insn=500_000):
reset_stub_state()
blob = open(blob_path, 'rb').read()
uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
uc.mem_map(SRAM_BASE, 0x100000, UC_PROT_ALL)
uc.mem_write(BLOB_BASE, blob)
uc.mem_map(STACK_BASE, 0x100000, UC_PROT_ALL)
uc.mem_map(RET_STUB, 0x1000, UC_PROT_ALL)
uc.mem_write(RET_STUB, b'\x00\x00\x20\xd4')
for b, s in MMIO:
uc.mem_map(b, s, UC_PROT_ALL)
state = {'count': 0, 'prev_pc': 0, 'same_pc': 0, 'writes_4f': [],
'writes_55': []}
def hook_code(uc, addr, size, ud):
state['count'] += 1
if addr == state.get('prev_pc'):
state['same_pc'] += 1
if state['same_pc'] > 10000: uc.emu_stop()
else:
state['same_pc'] = 0; state['prev_pc'] = addr
if state['count'] >= max_insn: uc.emu_stop()
def hook_mmio_read(uc, typ, addr, size, val, ud):
v = stub_value(addr) & ((1 << size*8) - 1)
uc.mem_write(addr, v.to_bytes(size, 'little'))
def hook_mmio_write(uc, typ, addr, size, val, ud):
pass
def hook_sram_write(uc, typ, addr, size, val, ud):
if addr == TP_SLOT_4F or (addr <= TP_SLOT_4F < addr + size):
pc = uc.reg_read(UC_ARM64_REG_PC)
state['writes_4f'].append((state['count'], pc, addr, size, val))
if addr == TP_SLOT_55 or (addr <= TP_SLOT_55 < addr + size):
pc = uc.reg_read(UC_ARM64_REG_PC)
state['writes_55'].append((state['count'], pc, addr, size, val))
def hook_unmapped(uc, typ, addr, size, val, ud):
page = addr & ~0xFFFF
try: uc.mem_map(page, 0x10000, UC_PROT_ALL)
except UcError: pass
if typ == UC_MEM_READ_UNMAPPED:
v = stub_value(addr) & ((1 << size*8) - 1)
uc.mem_write(addr, v.to_bytes(size, 'little'))
return True
uc.hook_add(UC_HOOK_CODE, hook_code)
for b, s in MMIO:
uc.hook_add(UC_HOOK_MEM_READ, hook_mmio_read, begin=b, end=b + s)
uc.hook_add(UC_HOOK_MEM_WRITE, hook_mmio_write, begin=b, end=b + s)
# Hook SRAM writes in the whole blob+data region where tp lives.
uc.hook_add(UC_HOOK_MEM_WRITE, hook_sram_write,
begin=SRAM_BASE, end=SRAM_BASE + 0x100000)
uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped)
uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + 0xF0000)
uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40)
pc = BLOB_BASE
remaining = max_insn
while remaining > 0:
try:
uc.emu_start(pc, RET_STUB, count=remaining); break
except UcError as e:
pc = uc.reg_read(UC_ARM64_REG_PC)
try: insn = int.from_bytes(uc.mem_read(pc, 4), 'little')
except UcError: break
if (insn >> 20) == 0xD53:
rt = insn & 0x1F
if rt < 31: uc.reg_write(XREG[rt], 0)
pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue
if (insn >> 20) in (0xD51, 0xD50):
pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue
break
return state
def main():
ap = argparse.ArgumentParser()
ap.add_argument('blob')
args = ap.parse_args()
state = run(args.blob)
print(f'=== writes to tp[0x4f] (@{TP_SLOT_4F:#x}) — {len(state["writes_4f"])} total ===')
for tick, pc, addr, size, val in state['writes_4f']:
off = pc - 0xff001000
print(f' tick={tick:7d} pc=0x{pc:x} (blob+0x{off:05x}) addr=0x{addr:x} sz={size} val=0x{val:x}')
print(f'=== writes to tp[0x55] (@{TP_SLOT_55:#x}) — {len(state["writes_55"])} total ===')
for tick, pc, addr, size, val in state['writes_55']:
off = pc - 0xff001000
print(f' tick={tick:7d} pc=0x{pc:x} (blob+0x{off:05x}) addr=0x{addr:x} sz={size} val=0x{val:x}')
if __name__ == '__main__':
sys.exit(main())
+279
View File
@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""mmio_diff.py — log MMIO writes from vendor + rebuilt, diff sequences.
MMIO writes are the externally observable behavior. If the rebuilt writes
the same address/value/order as vendor, it is behaviorally equivalent —
register-level differences from compiler reg-alloc are irrelevant.
First divergent write identifies the function that generates wrong output.
Usage:
mmio_diff.py <vendor.bin> <rebuilt.bin> [--max N]
"""
import argparse, sys, os
from unicorn import *
from unicorn.arm64_const import *
# Region classifier — stamps each address with a human-readable tag
# (DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...). Keeps diff output scannable
# by readers who don't have the address map memorised.
try:
from mmio_regions import classify as _classify_region
except ImportError:
def _classify_region(addr): return "?"
# Tripwire module — full PC-resolved access capture for CSV emit.
try:
from sim_tripwire import Capture as _TripwireCapture
except ImportError:
_TripwireCapture = None
SRAM_BASE = 0xFF000000
BLOB_BASE = 0xFF001000
STACK_BASE = 0x00400000
RET_STUB = 0x00800000
MMIO = [
(0xFD580000, 0x00020000), (0xFD5F0000, 0x00010000),
(0xFD7C0000, 0x00040000), (0xFD800000, 0x00010000),
(0xFD8C0000, 0x00010000),
(0xFE010000, 0x00020000), (0xFE030000, 0x00010000),
(0xFE050000, 0x00010000), (0xFE0C0000, 0x00040000),
(0xFE400000, 0x00010000), (0xFECC0000, 0x00010000),
(0xFEB50000, 0x00010000), (0xFF100000, 0x00010000),
# DDR per-channel bases: ch0-ch3. ddrctl = ch+0x10000; MRCTRL0 at
# ch+0x10080 (bit 31 = mr_wr trigger, hw auto-clears on completion);
# MRSTAT at ch+0x10090 (bit 0 = busy). Stubs return 0 so polls exit
# immediately. Vendor prod.bin NOPs the polls; rebuilt keeps them.
(0xF7000000, 0x00040000), (0xF8000000, 0x00040000),
(0xF9000000, 0x00040000), (0xFA000000, 0x00040000),
]
ABS_STUB = {0xFE0500E0:0, 0xFE050054:1, 0xFE0500E4:0, 0xFEB50014:0x60, 0xFEB5007C:2}
REGION_OFF = [
(0xFE0C0000, 0xFE100000, 0xFFF, 0xA24, 0x00000002),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x684, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x090, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x080, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x514, 0x00000000),
# DDRPHY +0x3cc bit 0 = training-step done (fn_8b40 post-fn_27e0 poll).
(0xFE0C0000, 0xFE100000, 0xFFF, 0x3CC, 0x00000001),
# DDRPHY +0x0b4 bit 18 = phy-training done (fn_8b40 final-pass poll).
(0xFE0C0000, 0xFE100000, 0xFFF, 0x0B4, 0x00040000),
# DDR per-channel DFISTAT (ch+0x10c84): bit 0 = dfi_init_complete.
# fn_27e0 commits DDRCTL then polls tbz bit 0 → must return 1 to exit.
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10C84, 0x00000001),
# DDR per-channel MRSTAT (ch+0x10090): bit 0 = mr_wr_busy (want 0 to
# exit busy-poll), bit 16 = mr_rd_done (want 1 to exit mr_read done-poll).
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10090, 0x00010000),
# DDRCTL STAT (ch+0x10014): bits [2:0] operating_mode. fn_8b40 polls
# `(STAT & 7) == 1` after init — "normal" state.
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10014, 0x00000001),
]
REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)]
XREG = [getattr(__import__("unicorn.arm64_const", fromlist=["X"]),
f"UC_ARM64_REG_X{i}") for i in range(31)]
_SWSTAT_TOGGLE_COUNT = {}
def reset_stub_state():
_SWSTAT_TOGGLE_COUNT.clear()
def stub_value(addr):
if addr in ABS_STUB: return ABS_STUB[addr]
for rbase, rend, mask, off_val, rv in REGION_OFF:
if rbase <= addr < rend and (addr & mask) == off_val: return rv
for rbase, rend, rv in REGION_CONST:
if rbase <= addr < rend: return rv
# SWSTAT-like toggle: per-channel ch+0x10514 alternates 0/1 per read.
# fn_29f4 has two back-to-back polls at this reg with OPPOSITE polarity
# (first waits CLEAR, second waits SET). Real HW reflects SWCTL writes;
# the toggle gives each poll one "correct" iteration to exit.
if 0xF7000000 <= addr < 0xFB000000 and (addr & 0xFFFFFF) == 0x10514:
n = _SWSTAT_TOGGLE_COUNT.get(addr, 0)
_SWSTAT_TOGGLE_COUNT[addr] = n + 1
return 1 if (n & 1) else 0
return 0
def run_and_log_writes(blob_path, max_insn, tripwire=None,
capture_stack_writes=False):
"""Run blob under Unicorn, return list of (write_idx, pc, addr, size, val).
If `tripwire` is a sim_tripwire.Capture, every MMIO read and write
is also appended to it for CSV emit. If `capture_stack_writes` is
True, writes to the emulator-scratch stack region (0x00400000..
0x00500000) are also appended — useful for bisecting divergences
in stack-allocated buffers like fn_de40's param_2[].
"""
reset_stub_state()
blob = open(blob_path, "rb").read()
uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
uc.mem_map(SRAM_BASE, 0x100000, UC_PROT_ALL)
uc.mem_write(BLOB_BASE, blob)
uc.mem_map(STACK_BASE, 0x100000, UC_PROT_ALL)
uc.mem_map(RET_STUB, 0x1000, UC_PROT_ALL)
uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4")
for b, s in MMIO: uc.mem_map(b, s, UC_PROT_ALL)
writes = []
state = {"count": 0, "last_pc": 0, "same_pc": 0}
def hook_code(uc, addr, size, ud):
state["count"] += 1
state["last_pc"] = addr
if addr == state.get("prev_pc"):
state["same_pc"] += 1
if state["same_pc"] > 10000: uc.emu_stop()
else:
state["same_pc"] = 0; state["prev_pc"] = addr
if state["count"] >= max_insn: uc.emu_stop()
def hook_mmio_read(uc, typ, addr, size, val, ud):
v = stub_value(addr) & ((1 << size*8) - 1)
uc.mem_write(addr, v.to_bytes(size, "little"))
if tripwire is not None:
pc = uc.reg_read(UC_ARM64_REG_PC)
tripwire.rd(pc, addr, size, v, state["count"])
def hook_mmio_write(uc, typ, addr, size, val, ud):
pc = uc.reg_read(UC_ARM64_REG_PC)
writes.append((len(writes), pc, addr, size, val))
if tripwire is not None:
tripwire.wr(pc, addr, size, val, state["count"])
def hook_unmapped(uc, typ, addr, size, val, ud):
page = addr & ~0xFFFF
try: uc.mem_map(page, 0x10000, UC_PROT_ALL)
except UcError: pass
if typ == UC_MEM_READ_UNMAPPED:
v = stub_value(addr) & ((1 << size*8) - 1)
uc.mem_write(addr, v.to_bytes(size, "little"))
if tripwire is not None:
pc = uc.reg_read(UC_ARM64_REG_PC)
tripwire.rd(pc, addr, size, v, state["count"])
elif typ == UC_MEM_WRITE_UNMAPPED:
pc = uc.reg_read(UC_ARM64_REG_PC)
writes.append((len(writes), pc, addr, size, val))
if tripwire is not None:
tripwire.wr(pc, addr, size, val, state["count"])
return True
uc.hook_add(UC_HOOK_CODE, hook_code)
for b, s in MMIO:
uc.hook_add(UC_HOOK_MEM_READ, hook_mmio_read, begin=b, end=b + s)
uc.hook_add(UC_HOOK_MEM_WRITE, hook_mmio_write, begin=b, end=b + s)
uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped)
if capture_stack_writes and tripwire is not None:
def hook_stack_write(uc, typ, addr, size, val, ud):
pc = uc.reg_read(UC_ARM64_REG_PC)
tripwire.wr(pc, addr, size, val, state["count"])
uc.hook_add(UC_HOOK_MEM_WRITE, hook_stack_write,
begin=STACK_BASE, end=STACK_BASE + 0x100000)
uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + 0xF0000)
uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40)
pc = BLOB_BASE; remaining = max_insn
while remaining > 0:
try:
uc.emu_start(pc, RET_STUB, count=remaining); break
except UcError as e:
pc = uc.reg_read(UC_ARM64_REG_PC)
try: insn = int.from_bytes(uc.mem_read(pc, 4), "little")
except UcError: break
if (insn >> 20) == 0xD53:
rt = insn & 0x1F
if rt < 31: uc.reg_write(XREG[rt], 0)
pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue
if (insn >> 20) in (0xD51, 0xD50):
pc += 4; uc.reg_write(UC_ARM64_REG_PC, pc); remaining -= 1; continue
break
return writes
def main():
ap = argparse.ArgumentParser()
ap.add_argument("vendor"); ap.add_argument("rebuilt")
ap.add_argument("--max", type=int, default=500000)
ap.add_argument("--ignore-pc", action="store_true",
help="ignore PC when comparing (only addr+val)")
ap.add_argument("--show-regions", action="store_true",
help="print region histogram of vendor writes on success")
ap.add_argument("--tripwire-out-vendor", default=None, metavar="CSV",
help="write PC-resolved access trace of the vendor run to CSV")
ap.add_argument("--tripwire-out-rebuilt", default=None, metavar="CSV",
help="write PC-resolved access trace of the rebuilt run to CSV")
ap.add_argument("--capture-stack-writes", action="store_true",
help="also capture writes to the emulator stack "
"(0x00400000..0x00500000) in tripwire CSVs")
args = ap.parse_args()
print(f"# MMIO-write diff {args.vendor} vs {args.rebuilt}")
tw_v = _TripwireCapture() if (args.tripwire_out_vendor and _TripwireCapture) else None
tw_r = _TripwireCapture() if (args.tripwire_out_rebuilt and _TripwireCapture) else None
vw = run_and_log_writes(args.vendor, args.max, tripwire=tw_v,
capture_stack_writes=args.capture_stack_writes)
rw = run_and_log_writes(args.rebuilt, args.max, tripwire=tw_r,
capture_stack_writes=args.capture_stack_writes)
if tw_v is not None:
tw_v.emit_csv(args.tripwire_out_vendor)
print(f"# tripwire(vendor): {len(tw_v.records)} records -> "
f"{args.tripwire_out_vendor}")
if tw_r is not None:
tw_r.emit_csv(args.tripwire_out_rebuilt)
print(f"# tripwire(rebuilt): {len(tw_r.records)} records -> "
f"{args.tripwire_out_rebuilt}")
print(f"vendor writes: {len(vw)} rebuilt writes: {len(rw)}")
n = min(len(vw), len(rw))
for i in range(n):
_, vp, va, vs, vv = vw[i]
_, rp, ra, rs, rv = rw[i]
key_v = (va, vs, vv) if args.ignore_pc else (vp, va, vs, vv)
key_r = (ra, rs, rv) if args.ignore_pc else (rp, ra, rs, rv)
if key_v != key_r:
print(f"[write {i}] DIVERGE")
print(f" vendor: pc=0x{vp:x} [{_classify_region(va):10s}] "
f"addr=0x{va:x} sz={vs} val=0x{vv:x}")
print(f" rebuilt: pc=0x{rp:x} [{_classify_region(ra):10s}] "
f"addr=0x{ra:x} sz={rs} val=0x{rv:x}")
# show context: last 3 matching writes
for j in range(max(0, i-3), i):
_, p, a, s, v = vw[j]
print(f" match [{j}]: pc=0x{p:x} [{_classify_region(a):10s}] "
f"addr=0x{a:x} val=0x{v:x}")
return 1
if len(vw) != len(rw):
print(f"[diverge @ end] length mismatch: vendor={len(vw)} rebuilt={len(rw)}")
# Region histogram of the longer side's tail — tells you which
# subsystem our rebuild hasn't reached yet, or which one it
# reached that vendor doesn't.
longer = rw if len(rw) > len(vw) else vw
side = "rebuilt" if len(rw) > len(vw) else "vendor"
hist = {}
for _, _, a, _, _ in longer[n:]:
r = _classify_region(a)
hist[r] = hist.get(r, 0) + 1
if hist:
print(f" extra-{side} region histogram:")
for r, c in sorted(hist.items(), key=lambda x: -x[1]):
print(f" {r:12s} {c}")
return 1
print(f"[OK] all {n} MMIO writes match")
if args.show_regions:
hist = {}
for _, _, a, _, _ in vw:
r = _classify_region(a)
hist[r] = hist.get(r, 0) + 1
print("# region histogram (vendor write counts):")
for r, c in sorted(hist.items(), key=lambda x: -x[1]):
print(f" {r:12s} {c}")
return 0
if __name__ == "__main__":
sys.exit(main())
+197
View File
@@ -0,0 +1,197 @@
# RK3588 DDR TPL — Simulation & Verification Stack
A set of Unicorn-based tools for pre-silicon simulation, behavioral
diffing, and fault-injection of Rockchip RK3588 DDR TPL blobs (vendor
or rebuilt).
Built to hunt silicon-corruption bugs that `mmio_diff.py`'s
write-sequence comparison cannot see — NULL derefs, read-side
divergences, retry-path diffs.
## Synopsis
| Tool | One-line |
|---|---|
| `mmio_regions.py` | Address → region classifier (`DDRCTL`, `DDRPHY`, `OTP`, `SRAM`, …) |
| `sim_tripwire.py` | Bin-style per-access capture (PC, tick, addr, region, resolved fn name) |
| `tripwire_diff.py` | PC-bucketed `SequenceMatcher` diff of two tripwire CSVs |
| `training_sim.py` | DDR-training simulator with `pass` and `bitflip-first-pass` modes |
| `bitflip_sweep.py` | Flip each training-status address one at a time, report retry convergence |
The simulator **DOES NOT** need silicon. It runs vendor or rebuilt TPL
blobs under Unicorn with an MMIO stub that returns "pass" values for
all training-status polls, captures every access, and lets you diff
runs behaviorally.
## Quick start
Assuming your TPL blob is at `../rk3588_ddr_v1.19_prod.bin` (a copy of
the vendor blob shipped at SPI offset `0x8000` on boards with RKBIN
v1.19) and the rebuilt blob at `/tmp/rebuilt.bin`:
```bash
# Run once in "pass" mode and capture tripwire to CSV
python3 training_sim.py ../rk3588_ddr_v1.19_prod.bin \
--mode pass --tripwire-out /tmp/tw-pass.csv
# Run again with the first read of every training status flipped
python3 training_sim.py ../rk3588_ddr_v1.19_prod.bin \
--mode bitflip --flip-count 1 --flip-mask 0xFFFFFFFF \
--tripwire-out /tmp/tw-flip.csv
# Diff the two runs by function bucket
python3 tripwire_diff.py /tmp/tw-pass.csv /tmp/tw-flip.csv
# Sweep every training-status address one-at-a-time and tabulate
# whether the retry loop reconverges cleanly
python3 bitflip_sweep.py ../rk3588_ddr_v1.19_prod.bin
```
For vendor-vs-rebuilt verification (needs `../mmio_diff.py` in the
parent dir):
```bash
python3 ../mmio_diff.py --ignore-pc \
../rk3588_ddr_v1.19_prod.bin /tmp/rebuilt.bin \
--tripwire-out-vendor /tmp/tw-v.csv \
--tripwire-out-rebuilt /tmp/tw-r.csv \
--show-regions
python3 tripwire_diff.py /tmp/tw-v.csv /tmp/tw-r.csv
```
## Architecture
### `mmio_regions.py` — address classifier
Pure lookup table. `classify(addr)` returns a short tag for each
RK3588 peripheral window. Used by every other tool so trace output is
scannable without memorising the memory map.
Region tags: `DDRCTL`, `DDRCTL:SW` (STAT/PWRCTL/SWCTL/SWSTAT),
`DDRCTL:MR` (mode-register ops), `DDRPHY`, `DDRPHY:TR` (training
status offsets `0x080/090/0B4/3CC/514/684/A24`), `DDR_CRU`, `DDR_MEM`,
`SRAM`, `PMU_SRAM`, `GRF`, `BUS_GRF`, `SGRF`, `CRU`, `SCRU`, `PMU`,
`FW_DDR`, `OTP`, `UART`, `STACK`, `OTHER`.
### `sim_tripwire.py` — per-access capture
`Capture` class with `rd(pc, addr, size, val, tick)` and `wr(...)`
that record one row per access:
(seq_idx, insn_tick, pc, addr, size, rw, val, region, fn_name)
`fn_name` comes from `PCResolver`, which bisects the vendor funs
table parsed from `../ddr_conservative_asm.s` (115 `FUN_xxxx @ offset`
headers; Ghidra export). Set `RK_DDR_ASM` env var to override the
default asm path.
`emit_csv(path)` writes out; `load_csv(path)` re-hydrates. Both
`training_sim.py` and `mmio_diff.py` (in parent dir) accept a
tripwire capture object and record into it.
### `tripwire_diff.py` — PC-bucketed diff
For each unique `fn_name` in either capture, collect records, key
them by `(region, addr, rw, val, size)`, diff via `difflib.
SequenceMatcher`. `quick_ratio()` short-circuits buckets that share
almost nothing.
Outputs three tiers:
- **OK**: byte-identical key sequences (suppressed unless
`--show-identical`).
- **minor-diff**: ratio ≥ `--suspect-threshold` (default 0.9).
- **SUSPECT**: ratio below threshold, printed first with the raw
edit script.
Why PC-bucket and not index-by-index? Under bitflip mode the control
flow diverges at the flip point, which destroys index alignment.
Grouping by function localises divergences so one buggy bucket
doesn't cascade noise into unrelated ones.
### `training_sim.py` — DDR training simulator
Two modes:
- `--mode pass` — every training-status read returns its "done/OK/
trained" stub value every time. Equivalent to `mmio_diff`'s base
harness.
- `--mode bitflip --flip-count N --flip-mask MASK` — the first `N`
reads of each training-status address return `stub_value ^ mask`
(default mask `0xFFFFFFFF` → "not done"). Subsequent reads revert.
Exercises the retry / error-recovery paths.
Training-status addresses are defined inside `is_training_status()`;
override single-address via the `BITFLIP_ONLY=0xADDR` env var
(used by `bitflip_sweep.py`).
Region-tagged access histogram + UART TX dump on every run.
### `bitflip_sweep.py` — per-address retry convergence
Flips each training-status register one-at-a-time and summarises:
- how many records diverged from the pass-mode baseline
- whether any MMIO write value changed (= retry path took a
different branch)
- which function(s) wrote the divergent values
Output is a single table row per address. A clean "write_divergence"
column means retry paths converge deterministically. A non-zero
count names the function whose retry wrote a different register
value — which is often vendor-intended retry behavior, sometimes
a port bug.
Currently sweeps 23 addresses (7 DDRPHY training + 4 DDRCTL status
× 4 channels).
## Record shape + diff bucketing (for tool authors)
Per-access record fields:
seq monotonic index within the capture
tick Unicorn instruction count at the access
pc access-site PC (absolute)
addr MMIO/stack/SRAM address
size 1/2/4/8
rw 'rd' or 'wr'
val value read or written (hex)
region mmio_regions.classify(addr) tag
fn PCResolver result: FUN_xxxxxxxx from the funs table
Diff key inside each fn bucket: `(region, addr, rw, val, size)`.
Explicitly excludes `pc` (codegen reg-alloc shifts individual load/
store PCs within a function without changing behavior), `seq`, and
`tick` (these drift with any upstream path difference).
## Known limitations
- The Unicorn simulator exits early on sustained same-PC loops
(>10 000 iterations) to avoid deadlocks. Real silicon polling that
would eventually succeed is modelled via the stub returning the
success value; if your use case needs a different success-delay
profile, edit `stub_value` / `is_training_status`.
- `sim_tripwire.PCResolver` attributes every PC to the *largest
FUN_-entry address ≤ PC*. Unported code paths still resolve to a
reasonable fn_name. Ports not in the `// ============ FUN_xxxx @`
convention won't match.
- `mmio_diff.py`'s `--capture-stack-writes` flag catches writes to
Unicorn's scratch stack `0x00400000..0x00500000` — but the vendor
firmware sometimes uses SRAM-resident scratch buffers (e.g. the
`tp` timing buffer at `0xff0164f8`) instead of the call-stack. For
those, add a dedicated hook in the probe (see `../debug_probes/
tp_slot_writes.py` for an example).
## Dependencies
- Python 3.8+
- `unicorn-engine` (AArch64 CPU emulator)
- `difflib` (stdlib)
```bash
pip install unicorn
```
## License
GPL-2.0-or-later, matching the port candidates' SPDX headers.
+188
View File
@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""bitflip_sweep.py — flip each training-status address one-at-a-time
and summarise how the rebuild's retry logic responds.
For every training-status address (DDRPHY training + DDRCTL per-ch
status), run training_sim twice:
(a) --mode pass baseline
(b) --mode bitflip with `is_training_status` restricted to just that
one address
Compare the two tripwire CSVs per run. Report:
- how many records diverged
- whether mmio writes still converge to the same final sequence
- one-line summary: "retry fired? final state same? # write-value
divergences?"
Output a table row per address so you can scan for any address whose
retry loop doesn't converge.
"""
import argparse
import csv
import os
import subprocess
import sys
import tempfile
BENCH = os.path.dirname(os.path.abspath(__file__))
TRAINING_TARGETS = [
("DDRPHY:TR", 0xFE0C0000, 0x080, "MicroReset"),
("DDRPHY:TR", 0xFE0C0000, 0x090, "MicroContMux"),
("DDRPHY:TR", 0xFE0C0000, 0x0B4, "TrainingDone(b18)"),
("DDRPHY:TR", 0xFE0C0000, 0x3CC, "TrainingStep(b0)"),
("DDRPHY:TR", 0xFE0C0000, 0x514, "TrainingDone"),
("DDRPHY:TR", 0xFE0C0000, 0x684, "CalBusy"),
("DDRPHY:TR", 0xFE0C0000, 0xA24, "DfiStatus"),
]
# Per-channel DDRCTL status addresses: expand for all 4 channels.
DDRCTL_CHANNEL_BASES = (0xF7000000, 0xF8000000, 0xF9000000, 0xFA000000)
DDRCTL_STATUS_OFFSETS = [
("DDRCTL:SW", 0x10014, "STAT"),
("DDRCTL:MR", 0x10090, "MRSTAT"),
("DDRCTL:SW", 0x10C84, "DFISTAT"),
("DDRCTL:SW", 0x10514, "SWSTAT"),
]
for ch_i, base in enumerate(DDRCTL_CHANNEL_BASES):
for region, off, name in DDRCTL_STATUS_OFFSETS:
TRAINING_TARGETS.append((region, base, off, f"{name} ch{ch_i}"))
def run_sim(blob_path, flip_offset, flip_mask, out_csv, max_insn=500_000):
"""Run training_sim with a single-address bitflip. Uses env var
BITFLIP_ONLY to narrow the is_training_status predicate in the
simulator. If offset is None, runs plain pass-mode."""
env = os.environ.copy()
if flip_offset is not None:
env["BITFLIP_ONLY"] = f"{flip_offset:#x}"
mode_args = ["--mode", "bitflip", "--flip-count", "1",
"--flip-mask", f"{flip_mask:#x}"]
else:
env.pop("BITFLIP_ONLY", None)
mode_args = ["--mode", "pass"]
cmd = ["python3", os.path.join(BENCH, "training_sim.py"),
blob_path, *mode_args, "--max-insn", str(max_insn),
"--tripwire-out", out_csv]
r = subprocess.run(cmd, capture_output=True, text=True, env=env)
return r.returncode == 0
def load_csv(path):
out = []
with open(path, newline="") as f:
r = csv.DictReader(f)
for row in r:
row["seq"] = int(row["seq"])
row["tick"] = int(row["tick"])
row["pc"] = int(row["pc"], 16)
row["addr"] = int(row["addr"], 16)
row["val"] = int(row["val"], 16)
out.append(row)
return out
def summarise(pass_csv, flip_csv, addr):
"""Diff by (addr, rw, val, size) key inside per-fn buckets, not by
index — if retry causes a shift, index-by-index gets noisy.
"""
from collections import defaultdict
p = load_csv(pass_csv)
f = load_csv(flip_csv)
def bucket(records):
b = defaultdict(list)
for r in records:
b[r["fn"]].append((r["addr"], r["rw"], r["val"], r["size"], r))
return b
pb = bucket(p)
fb = bucket(f)
all_fns = set(pb) | set(fb)
wr_div_rows = [] # (fn, pass_row_or_None, flip_row_or_None)
rd_div_count = 0
for fn in all_fns:
pkeys = [(a, rw, v, s) for (a, rw, v, s, _) in pb.get(fn, [])]
fkeys = [(a, rw, v, s) for (a, rw, v, s, _) in fb.get(fn, [])]
if pkeys == fkeys:
continue
# SequenceMatcher alignment per-bucket
import difflib
sm = difflib.SequenceMatcher(a=pkeys, b=fkeys, autojunk=False)
for tag, i1, i2, j1, j2 in sm.get_opcodes():
if tag == "equal":
continue
# Characterise this edit as "read delta" vs "write delta"
p_rows = pb.get(fn, [])[i1:i2]
f_rows = fb.get(fn, [])[j1:j2]
for (_, _, _, _, row) in p_rows:
if row["rw"] == "wr":
wr_div_rows.append((fn, row, None))
else:
rd_div_count += 1
for (_, _, _, _, row) in f_rows:
if row["rw"] == "wr":
wr_div_rows.append((fn, None, row))
else:
rd_div_count += 1
return {
"total_records_pass": len(p),
"total_records_flip": len(f),
"read_divergences": rd_div_count,
"write_divergence_rows": wr_div_rows,
}
def main():
ap = argparse.ArgumentParser()
ap.add_argument("blob", help="path to the DDR TPL blob to drive")
ap.add_argument("--out-dir", default="/tmp/bitflip-sweep")
args = ap.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
# Baseline pass-mode run
baseline = os.path.join(args.out_dir, "pass.csv")
print(f"# baseline pass run -> {baseline}")
ok = run_sim(args.blob, None, 0, baseline)
if not ok:
print("baseline run failed", file=sys.stderr); return 1
header = (f"{'address':<12} {'region':<11} {'name':<18} "
f"{'rd_div':>6} writes_diverged_in")
print()
print(header)
print("-" * len(header))
all_wr_details = []
for region, base, off, name in TRAINING_TARGETS:
addr = base + off
tag = f"0x{addr:08x}"
flip_csv = os.path.join(args.out_dir, f"flip_{addr:08x}.csv")
ok = run_sim(args.blob, addr, 0xFFFFFFFF, flip_csv)
if not ok:
print(f"{tag} {region} {name} -- sim failed")
continue
s = summarise(baseline, flip_csv, addr)
wr_fns = sorted({row[0] for row in s["write_divergence_rows"]})
preview = ",".join(wr_fns[:4])
if len(wr_fns) > 4:
preview += f" +{len(wr_fns)-4}"
print(f"{tag:<12} {region:<11} {name:<18} "
f"{s['read_divergences']:>6} {preview}")
for fn, pr, fr in s["write_divergence_rows"]:
all_wr_details.append((addr, name, fn, pr, fr))
if all_wr_details:
print("\n## Write-divergence details (retry path changed register values)")
for addr, name, fn, pr, fr in all_wr_details[:60]:
pv = f"pass: addr=0x{pr['addr']:x} val=0x{pr['val']:x}" if pr else "pass: (missing)"
fv = f"flip: addr=0x{fr['addr']:x} val=0x{fr['val']:x}" if fr else "flip: (missing)"
print(f" [{name:<14}] {fn:<22} {pv} | {fv}")
if len(all_wr_details) > 60:
print(f" ... +{len(all_wr_details)-60} more")
return 0
if __name__ == "__main__":
sys.exit(main())
+121
View File
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""mmio_regions.py — address → region classifier for RK3588 DDR TPL.
Used by mmio_diff.py, blob_emu.py, call_trace.py, training_sim.py to
stamp each access with a short human-readable tag so trace output is
scannable without memorising hex ranges.
Categories (tag, short description):
DDRCTL uMCTL2 controller (per-channel ch+0x10000 window, or
global 0xFE010000)
DDRCTL:SW STAT/SWCTL/SWSTAT/PWRCTL subregion — frequently polled
DDRCTL:MR mode-register ops (MRCTRL0/MRSTAT subregion)
DDRPHY DDR PHY at 0xFE0C0000 (32 KB)
DDRPHY:TR training status subregion (+0x080/090/0B4/3CC/514/684/A24)
DDR_MEM actual DRAM content — 0x00000000..0x80000000 once trained
SRAM boot SRAM 0xFF000000..0xFF100000 (blob + globals)
CRU clock and reset 0xFD7C0000
DDR_CRU DDR PHY clock/reset (ch0..3) 0xFD800000..0xFD8C0000
SCRU secure clock/reset 0xFD8C0000
PMU_SRAM PMU SRAM 0xFF100000..0xFF110000
GRF general register file 0xFD580000
BUS_GRF bus-side GRF 0xFD5F0000
SGRF secure GRF 0xFE050000
PMU power-mgmt unit 0xFE400000
FW_DDR DDR firewall 0xFE030000
OTP OTP_NS — one-time-programmable controller 0xFECC0000
UART debug UART 0xFEB50000
OTHER unmapped / unclassified
"""
# Per-channel DDRCTL window repeats at these bases. Offset within
# a channel identifies the real sub-register.
DDRCTL_CHANNELS = (0xF7000000, 0xF8000000, 0xF9000000, 0xFA000000)
DDRCTL_GLOBAL = 0xFE010000
DDRCTL_WIN = 0x20000
DDRCTL_SUB = 0x10000 # ch+0x10000 lands inside ctrl space
# Training-status registers (the ones mmio_diff's REGION_OFF stubs).
DDRPHY_TRAINING_OFFSETS = {0x080, 0x090, 0x0B4, 0x3CC, 0x514, 0x684, 0xA24}
# DDRCTL sub-categories — offsets within the 0x10000 sub-window.
DDRCTL_SW_OFFSETS = {0x10014, 0x10180, 0x10C80, 0x10C84}
DDRCTL_MR_OFFSETS = {0x10080, 0x10090}
def classify(addr: int) -> str:
"""Return short region tag for an absolute address."""
# Emulator-only scratch stack (0x00400000..0x00500000) — not a real
# silicon region but tagged distinctly so tripwire can diff stack
# writes (e.g. param_2[] buffers fn_de40 fills).
if 0x00400000 <= addr < 0x00500000:
return "STACK"
# DDR memory (post-training)
if addr < 0x80000000:
return "DDR_MEM"
# Per-channel DDRCTL windows
for base in DDRCTL_CHANNELS:
if base <= addr < base + 0x40000:
off = addr - base
if off in DDRCTL_SW_OFFSETS:
return "DDRCTL:SW"
if off in DDRCTL_MR_OFFSETS:
return "DDRCTL:MR"
return "DDRCTL"
# Global DDRCTL (less common in TPL)
if DDRCTL_GLOBAL <= addr < DDRCTL_GLOBAL + DDRCTL_WIN:
return "DDRCTL"
# DDRPHY 0xFE0C0000..0xFE100000 (256 KB per 4 ports; training at base)
if 0xFE0C0000 <= addr < 0xFE100000:
off = addr & 0xFFF
if off in DDRPHY_TRAINING_OFFSETS:
return "DDRPHY:TR"
return "DDRPHY"
# Clock/reset
if 0xFD7C0000 <= addr < 0xFD800000: return "CRU"
if 0xFD800000 <= addr < 0xFD8C0000: return "DDR_CRU"
if 0xFD8C0000 <= addr < 0xFD8D0000: return "SCRU"
# Register files
if 0xFD580000 <= addr < 0xFD5A0000: return "GRF"
if 0xFD5F0000 <= addr < 0xFD600000: return "BUS_GRF"
if 0xFE050000 <= addr < 0xFE060000: return "SGRF"
# PMU / firewall / scrambler
if 0xFE400000 <= addr < 0xFE410000: return "PMU"
if 0xFE030000 <= addr < 0xFE040000: return "FW_DDR"
if 0xFECC0000 <= addr < 0xFECD0000: return "OTP"
# Debug UART
if 0xFEB50000 <= addr < 0xFEB60000: return "UART"
# Boot SRAM (blob + globals) and PMU SRAM
if 0xFF000000 <= addr < 0xFF100000: return "SRAM"
if 0xFF100000 <= addr < 0xFF110000: return "PMU_SRAM"
return "OTHER"
def classify_rw(addr: int, is_write: bool) -> str:
"""Direction-aware tag: 'DDRCTL:SW wr' vs 'DDRCTL:SW rd'."""
return f"{classify(addr):10s} {'wr' if is_write else 'rd'}"
if __name__ == "__main__":
import sys
# Smoke-test: classify a few known addresses
tests = [
(0xFE0C0A24, "DDRPHY:TR"), # DfiStatus
(0xFE0C0000, "DDRPHY"), # generic PHY reg
(0xF7010C80, "DDRCTL:SW"), # SWCTL ch0
(0xF7010080, "DDRCTL:MR"), # MRCTRL0 ch0
(0xF7010500, "DDRCTL"), # other DDRCTL
(0xFD7C0000, "CRU"),
(0xFD800000, "DDR_CRU"),
(0xFF001000, "SRAM"),
(0xFF100000, "PMU_SRAM"),
(0xFEB50000, "UART"),
(0x00100000, "DDR_MEM"),
]
fails = 0
for addr, want in tests:
got = classify(addr)
ok = "OK" if got == want else "FAIL"
if got != want: fails += 1
print(f" {ok} 0x{addr:08x} -> {got:12s} (want {want})")
sys.exit(1 if fails else 0)
+165
View File
@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""sim_tripwire.py — Bin-style MMIO tracer adapted for Unicorn-sim.
Records every MMIO access (read + write) with enough context to diff
two simulator runs at sequence level. Modeled on the Bin project's
RAM-ring tripwire primitive, minus the DDR reservation — we're not on
silicon, we're in Python, so we just append to a list.
Record shape (per Janet, 2026-04-21):
(seq_idx, insn_tick, pc, addr, size, rw, val, region_tag, fn_name)
Usage from a simulator harness:
import sim_tripwire
cap = sim_tripwire.Capture(asm_path) # loads funs table
# inside UC_HOOK_MEM_READ:
cap.rd(pc, addr, size, val, insn_tick)
# inside UC_HOOK_MEM_WRITE:
cap.wr(pc, addr, size, val, insn_tick)
cap.emit_csv("/tmp/vendor-trace.csv")
The companion tool `tripwire_diff.py` reads two CSVs and does a
PC-bucketed diff (group by fn_name, diff per bucket with difflib).
"""
import bisect
import csv
import os
import re
try:
from mmio_regions import classify as _classify
except ImportError:
def _classify(addr): return "?"
# Default location of the vendor disassembly that carries the funs
# table. Defaults to ../ddr_conservative_asm.s relative to this file
# (repo layout); override via env var or constructor arg.
DEFAULT_ASM = os.environ.get(
"RK_DDR_ASM",
os.path.join(os.path.dirname(os.path.abspath(__file__)),
"..", "ddr_conservative_asm.s"))
BLOB_BASE = 0xFF001000 # where the TPL blob lives in SRAM
def parse_fun_table(asm_path):
"""Parse `// ============ FUN_<hex> @ <offset> ============` headers.
Returns list of (abs_addr, fun_name) sorted by abs_addr so we can
do O(log N) PC → nearest-below lookups.
"""
pat = re.compile(r'// ============ (FUN_[0-9a-fA-F]+) @ ([0-9a-fA-F]+) ============')
out = []
with open(asm_path) as f:
for ln in f:
m = pat.match(ln)
if not m:
continue
name = m.group(1)
off = int(m.group(2), 16)
out.append((BLOB_BASE + off, name))
out.sort()
return out
class PCResolver:
"""PC → nearest containing FUN_ name.
Uses the vendor funs table (parse_fun_table) as ground truth for
function entries. A PC resolves to the FUN_ whose entry is the
largest ≤ PC. Accuracy depends on the asm covering all functions
— missing entries produce attribution to the previous function.
"""
def __init__(self, asm_path=DEFAULT_ASM):
self.table = parse_fun_table(asm_path) if os.path.exists(asm_path) else []
self._keys = [addr for addr, _ in self.table]
self._names = [name for _, name in self.table]
def resolve(self, pc):
if not self.table:
return "?"
# Find rightmost entry with addr <= pc
i = bisect.bisect_right(self._keys, pc) - 1
if i < 0:
return "<pre-blob>"
return self._names[i]
class Capture:
"""Append-only tripwire capture.
Records are (seq_idx, insn_tick, pc, addr, size, rw, val, region, fn).
Keep this lean — writing this list is in the hot Unicorn callback
path.
"""
def __init__(self, asm_path=DEFAULT_ASM, resolve=True):
self.records = []
self._pcr = PCResolver(asm_path) if resolve else None
def _append(self, pc, addr, size, rw, val, insn_tick):
seq = len(self.records)
region = _classify(addr)
fn = self._pcr.resolve(pc) if self._pcr else "?"
self.records.append(
(seq, insn_tick, pc, addr, size, rw, val, region, fn))
def rd(self, pc, addr, size, val, insn_tick):
self._append(pc, addr, size, "rd", val, insn_tick)
def wr(self, pc, addr, size, val, insn_tick):
self._append(pc, addr, size, "wr", val, insn_tick)
def emit_csv(self, path):
with open(path, "w", newline="") as f:
w = csv.writer(f)
w.writerow(("seq", "tick", "pc", "addr", "size",
"rw", "val", "region", "fn"))
for seq, tick, pc, addr, size, rw, val, region, fn in self.records:
w.writerow((seq, tick, f"0x{pc:x}", f"0x{addr:x}",
size, rw, f"0x{val:x}", region, fn))
def summary(self):
"""Return (n_total, n_rd, n_wr, per_fn_counter, per_region_counter)."""
from collections import Counter
fn = Counter()
region = Counter()
n_rd = n_wr = 0
for _, _, _, _, _, rw, _, reg, fname in self.records:
(n_rd if rw == "rd" else n_wr) # no-op; tracked below
if rw == "rd": n_rd += 1
else: n_wr += 1
fn[fname] += 1
region[reg] += 1
return len(self.records), n_rd, n_wr, fn, region
def load_csv(path):
"""Read a CSV emitted by emit_csv. Returns list of dict records."""
out = []
with open(path, newline="") as f:
r = csv.DictReader(f)
for row in r:
row["seq"] = int(row["seq"])
row["tick"] = int(row["tick"])
row["pc"] = int(row["pc"], 16)
row["addr"] = int(row["addr"], 16)
row["size"] = int(row["size"])
row["val"] = int(row["val"], 16)
out.append(row)
return out
if __name__ == "__main__":
# Smoke: parse funs table and show first 5 entries
t = parse_fun_table(DEFAULT_ASM)
print(f"loaded {len(t)} fn entries from {DEFAULT_ASM}")
for addr, name in t[:5]:
print(f" 0x{addr:08x} {name}")
pcr = PCResolver()
# BLOB_BASE + offset of known functions
for off in (0x4, 0x40, 0x3c48, 0xfcc4, 0xde40, 0xf170):
pc = BLOB_BASE + off
print(f" resolve 0x{pc:x} (BLOB+0x{off:x}) -> {pcr.resolve(pc)}")
+348
View File
@@ -0,0 +1,348 @@
#!/usr/bin/env python3
"""training_sim.py — DDR training simulator for the RK3588 TPL blob.
Simulates a DRAM machine that answers the PHY's training handshakes
deterministically, without needing silicon. Two modes:
--mode pass Every status/poll returns "done/OK/trained".
First-iteration behavior, fastest path through
training. This is the existing mmio_diff default.
--mode bitflip For N iterations the status register returns a
bit-flipped (wrong) value, forcing the code
through its retry / error-recovery path. After
N bad reads, the value snaps back to the "pass"
word. Default N = 1: classic "first-pass fails,
retry succeeds" PHY behavior.
Human-readable trace: every MMIO access is tagged with its region
(DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...) so you can scan the log
without memorising the address map.
Usage:
training_sim.py <blob.bin> [--mode pass|bitflip] [--flip-count N]
[--max-insn N] [--verbose] [--limit-trace N]
"""
import argparse, sys, os
from unicorn import *
from unicorn.arm64_const import *
# Local modules
from mmio_regions import classify
from sim_tripwire import Capture as _TripwireCapture
SRAM_BASE = 0xFF000000
SRAM_SIZE = 0x00100000
BLOB_BASE = 0xFF001000
STACK_BASE = 0x00400000
STACK_SIZE = 0x00100000
RET_STUB = 0x00800000
RET_SIZE = 0x00001000
MMIO = [
(0xFD580000, 0x00020000), (0xFD5F0000, 0x00010000),
(0xFD7C0000, 0x00040000), (0xFD800000, 0x00010000),
(0xFD8C0000, 0x00010000),
(0xFE010000, 0x00020000), (0xFE030000, 0x00010000),
(0xFE050000, 0x00010000), (0xFE0C0000, 0x00040000),
(0xFE400000, 0x00010000), (0xFECC0000, 0x00010000),
(0xFEB50000, 0x00010000), (0xFF100000, 0x00010000),
(0xF7000000, 0x00040000), (0xF8000000, 0x00040000),
(0xF9000000, 0x00040000), (0xFA000000, 0x00040000),
]
# Per-address pass values — copied from mmio_diff.ABS_STUB.
ABS_PASS = {
0xFE0500E0: 0x00000000,
0xFE050054: 0x00000001,
0xFE0500E4: 0x00000000,
0xFEB50014: 0x00000060,
0xFEB5007C: 0x00000002,
}
# DDRPHY training-status stubs. Tuple: (base, end, mask, offset, pass_value).
# Copied from mmio_diff.REGION_OFF.
REGION_OFF = [
(0xFE0C0000, 0xFE100000, 0xFFF, 0xA24, 0x00000002),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x684, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x090, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x080, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x514, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x3CC, 0x00000001),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x0B4, 0x00040000),
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10C84, 0x00000001),
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10090, 0x00010000),
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10014, 0x00000001),
]
REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)]
# Addresses that are *training status* — these are the ones bitflip mode
# perturbs. Anything else keeps the pass value even in bitflip mode so
# the test is focused on training retry paths, not boot-infrastructure
# noise.
def is_training_status(addr):
# Env-var override for bitflip_sweep.py: when set to an address,
# only that exact address is considered "training status" and thus
# bitflippable. Lets us flip one register at a time.
only = os.environ.get("BITFLIP_ONLY")
if only:
return addr == int(only, 0)
if 0xFE0C0000 <= addr < 0xFE100000:
off = addr & 0xFFF
return off in (0x080, 0x090, 0x0B4, 0x3CC, 0x514, 0x684, 0xA24)
if 0xF7000000 <= addr < 0xFB000000:
off = addr & 0xFFFFFF
return off in (0x10014, 0x10090, 0x10C84, 0x10514)
return False
def pass_value(addr):
"""Return the 'all-good' stub value for a status address."""
if addr in ABS_PASS: return ABS_PASS[addr]
for rbase, rend, mask, off_val, rv in REGION_OFF:
if rbase <= addr < rend and (addr & mask) == off_val:
return rv
for rbase, rend, rv in REGION_CONST:
if rbase <= addr < rend:
return rv
# SWSTAT-like toggle: ch+0x10514 alternates per read (preserves
# fn_29f4 two-poll-opposite-polarity expectation).
return None # caller applies its own fallback
class TrainingSim:
def __init__(self, mode, flip_count, flip_mask, limit_trace, verbose):
self.mode = mode # "pass" or "bitflip"
self.flip_count = flip_count # how many first reads return flipped
self.flip_mask = flip_mask # XOR mask for bitflip
self.limit_trace = limit_trace
self.verbose = verbose
# Per-address read counters (used both for bitflip semantics
# and for the SWSTAT toggle that the existing harness needs).
self._reads = {}
self._swstat_toggle = {}
# Traces
self.access_log = [] # (kind, pc, addr, size, val, region)
self.training_log = [] # (n, pc, addr, stub_val, flipped?)
def read_value(self, addr, size):
count = self._reads.get(addr, 0)
self._reads[addr] = count + 1
# Fast path — pass value if defined
pv = pass_value(addr)
if pv is None:
# SWSTAT-like toggle at ch+0x10514
if 0xF7000000 <= addr < 0xFB000000 and (addr & 0xFFFFFF) == 0x10514:
n = self._swstat_toggle.get(addr, 0)
self._swstat_toggle[addr] = n + 1
pv = 1 if (n & 1) else 0
else:
pv = 0
# Apply bitflip to training-status only, first N reads per addr
flipped = False
if self.mode == "bitflip" and is_training_status(addr):
if count < self.flip_count:
pv ^= self.flip_mask
flipped = True
self.training_log.append((count, addr, pv, True))
else:
self.training_log.append((count, addr, pv, False))
return pv & ((1 << (size * 8)) - 1)
def log(self, kind, pc, addr, size, val):
if len(self.access_log) < self.limit_trace:
self.access_log.append((kind, pc, addr, size, val, classify(addr)))
def run(blob_path, sim, max_insn, tripwire=None):
blob = open(blob_path, "rb").read()
uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
uc.mem_map(SRAM_BASE, SRAM_SIZE, UC_PROT_ALL)
uc.mem_write(BLOB_BASE, blob)
uc.mem_map(STACK_BASE, STACK_SIZE, UC_PROT_ALL)
uc.mem_map(RET_STUB, RET_SIZE, UC_PROT_ALL)
uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4") # brk #0
for base, sz in MMIO:
uc.mem_map(base, sz, UC_PROT_ALL)
state = {"count": 0, "last_pc": 0, "same_pc": 0, "max_pc": 0, "writes": 0}
def hook_code(uc, addr, size, ud):
state["count"] += 1
if addr == state["last_pc"]:
state["same_pc"] += 1
if state["same_pc"] > 10000:
uc.emu_stop()
else:
state["same_pc"] = 0
state["last_pc"] = addr
if addr > state["max_pc"]:
state["max_pc"] = addr
if state["count"] >= max_insn:
uc.emu_stop()
def hook_read(uc, typ, addr, size, val, ud):
v = sim.read_value(addr, size)
uc.mem_write(addr, v.to_bytes(size, "little"))
pc = uc.reg_read(UC_ARM64_REG_PC)
sim.log("rd", pc, addr, size, v)
if tripwire is not None:
tripwire.rd(pc, addr, size, v, state["count"])
uart_buf = bytearray()
def hook_write(uc, typ, addr, size, val, ud):
pc = uc.reg_read(UC_ARM64_REG_PC)
state["writes"] += 1
sim.log("wr", pc, addr, size, val)
if tripwire is not None:
tripwire.wr(pc, addr, size, val, state["count"])
if addr == 0xFEB50000:
c = val & 0xFF
uart_buf.append(c)
def hook_unmapped(uc, typ, addr, size, val, ud):
page = addr & ~0xFFFF
try:
uc.mem_map(page, 0x10000, UC_PROT_ALL)
except UcError:
pass
if typ == UC_MEM_READ_UNMAPPED:
v = sim.read_value(addr, size)
uc.mem_write(addr, v.to_bytes(size, "little"))
pc = uc.reg_read(UC_ARM64_REG_PC)
sim.log("rd", pc, addr, size, v)
if tripwire is not None:
tripwire.rd(pc, addr, size, v, state["count"])
elif typ == UC_MEM_WRITE_UNMAPPED:
pc = uc.reg_read(UC_ARM64_REG_PC)
state["writes"] += 1
sim.log("wr", pc, addr, size, val)
if tripwire is not None:
tripwire.wr(pc, addr, size, val, state["count"])
return True
uc.hook_add(UC_HOOK_CODE, hook_code)
for base, sz in MMIO:
uc.hook_add(UC_HOOK_MEM_READ, hook_read, begin=base, end=base + sz)
uc.hook_add(UC_HOOK_MEM_WRITE, hook_write, begin=base, end=base + sz)
uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped)
uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + STACK_SIZE - 16)
uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40)
XREG = [getattr(__import__("unicorn.arm64_const", fromlist=["X"]),
f"UC_ARM64_REG_X{i}") for i in range(31)]
pc = BLOB_BASE
remaining = max_insn
while remaining > 0:
try:
uc.emu_start(pc, RET_STUB, count=remaining)
break
except UcError as e:
pc = uc.reg_read(UC_ARM64_REG_PC)
try:
insn = int.from_bytes(uc.mem_read(pc, 4), "little")
except UcError:
break
if (insn >> 20) == 0xD53:
rt = insn & 0x1F
if rt < 31:
uc.reg_write(XREG[rt], 0)
pc += 4
uc.reg_write(UC_ARM64_REG_PC, pc)
remaining -= 1
continue
if (insn >> 20) in (0xD51, 0xD50):
pc += 4
uc.reg_write(UC_ARM64_REG_PC, pc)
remaining -= 1
continue
break
return state, uart_buf
def print_summary(sim, state, uart_buf, region_hist):
print(f"# training_sim mode={sim.mode} flip_count={sim.flip_count} "
f"flip_mask=0x{sim.flip_mask:x}")
print(f"insns: {state['count']} writes: {state['writes']} "
f"max_pc: 0x{state['max_pc']:x}")
print()
print("# region histogram (access count by region):")
for region, (rd, wr) in sorted(region_hist.items(),
key=lambda x: -(x[1][0] + x[1][1])):
print(f" {region:12s} rd={rd:6d} wr={wr:6d}")
if sim.training_log:
print()
print(f"# training-status reads ({len(sim.training_log)}):")
for count, addr, val, flipped in sim.training_log[:20]:
tag = "FLIP" if flipped else " "
print(f" [{count}] {tag} {classify(addr):10s} "
f"0x{addr:08x} -> 0x{val:08x}")
if len(sim.training_log) > 20:
print(f" ... +{len(sim.training_log)-20} more")
if uart_buf:
print()
print(f"# UART TX ({len(uart_buf)} bytes):")
print(uart_buf.decode('utf-8', errors='replace'))
def main():
ap = argparse.ArgumentParser()
ap.add_argument("blob")
ap.add_argument("--mode", choices=("pass", "bitflip"), default="pass",
help="pass = always answer training positively; "
"bitflip = flip returned bits for N reads, then pass")
ap.add_argument("--flip-count", type=int, default=1,
help="N: how many flipped reads per status address before "
"reverting to pass (default 1)")
ap.add_argument("--flip-mask", default="0xFFFFFFFF",
help="XOR mask applied to training status (default: "
"invert all bits, which usually reads as 'not done')")
ap.add_argument("--max-insn", type=int, default=500_000)
ap.add_argument("--limit-trace", type=int, default=200,
help="cap on per-access trace rows stored (no I/O cost)")
ap.add_argument("--verbose", action="store_true",
help="print full per-access trace (may be very long)")
ap.add_argument("--tripwire-out", default=None, metavar="CSV",
help="write full PC-resolved access trace to this CSV")
args = ap.parse_args()
sim = TrainingSim(
mode=args.mode,
flip_count=args.flip_count,
flip_mask=int(args.flip_mask, 0),
limit_trace=args.limit_trace if not args.verbose else 10**9,
verbose=args.verbose,
)
tripwire = _TripwireCapture() if args.tripwire_out else None
state, uart_buf = run(args.blob, sim, args.max_insn, tripwire=tripwire)
if tripwire is not None:
tripwire.emit_csv(args.tripwire_out)
print(f"# tripwire: {len(tripwire.records)} records -> "
f"{args.tripwire_out}")
# Region histogram from the trace (capped by limit_trace).
region_hist = {}
for kind, pc, addr, size, val, region in sim.access_log:
rd, wr = region_hist.get(region, (0, 0))
if kind == "rd":
region_hist[region] = (rd + 1, wr)
else:
region_hist[region] = (rd, wr + 1)
if args.verbose:
for kind, pc, addr, size, val, region in sim.access_log:
print(f" PC=0x{pc:08x} [{region:10s}] {kind} "
f"0x{addr:08x} sz={size} val=0x{val:x}")
print_summary(sim, state, uart_buf, region_hist)
return 0
if __name__ == "__main__":
sys.exit(main())
+189
View File
@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""tripwire_diff.py — PC-bucketed sequence diff of two tripwire CSVs.
Per Janet (2026-04-21): cross-index diff is destroyed the moment control
flow diverges (bitflip mode guarantees this). Group records by fn_name,
diff per bucket with difflib.SequenceMatcher. Long edit-distance buckets
get tagged SUSPECT and emit a raw side-by-side sub-sequence for human
triage — we do not try to auto-resolve.
Usage:
tripwire_diff.py vendor.csv rebuilt.csv [--suspect-threshold 0.9]
[--show-identical] [--limit-per-bucket N]
Key for each record inside a bucket (tunable): region + addr + rw + val.
PC is excluded because codegen reg-alloc can shift individual load/store
PCs within a function without changing behavior. `seq` and `tick` are
excluded because they drift with any upstream path difference.
"""
import argparse
import difflib
import sys
from collections import defaultdict
from sim_tripwire import load_csv
def bucket_key(rec):
"""Inside a fn_name bucket, the canonical record key for diffing."""
return (rec["region"], rec["addr"], rec["rw"], rec["val"], rec["size"])
def bucket_by_fn(records):
buckets = defaultdict(list)
for r in records:
buckets[r["fn"]].append(r)
return buckets
def ratio(seq_a, seq_b):
"""Cheap-first ratio. Skips O(n²) SequenceMatcher when obviously not similar."""
if not seq_a and not seq_b:
return 1.0
if not seq_a or not seq_b:
return 0.0
if seq_a == seq_b:
return 1.0
sm = difflib.SequenceMatcher(a=seq_a, b=seq_b, autojunk=False)
# quick_ratio is an upper bound computed from set intersection —
# useful as an early reject when buckets share nothing.
qr = sm.quick_ratio()
if qr < 0.5:
return qr
return sm.ratio()
def _sm_cache(va, vb):
"""Return (key_a, key_b, cached SequenceMatcher) once, reuse for opcodes."""
ka = [bucket_key(r) for r in va]
kb = [bucket_key(r) for r in vb]
if ka == kb:
return ka, kb, None
sm = difflib.SequenceMatcher(a=ka, b=kb, autojunk=False)
return ka, kb, sm
def fmt_rec(rec):
return (f"{rec['region']:11s} {rec['rw']} 0x{rec['addr']:08x} "
f"sz={rec['size']} val=0x{rec['val']:x} (pc=0x{rec['pc']:x})")
def diff_bucket(name, va, vb, limit, show_identical):
ka = [bucket_key(r) for r in va]
kb = [bucket_key(r) for r in vb]
r = ratio(ka, kb)
status = "OK " if ka == kb else f"{r:.3f} "
if ka == kb and not show_identical:
return status, None
if ka == kb:
return status, (f"{name:22s} OK {len(va)} records match "
"(showing on --show-identical)")
# Surface an edit-script side-by-side
sm = difflib.SequenceMatcher(a=ka, b=kb, autojunk=False)
lines = [f"{name:22s} {status} "
f"vendor={len(va):5d} rebuilt={len(vb):5d}"]
shown = 0
for tag, i1, i2, j1, j2 in sm.get_opcodes():
if tag == "equal":
continue
for i in range(i1, i2):
if shown >= limit: break
lines.append(f" - [V#{va[i]['seq']:5d}] {fmt_rec(va[i])}")
shown += 1
for j in range(j1, j2):
if shown >= limit: break
lines.append(f" + [R#{vb[j]['seq']:5d}] {fmt_rec(vb[j])}")
shown += 1
if shown >= limit:
lines.append(f" ... (truncated at {limit} per bucket)")
break
return status, "\n".join(lines)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("vendor")
ap.add_argument("rebuilt")
ap.add_argument("--suspect-threshold", type=float, default=0.9,
help="buckets with ratio below this get SUSPECT tag")
ap.add_argument("--show-identical", action="store_true")
ap.add_argument("--limit-per-bucket", type=int, default=20,
help="max insert/delete lines per bucket in the report")
args = ap.parse_args()
vrecs = load_csv(args.vendor)
rrecs = load_csv(args.rebuilt)
print(f"# vendor: {len(vrecs):6d} records ({args.vendor})")
print(f"# rebuilt: {len(rrecs):6d} records ({args.rebuilt})")
vb = bucket_by_fn(vrecs)
rb = bucket_by_fn(rrecs)
fns = sorted(set(vb) | set(rb))
print(f"# buckets: {len(fns)} functions touched across either side")
print()
ok = susp = diff = 0
reports = []
suspects = []
for fn in fns:
va = vb.get(fn, [])
rs = rb.get(fn, [])
ka, kb, sm = _sm_cache(va, rs)
if sm is None:
ok += 1
if args.show_identical:
reports.append(f"{fn:22s} OK {len(va)} records")
continue
# Fast: set-intersection upper bound; short-circuit on no overlap
qr = sm.quick_ratio()
r = qr if qr < 0.5 else sm.ratio()
tag = "SUSPECT" if r < args.suspect_threshold else " "
if r < args.suspect_threshold:
susp += 1
else:
diff += 1
lines = [f"{fn:22s} vendor={len(va):5d} rebuilt={len(rs):5d}"]
shown = 0
for op_tag, i1, i2, j1, j2 in sm.get_opcodes():
if op_tag == "equal":
continue
for i in range(i1, i2):
if shown >= args.limit_per_bucket: break
lines.append(f" - [V#{va[i]['seq']:5d}] {fmt_rec(va[i])}")
shown += 1
for j in range(j1, j2):
if shown >= args.limit_per_bucket: break
lines.append(f" + [R#{rs[j]['seq']:5d}] {fmt_rec(rs[j])}")
shown += 1
if shown >= args.limit_per_bucket:
lines.append(f" ... (truncated at {args.limit_per_bucket})")
break
rep = "\n".join(lines)
if r < args.suspect_threshold:
suspects.append((r, fn, rep))
else:
reports.append(f"[{tag}] r={r:.3f} " + rep)
print(f"# OK: {ok} minor-diff: {diff} SUSPECT(<{args.suspect_threshold}): {susp}")
print()
if suspects:
suspects.sort()
print(f"## SUSPECT BUCKETS ({len(suspects)}) — human triage required")
print()
for _, fn, rep in suspects:
print(rep)
print()
# Any minor-diff buckets worth dumping too
for line in reports:
if "SUSPECT" not in line:
continue
for line in reports:
if "SUSPECT" in line:
continue
print(line)
return 0 if not suspects else 1
if __name__ == "__main__":
sys.exit(main())