46155bbe91
Ship the new simulation & verification stack under simulation/:
- mmio_regions.py — address → region classifier (DDRCTL, DDRPHY,
OTP, SRAM, …). Shared by every other tool so trace output is
scannable without memorising the memory map.
- sim_tripwire.py — Bin-style per-access capture. Records
(seq, insn_tick, pc, addr, size, rw, val, region, fn_name) per
MMIO access. PCResolver bisects the vendor funs table parsed
from ddr_conservative_asm.s.
- tripwire_diff.py — PC-bucketed difflib.SequenceMatcher diff of
two tripwire CSVs. Buckets by fn_name so bitflip-induced control
flow divergence doesn't cascade noise.
- training_sim.py — DDR training simulator with --mode pass and
--mode bitflip (flip first N reads per training status, exercise
retry paths). BITFLIP_ONLY env var narrows to a single addr for
the sweep.
- bitflip_sweep.py — Flip each of 23 training-status addresses
one-at-a-time and tabulate retry convergence. Surfaces which
function(s) react to a transient fault by writing different
downstream register values.
Plus:
- mmio_diff.py updated: region-tagged divergence output,
--show-regions histogram, --tripwire-out-{vendor,rebuilt} CSV
capture, --capture-stack-writes for stack-allocated buffer diffs.
- debug_probes/tp_slot_{probe,writes}.py — ad-hoc Unicorn probes
for chasing a single-slot divergence in an SRAM buffer. Kept as
reference examples of how to extend the tripwire toolchain.
The stack found 6 silicon-hostile bugs in the rebuilt blob that
mmio_diff's write-sequence gate was structurally blind to, including
three ld-unresolved-symbol NULL derefs (case-mismatched externs,
missing DATA_SYMS) and one C-early-return-skips-shared-tail bug
where vendor's asm fell through to the tail via `b` after a `ret`.
349 lines
13 KiB
Python
349 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""training_sim.py — DDR training simulator for the RK3588 TPL blob.
|
|
|
|
Simulates a DRAM machine that answers the PHY's training handshakes
deterministically, without needing silicon. Two modes:

  --mode pass     Every status/poll returns "done/OK/trained".
                  First-iteration behavior, fastest path through
                  training. This is the existing mmio_diff default.

  --mode bitflip  For N iterations the status register returns a
                  bit-flipped (wrong) value, forcing the code
                  through its retry / error-recovery path. After
                  N bad reads, the value snaps back to the "pass"
                  word. Default N = 1: classic "first-pass fails,
                  retry succeeds" PHY behavior.

Human-readable trace: every MMIO access is tagged with its region
(DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...) so you can scan the log
without memorising the address map.

Usage:
    training_sim.py <blob.bin> [--mode pass|bitflip] [--flip-count N]
                    [--flip-mask MASK] [--max-insn N] [--verbose]
                    [--limit-trace N] [--tripwire-out CSV]
