Files
test0r 46155bbe91 simulation: tripwire + PC-bucketed diff + bitflip sweep
Ship the new simulation & verification stack under simulation/:

- mmio_regions.py — address → region classifier (DDRCTL, DDRPHY,
  OTP, SRAM, …). Shared by every other tool so trace output is
  scannable without memorising the memory map.
- sim_tripwire.py — Bin-style per-access capture. Records
  (seq, insn_tick, pc, addr, size, rw, val, region, fn_name) per
  MMIO access. PCResolver bisects the vendor funs table parsed
  from ddr_conservative_asm.s.
- tripwire_diff.py — PC-bucketed difflib.SequenceMatcher diff of
  two tripwire CSVs. Buckets by fn_name so bitflip-induced control
  flow divergence doesn't cascade noise.
- training_sim.py — DDR training simulator with --mode pass and
  --mode bitflip (flip first N reads per training status, exercise
  retry paths). BITFLIP_ONLY env var narrows to a single addr for
  the sweep.
- bitflip_sweep.py — Flip each of 23 training-status addresses
  one-at-a-time and tabulate retry convergence. Surfaces which
  function(s) react to a transient fault by writing different
  downstream register values.

Plus:

- mmio_diff.py updated: region-tagged divergence output,
  --show-regions histogram, --tripwire-out-{vendor,rebuilt} CSV
  capture, --capture-stack-writes for stack-allocated buffer diffs.
- debug_probes/tp_slot_{probe,writes}.py — ad-hoc Unicorn probes
  for chasing a single-slot divergence in an SRAM buffer. Kept as
  reference examples of how to extend the tripwire toolchain.

The stack found 6 silicon-hostile bugs in the rebuilt blob that
mmio_diff's write-sequence gate was structurally blind to, including
three ld-unresolved-symbol NULL derefs (case-mismatched externs,
missing DATA_SYMS) and one C-early-return-skips-shared-tail bug
where vendor's asm fell through to the tail via `b` after a `ret`.
2026-04-22 05:55:28 +02:00

349 lines
13 KiB
Python