|
|
"""
|
|
import argparse, sys, os
|
|
from unicorn import *
|
|
from unicorn.arm64_const import *
|
|
|
|
# Local modules
|
|
from mmio_regions import classify
|
|
from sim_tripwire import Capture as _TripwireCapture
|
|
|
|
# Emulated address-space layout consumed by run().
SRAM_BASE = 0xFF00_0000    # SRAM window mapped before the blob is written
SRAM_SIZE = 0x0010_0000
BLOB_BASE = 0xFF00_1000    # load address of the blob and initial PC
STACK_BASE = 0x0040_0000   # stack region; SP starts 16 bytes below its top
STACK_SIZE = 0x0010_0000
RET_STUB = 0x0080_0000     # holds a `brk #0`; also emu_start's stop address
RET_SIZE = 0x0000_1000
|
|
|
|
# (base, size) windows that run() maps and covers with read/write hooks.
# Order is preserved from the original table.
MMIO = [
    (0xFD580000, 0x00020000),
    (0xFD5F0000, 0x00010000),
    (0xFD7C0000, 0x00040000),
    (0xFD800000, 0x00010000),
    (0xFD8C0000, 0x00010000),
    (0xFE010000, 0x00020000),
    (0xFE030000, 0x00010000),
    (0xFE050000, 0x00010000),
    (0xFE0C0000, 0x00040000),
    (0xFE400000, 0x00010000),
    (0xFECC0000, 0x00010000),
    (0xFEB50000, 0x00010000),
    (0xFF100000, 0x00010000),
    (0xF7000000, 0x00040000),
    (0xF8000000, 0x00040000),
    (0xF9000000, 0x00040000),
    (0xFA000000, 0x00040000),
]
|
|
|
|
# Exact-address pass values (address -> value returned on read).
# Copied from mmio_diff.ABS_STUB.
ABS_PASS = {
    0xFE0500E0: 0x0,
    0xFE050054: 0x1,
    0xFE0500E4: 0x0,
    0xFEB50014: 0x60,
    0xFEB5007C: 0x2,
}
|
|
|
|
# DDRPHY training-status stubs, matched by masked offset inside a window.
# Row layout: (base, end, mask, offset, pass_value). pass_value() treats a
# row as a hit when base <= addr < end and (addr & mask) == offset.
# Copied from mmio_diff.REGION_OFF.
_PHY_WINDOW = (0xFE0C0000, 0xFE100000, 0xFFF)
_CH_WINDOW = (0xF7000000, 0xFB000000, 0xFFFFFF)
REGION_OFF = [
    (*_PHY_WINDOW, 0xA24, 0x00000002),
    (*_PHY_WINDOW, 0x684, 0x00000000),
    (*_PHY_WINDOW, 0x090, 0x00000000),
    (*_PHY_WINDOW, 0x080, 0x00000000),
    (*_PHY_WINDOW, 0x514, 0x00000000),
    (*_PHY_WINDOW, 0x3CC, 0x00000001),
    (*_PHY_WINDOW, 0x0B4, 0x00040000),
    (*_CH_WINDOW, 0x10C84, 0x00000001),
    (*_CH_WINDOW, 0x10090, 0x00010000),
    (*_CH_WINDOW, 0x10014, 0x00000001),
]

# Whole ranges that read back as a single constant: (base, end, value).
REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)]
|
|
|
|
# Which addresses count as *training status*. Only these are perturbed in
# bitflip mode; every other address keeps its pass value so the run stays
# focused on training retry paths rather than boot-infrastructure noise.
def is_training_status(addr):
    """Return True when *addr* is a register that bitflip mode may perturb.

    When the BITFLIP_ONLY environment variable is set (bitflip_sweep.py
    uses it to flip one register at a time), only the exact address it
    names qualifies. The variable is parsed with ``int(value, 0)``, so a
    ``0x`` prefix is accepted. Otherwise membership is decided by the
    masked offset inside the two status windows below.
    """
    override = os.environ.get("BITFLIP_ONLY")
    if override:
        return int(override, 0) == addr

    in_phy_window = 0xFE0C0000 <= addr < 0xFE100000
    if in_phy_window:
        return (addr & 0xFFF) in (
            0x080, 0x090, 0x0B4, 0x3CC, 0x514, 0x684, 0xA24)

    in_channel_window = 0xF7000000 <= addr < 0xFB000000
    if in_channel_window:
        return (addr & 0xFFFFFF) in (0x10014, 0x10090, 0x10C84, 0x10514)

    return False
|
|
|
|
|
|
def pass_value(addr):
    """Look up the 'all-good' stub value for *addr*.

    Lookup order: an exact ABS_PASS entry, then the first REGION_OFF row
    whose window and masked offset match, then the first REGION_CONST
    range containing the address.

    Returns None when nothing matches so the caller can apply its own
    fallback. The ch+0x10514 register has no table entry and therefore
    lands here; TrainingSim.read_value gives it the per-read toggle.
    """
    exact = ABS_PASS.get(addr)
    if exact is not None:
        return exact

    by_offset = next(
        (value for lo, hi, mask, offset, value in REGION_OFF
         if lo <= addr < hi and (addr & mask) == offset),
        None,
    )
    if by_offset is not None:
        return by_offset

    # Falls through to None when no constant range contains the address.
    return next(
        (value for lo, hi, value in REGION_CONST if lo <= addr < hi),
        None,
    )
|
|
|
|
|
|
class TrainingSim:
    """Decide what each emulated MMIO read returns and keep the traces.

    In "pass" mode every read yields the stub value from pass_value() (or
    a fallback). In "bitflip" mode the first ``flip_count`` reads of each
    training-status address are XORed with ``flip_mask`` before being
    returned; later reads of that address return the unmodified value.
    """

    def __init__(self, mode, flip_count, flip_mask, limit_trace, verbose):
        """Store the configuration and start with empty counters/traces.

        Args:
            mode: "pass" or "bitflip".
            flip_count: number of initial reads per address that are flipped.
            flip_mask: XOR mask applied to a flipped read.
            limit_trace: maximum number of rows kept in ``access_log``.
            verbose: stored for callers; not consulted inside this class.
        """
        self.mode = mode
        self.flip_count = flip_count
        self.flip_mask = flip_mask
        self.limit_trace = limit_trace
        self.verbose = verbose
        # Per-address read counters (used both for bitflip semantics
        # and for the SWSTAT toggle that the existing harness needs).
        self._reads = {}
        self._swstat_toggle = {}
        # Traces
        self.access_log = []    # (kind, pc, addr, size, val, region)
        self.training_log = []  # (read_index, addr, value, flipped)

    def read_value(self, addr, size):
        """Return the value a *size*-byte read of *addr* should observe.

        Every call bumps the per-address read counter. In bitflip mode a
        training-status read is also appended to ``training_log`` with the
        value before truncation to the access size.
        """
        count = self._reads.get(addr, 0)
        self._reads[addr] = count + 1

        pv = pass_value(addr)
        if pv is None:
            if (0xF7000000 <= addr < 0xFB000000
                    and (addr & 0xFFFFFF) == 0x10514):
                # SWSTAT-like toggle at ch+0x10514: 0, 1, 0, 1, ... per read.
                n = self._swstat_toggle.get(addr, 0)
                self._swstat_toggle[addr] = n + 1
                pv = 1 if (n & 1) else 0
            else:
                pv = 0

        # Flip only training-status reads, and only the first N per address.
        if self.mode == "bitflip" and is_training_status(addr):
            flipped = count < self.flip_count
            if flipped:
                pv ^= self.flip_mask
            self.training_log.append((count, addr, pv, flipped))

        # Truncate to the access width so the value fits in `size` bytes.
        return pv & ((1 << (size * 8)) - 1)

    def log(self, kind, pc, addr, size, val):
        """Append one access row unless ``limit_trace`` rows are stored."""
        if len(self.access_log) < self.limit_trace:
            self.access_log.append(
                (kind, pc, addr, size, val, classify(addr)))
|
|
|
|
|
|
def run(blob_path, sim, max_insn, tripwire=None):
    """Execute the blob under Unicorn with *sim* answering MMIO reads.

    Args:
        blob_path: path of the raw blob; written at BLOB_BASE, which is
            also the initial PC.
        sim: object providing ``read_value(addr, size)`` and
            ``log(kind, pc, addr, size, val)`` (e.g. TrainingSim).
        max_insn: instruction budget; emulation is stopped once the
            executed-instruction counter reaches it.
        tripwire: optional capture object; its ``rd``/``wr`` methods get
            ``(pc, addr, size, val, insn_count)`` for every hooked access.

    Returns:
        ``(state, uart_buf)``. ``state`` is a dict with the keys "count",
        "last_pc", "same_pc", "max_pc" and "writes"; ``uart_buf`` is a
        bytearray of the low byte of every write to 0xFEB50000.
    """
    # Close the blob file deterministically instead of leaking the handle.
    with open(blob_path, "rb") as blob_file:
        blob = blob_file.read()

    uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
    uc.mem_map(SRAM_BASE, SRAM_SIZE, UC_PROT_ALL)
    uc.mem_write(BLOB_BASE, blob)
    uc.mem_map(STACK_BASE, STACK_SIZE, UC_PROT_ALL)
    uc.mem_map(RET_STUB, RET_SIZE, UC_PROT_ALL)
    uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4")  # brk #0

    for base, sz in MMIO:
        uc.mem_map(base, sz, UC_PROT_ALL)

    state = {"count": 0, "last_pc": 0, "same_pc": 0, "max_pc": 0, "writes": 0}
    uart_buf = bytearray()

    def serve_read(uc, addr, size):
        # Put the simulated value into memory so the pending load sees it,
        # then record the access.
        v = sim.read_value(addr, size)
        uc.mem_write(addr, v.to_bytes(size, "little"))
        pc = uc.reg_read(UC_ARM64_REG_PC)
        sim.log("rd", pc, addr, size, v)
        if tripwire is not None:
            tripwire.rd(pc, addr, size, v, state["count"])

    def note_write(uc, addr, size, val):
        # Count and record one store.
        pc = uc.reg_read(UC_ARM64_REG_PC)
        state["writes"] += 1
        sim.log("wr", pc, addr, size, val)
        if tripwire is not None:
            tripwire.wr(pc, addr, size, val, state["count"])

    def hook_code(uc, addr, size, ud):
        state["count"] += 1
        if addr == state["last_pc"]:
            # Same PC over and over: treat >10000 repeats as a hang.
            state["same_pc"] += 1
            if state["same_pc"] > 10000:
                uc.emu_stop()
        else:
            state["same_pc"] = 0
        state["last_pc"] = addr
        if addr > state["max_pc"]:
            state["max_pc"] = addr
        if state["count"] >= max_insn:
            uc.emu_stop()

    def hook_read(uc, typ, addr, size, val, ud):
        serve_read(uc, addr, size)

    def hook_write(uc, typ, addr, size, val, ud):
        note_write(uc, addr, size, val)
        if addr == 0xFEB50000:
            # Capture the low byte of every write to this register.
            uart_buf.append(val & 0xFF)

    def hook_unmapped(uc, typ, addr, size, val, ud):
        # Map the surrounding 64 KiB page on demand so emulation can go on.
        page = addr & ~0xFFFF
        try:
            uc.mem_map(page, 0x10000, UC_PROT_ALL)
        except UcError:
            # Deliberate best effort: a failed mapping is ignored and the
            # access is still serviced and logged below.
            pass
        if typ == UC_MEM_READ_UNMAPPED:
            serve_read(uc, addr, size)
        elif typ == UC_MEM_WRITE_UNMAPPED:
            note_write(uc, addr, size, val)
        return True

    uc.hook_add(UC_HOOK_CODE, hook_code)
    for base, sz in MMIO:
        # Unicorn hook ranges are inclusive on both ends. Using base + sz
        # would make adjacent windows (e.g. 0xFD7C0000+0x40000 followed by
        # 0xFD800000) overlap by one byte, so an access at the shared
        # boundary would fire two hooks and be counted/logged twice.
        last = base + sz - 1
        uc.hook_add(UC_HOOK_MEM_READ, hook_read, begin=base, end=last)
        uc.hook_add(UC_HOOK_MEM_WRITE, hook_write, begin=base, end=last)
    uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped)

    uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + STACK_SIZE - 16)
    uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40)

    # Register ids for X0..X30, looked up by name from the constants module.
    import unicorn.arm64_const as arm64_const
    xreg = [getattr(arm64_const, f"UC_ARM64_REG_X{i}") for i in range(31)]

    pc = BLOB_BASE
    remaining = max_insn
    while remaining > 0:
        try:
            uc.emu_start(pc, RET_STUB, count=remaining)
            break
        except UcError:
            # Emulation faulted: inspect the instruction at the current PC
            # and step over system-register accesses.
            pc = uc.reg_read(UC_ARM64_REG_PC)
            try:
                insn = int.from_bytes(uc.mem_read(pc, 4), "little")
            except UcError:
                break
            top12 = insn >> 20
            if top12 == 0xD53:
                # System-register read (MRS): pretend it returned 0.
                rt = insn & 0x1F
                if rt < 31:
                    uc.reg_write(xreg[rt], 0)
            elif top12 not in (0xD51, 0xD50):
                # Any other faulting instruction ends the run.
                break
            # Skip the instruction and resume after it.
            pc += 4
            uc.reg_write(UC_ARM64_REG_PC, pc)
            remaining -= 1

    return state, uart_buf
|
|
|
|
|
|
def print_summary(sim, state, uart_buf, region_hist):
    """Print the end-of-run report to stdout.

    Sections, in order: the run parameters and counters, the per-region
    access histogram (busiest region first), up to 20 training-status
    reads if any were logged, and the captured UART bytes if any.

    Args:
        sim: object exposing mode, flip_count, flip_mask and training_log.
        state: dict with "count", "writes" and "max_pc".
        uart_buf: bytes-like UART capture, decoded as UTF-8 with
            replacement of undecodable bytes.
        region_hist: mapping of region name -> (reads, writes).
    """
    print(f"# training_sim mode={sim.mode} flip_count={sim.flip_count} "
          f"flip_mask=0x{sim.flip_mask:x}")
    print(f"insns: {state['count']} writes: {state['writes']} "
          f"max_pc: 0x{state['max_pc']:x}")

    print()
    print("# region histogram (access count by region):")
    # Negated total keeps the sort stable for regions with equal traffic.
    busiest_first = sorted(region_hist.items(),
                           key=lambda entry: -sum(entry[1]))
    for name, (reads, writes) in busiest_first:
        print(f" {name:12s} rd={reads:6d} wr={writes:6d}")

    status_reads = sim.training_log
    if status_reads:
        total = len(status_reads)
        print()
        print(f"# training-status reads ({total}):")
        for nth, addr, value, was_flipped in status_reads[:20]:
            marker = "FLIP" if was_flipped else " "
            print(f" [{nth}] {marker} {classify(addr):10s} "
                  f"0x{addr:08x} -> 0x{value:08x}")
        hidden = total - 20
        if hidden > 0:
            print(f" ... +{hidden} more")

    if uart_buf:
        print()
        print(f"# UART TX ({len(uart_buf)} bytes):")
        print(uart_buf.decode('utf-8', errors='replace'))
|
|
|
|
|
|
def main():
    """Command-line entry point: parse options, simulate, print the report.

    Builds a TrainingSim from the arguments, runs the blob, optionally
    writes the tripwire CSV, then prints the (optional) per-access trace
    and the summary.

    Returns:
        0, used as the process exit status by the script guard.
    """
    def int_literal(text):
        # Accept decimal as well as 0x/0o/0b-prefixed literals. Raising
        # ValueError here lets argparse report a usage error instead of
        # the script dying with a traceback on a malformed mask.
        return int(text, 0)

    ap = argparse.ArgumentParser()
    ap.add_argument("blob")
    ap.add_argument("--mode", choices=("pass", "bitflip"), default="pass",
                    help="pass = always answer training positively; "
                         "bitflip = flip returned bits for N reads, then pass")
    ap.add_argument("--flip-count", type=int, default=1,
                    help="N: how many flipped reads per status address before "
                         "reverting to pass (default 1)")
    ap.add_argument("--flip-mask", type=int_literal, default=0xFFFFFFFF,
                    help="XOR mask applied to training status (default: "
                         "invert all bits, which usually reads as 'not done')")
    ap.add_argument("--max-insn", type=int, default=500_000)
    ap.add_argument("--limit-trace", type=int, default=200,
                    help="cap on per-access trace rows stored (no I/O cost)")
    ap.add_argument("--verbose", action="store_true",
                    help="print full per-access trace (may be very long)")
    ap.add_argument("--tripwire-out", default=None, metavar="CSV",
                    help="write full PC-resolved access trace to this CSV")
    args = ap.parse_args()

    sim = TrainingSim(
        mode=args.mode,
        flip_count=args.flip_count,
        flip_mask=args.flip_mask,
        # --verbose effectively lifts the trace cap.
        limit_trace=args.limit_trace if not args.verbose else 10**9,
        verbose=args.verbose,
    )
    tripwire = _TripwireCapture() if args.tripwire_out else None
    state, uart_buf = run(args.blob, sim, args.max_insn, tripwire=tripwire)
    if tripwire is not None:
        tripwire.emit_csv(args.tripwire_out)
        print(f"# tripwire: {len(tripwire.records)} records -> "
              f"{args.tripwire_out}")

    # Region histogram from the trace (capped by limit_trace).
    region_hist = {}
    for kind, _pc, _addr, _size, _val, region in sim.access_log:
        rd, wr = region_hist.get(region, (0, 0))
        if kind == "rd":
            region_hist[region] = (rd + 1, wr)
        else:
            region_hist[region] = (rd, wr + 1)

    if args.verbose:
        for kind, pc, addr, size, val, region in sim.access_log:
            print(f" PC=0x{pc:08x} [{region:10s}] {kind} "
                  f"0x{addr:08x} sz={size} val=0x{val:x}")

    print_summary(sim, state, uart_buf, region_hist)
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|