#!/usr/bin/env python3
"""training_sim.py — DDR training simulator for the RK3588 TPL blob.
Simulates a DRAM machine that answers the PHY's training handshakes
deterministically, without needing silicon. Two modes:
--mode pass Every status/poll returns "done/OK/trained".
First-iteration behavior, fastest path through
training. This is the existing mmio_diff default.
--mode bitflip For N iterations the status register returns a
bit-flipped (wrong) value, forcing the code
through its retry / error-recovery path. After
N bad reads, the value snaps back to the "pass"
word. Default N = 1: classic "first-pass fails,
retry succeeds" PHY behavior.
Human-readable trace: every MMIO access is tagged with its region
(DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...) so you can scan the log
without memorising the address map.
Usage:
training_sim.py <blob.bin> [--mode pass|bitflip] [--flip-count N]
[--max-insn N] [--verbose] [--limit-trace N]
"""
import argparse, sys, os
from unicorn import *
from unicorn.arm64_const import *
# Local modules
from mmio_regions import classify
from sim_tripwire import Capture as _TripwireCapture
SRAM_BASE = 0xFF000000
SRAM_SIZE = 0x00100000
BLOB_BASE = 0xFF001000
STACK_BASE = 0x00400000
STACK_SIZE = 0x00100000
RET_STUB = 0x00800000
RET_SIZE = 0x00001000
MMIO = [
(0xFD580000, 0x00020000), (0xFD5F0000, 0x00010000),
(0xFD7C0000, 0x00040000), (0xFD800000, 0x00010000),
(0xFD8C0000, 0x00010000),
(0xFE010000, 0x00020000), (0xFE030000, 0x00010000),
(0xFE050000, 0x00010000), (0xFE0C0000, 0x00040000),
(0xFE400000, 0x00010000), (0xFECC0000, 0x00010000),
(0xFEB50000, 0x00010000), (0xFF100000, 0x00010000),
(0xF7000000, 0x00040000), (0xF8000000, 0x00040000),
(0xF9000000, 0x00040000), (0xFA000000, 0x00040000),
]
# Per-address pass values — copied from mmio_diff.ABS_STUB.
ABS_PASS = {
0xFE0500E0: 0x00000000,
0xFE050054: 0x00000001,
0xFE0500E4: 0x00000000,
0xFEB50014: 0x00000060,
0xFEB5007C: 0x00000002,
}
# DDRPHY training-status stubs. Tuple: (base, end, mask, offset, pass_value).
# Copied from mmio_diff.REGION_OFF.
REGION_OFF = [
(0xFE0C0000, 0xFE100000, 0xFFF, 0xA24, 0x00000002),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x684, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x090, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x080, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x514, 0x00000000),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x3CC, 0x00000001),
(0xFE0C0000, 0xFE100000, 0xFFF, 0x0B4, 0x00040000),
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10C84, 0x00000001),
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10090, 0x00010000),
(0xF7000000, 0xFB000000, 0xFFFFFF, 0x10014, 0x00000001),
]
REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)]
# Addresses that are *training status* — these are the ones bitflip mode
# perturbs. Anything else keeps the pass value even in bitflip mode so
# the test is focused on training retry paths, not boot-infrastructure
# noise.
def is_training_status(addr):
# Env-var override for bitflip_sweep.py: when set to an address,
# only that exact address is considered "training status" and thus
# bitflippable. Lets us flip one register at a time.
only = os.environ.get("BITFLIP_ONLY")
if only:
return addr == int(only, 0)
if 0xFE0C0000 <= addr < 0xFE100000:
off = addr & 0xFFF
return off in (0x080, 0x090, 0x0B4, 0x3CC, 0x514, 0x684, 0xA24)
if 0xF7000000 <= addr < 0xFB000000:
off = addr & 0xFFFFFF
return off in (0x10014, 0x10090, 0x10C84, 0x10514)
return False
def pass_value(addr):
"""Return the 'all-good' stub value for a status address."""
if addr in ABS_PASS: return ABS_PASS[addr]
for rbase, rend, mask, off_val, rv in REGION_OFF:
if rbase <= addr < rend and (addr & mask) == off_val:
return rv
for rbase, rend, rv in REGION_CONST:
if rbase <= addr < rend:
return rv
# SWSTAT-like toggle: ch+0x10514 alternates per read (preserves
# fn_29f4 two-poll-opposite-polarity expectation).
return None # caller applies its own fallback
class TrainingSim:
def __init__(self, mode, flip_count, flip_mask, limit_trace, verbose):
self.mode = mode # "pass" or "bitflip"
self.flip_count = flip_count # how many first reads return flipped
self.flip_mask = flip_mask # XOR mask for bitflip
self.limit_trace = limit_trace
self.verbose = verbose
# Per-address read counters (used both for bitflip semantics
# and for the SWSTAT toggle that the existing harness needs).
self._reads = {}
self._swstat_toggle = {}
# Traces
self.access_log = [] # (kind, pc, addr, size, val, region)
self.training_log = [] # (n, pc, addr, stub_val, flipped?)
def read_value(self, addr, size):
count = self._reads.get(addr, 0)
self._reads[addr] = count + 1
# Fast path — pass value if defined
pv = pass_value(addr)
if pv is None:
# SWSTAT-like toggle at ch+0x10514
if 0xF7000000 <= addr < 0xFB000000 and (addr & 0xFFFFFF) == 0x10514:
n = self._swstat_toggle.get(addr, 0)
self._swstat_toggle[addr] = n + 1
pv = 1 if (n & 1) else 0
else:
pv = 0
# Apply bitflip to training-status only, first N reads per addr
flipped = False
if self.mode == "bitflip" and is_training_status(addr):
if count < self.flip_count:
pv ^= self.flip_mask
flipped = True
self.training_log.append((count, addr, pv, True))
else:
self.training_log.append((count, addr, pv, False))
return pv & ((1 << (size * 8)) - 1)
def log(self, kind, pc, addr, size, val):
if len(self.access_log) < self.limit_trace:
self.access_log.append((kind, pc, addr, size, val, classify(addr)))
def run(blob_path, sim, max_insn, tripwire=None):
blob = open(blob_path, "rb").read()
uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
uc.mem_map(SRAM_BASE, SRAM_SIZE, UC_PROT_ALL)
uc.mem_write(BLOB_BASE, blob)
uc.mem_map(STACK_BASE, STACK_SIZE, UC_PROT_ALL)
uc.mem_map(RET_STUB, RET_SIZE, UC_PROT_ALL)
uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4") # brk #0
for base, sz in MMIO:
uc.mem_map(base, sz, UC_PROT_ALL)
state = {"count": 0, "last_pc": 0, "same_pc": 0, "max_pc": 0, "writes": 0}
def hook_code(uc, addr, size, ud):
state["count"] += 1
if addr == state["last_pc"]:
state["same_pc"] += 1
if state["same_pc"] > 10000:
uc.emu_stop()
else:
state["same_pc"] = 0
state["last_pc"] = addr
if addr > state["max_pc"]:
state["max_pc"] = addr
if state["count"] >= max_insn:
uc.emu_stop()
def hook_read(uc, typ, addr, size, val, ud):
v = sim.read_value(addr, size)
uc.mem_write(addr, v.to_bytes(size, "little"))
pc = uc.reg_read(UC_ARM64_REG_PC)
sim.log("rd", pc, addr, size, v)
if tripwire is not None:
tripwire.rd(pc, addr, size, v, state["count"])
uart_buf = bytearray()
def hook_write(uc, typ, addr, size, val, ud):
pc = uc.reg_read(UC_ARM64_REG_PC)
state["writes"] += 1
sim.log("wr", pc, addr, size, val)
if tripwire is not None:
tripwire.wr(pc, addr, size, val, state["count"])
if addr == 0xFEB50000:
c = val & 0xFF
uart_buf.append(c)
def hook_unmapped(uc, typ, addr, size, val, ud):
page = addr & ~0xFFFF
try:
uc.mem_map(page, 0x10000, UC_PROT_ALL)
except UcError:
pass
if typ == UC_MEM_READ_UNMAPPED:
v = sim.read_value(addr, size)
uc.mem_write(addr, v.to_bytes(size, "little"))
pc = uc.reg_read(UC_ARM64_REG_PC)
sim.log("rd", pc, addr, size, v)
if tripwire is not None:
tripwire.rd(pc, addr, size, v, state["count"])
elif typ == UC_MEM_WRITE_UNMAPPED:
pc = uc.reg_read(UC_ARM64_REG_PC)
state["writes"] += 1
sim.log("wr", pc, addr, size, val)
if tripwire is not None:
tripwire.wr(pc, addr, size, val, state["count"])
return True
uc.hook_add(UC_HOOK_CODE, hook_code)
for base, sz in MMIO:
uc.hook_add(UC_HOOK_MEM_READ, hook_read, begin=base, end=base + sz)
uc.hook_add(UC_HOOK_MEM_WRITE, hook_write, begin=base, end=base + sz)
uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped)
uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + STACK_SIZE - 16)
uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40)
XREG = [getattr(__import__("unicorn.arm64_const", fromlist=["X"]),
f"UC_ARM64_REG_X{i}") for i in range(31)]
pc = BLOB_BASE
remaining = max_insn
while remaining > 0:
try:
uc.emu_start(pc, RET_STUB, count=remaining)
break
except UcError as e:
pc = uc.reg_read(UC_ARM64_REG_PC)
try:
insn = int.from_bytes(uc.mem_read(pc, 4), "little")
except UcError:
break
if (insn >> 20) == 0xD53:
rt = insn & 0x1F
if rt < 31:
uc.reg_write(XREG[rt], 0)
pc += 4
uc.reg_write(UC_ARM64_REG_PC, pc)
remaining -= 1
continue
if (insn >> 20) in (0xD51, 0xD50):
pc += 4
uc.reg_write(UC_ARM64_REG_PC, pc)
remaining -= 1
continue
break
return state, uart_buf
def print_summary(sim, state, uart_buf, region_hist):
print(f"# training_sim mode={sim.mode} flip_count={sim.flip_count} "
f"flip_mask=0x{sim.flip_mask:x}")
print(f"insns: {state['count']} writes: {state['writes']} "
f"max_pc: 0x{state['max_pc']:x}")
print()
print("# region histogram (access count by region):")
for region, (rd, wr) in sorted(region_hist.items(),
key=lambda x: -(x[1][0] + x[1][1])):
print(f" {region:12s} rd={rd:6d} wr={wr:6d}")
if sim.training_log:
print()
print(f"# training-status reads ({len(sim.training_log)}):")
for count, addr, val, flipped in sim.training_log[:20]:
tag = "FLIP" if flipped else " "
print(f" [{count}] {tag} {classify(addr):10s} "
f"0x{addr:08x} -> 0x{val:08x}")
if len(sim.training_log) > 20:
print(f" ... +{len(sim.training_log)-20} more")
if uart_buf:
print()
print(f"# UART TX ({len(uart_buf)} bytes):")
print(uart_buf.decode('utf-8', errors='replace'))
def main():
ap = argparse.ArgumentParser()
ap.add_argument("blob")
ap.add_argument("--mode", choices=("pass", "bitflip"), default="pass",
help="pass = always answer training positively; "
"bitflip = flip returned bits for N reads, then pass")
ap.add_argument("--flip-count", type=int, default=1,
help="N: how many flipped reads per status address before "
"reverting to pass (default 1)")
ap.add_argument("--flip-mask", default="0xFFFFFFFF",
help="XOR mask applied to training status (default: "
"invert all bits, which usually reads as 'not done')")
ap.add_argument("--max-insn", type=int, default=500_000)
ap.add_argument("--limit-trace", type=int, default=200,
help="cap on per-access trace rows stored (no I/O cost)")
ap.add_argument("--verbose", action="store_true",
help="print full per-access trace (may be very long)")
ap.add_argument("--tripwire-out", default=None, metavar="CSV",
help="write full PC-resolved access trace to this CSV")
args = ap.parse_args()
sim = TrainingSim(
mode=args.mode,
flip_count=args.flip_count,
flip_mask=int(args.flip_mask, 0),
limit_trace=args.limit_trace if not args.verbose else 10**9,
verbose=args.verbose,
)
tripwire = _TripwireCapture() if args.tripwire_out else None
state, uart_buf = run(args.blob, sim, args.max_insn, tripwire=tripwire)
if tripwire is not None:
tripwire.emit_csv(args.tripwire_out)
print(f"# tripwire: {len(tripwire.records)} records -> "
f"{args.tripwire_out}")
# Region histogram from the trace (capped by limit_trace).
region_hist = {}
for kind, pc, addr, size, val, region in sim.access_log:
rd, wr = region_hist.get(region, (0, 0))
if kind == "rd":
region_hist[region] = (rd + 1, wr)
else:
region_hist[region] = (rd, wr + 1)
if args.verbose:
for kind, pc, addr, size, val, region in sim.access_log:
print(f" PC=0x{pc:08x} [{region:10s}] {kind} "
f"0x{addr:08x} sz={size} val=0x{val:x}")
print_summary(sim, state, uart_buf, region_hist)
return 0
if __name__ == "__main__":
sys.exit(main())