#!/usr/bin/env python3
"""training_sim.py — DDR training simulator for the RK3588 TPL blob.

Simulates a DRAM machine that answers the PHY's training handshakes
deterministically, without needing silicon.

Two modes:

  --mode pass
      Every status/poll returns "done/OK/trained". First-iteration
      behavior, fastest path through training. This is the existing
      mmio_diff default.

  --mode bitflip
      For N iterations the status register returns a bit-flipped (wrong)
      value, forcing the code through its retry / error-recovery path.
      After N bad reads, the value snaps back to the "pass" word.
      Default N = 1: classic "first-pass fails, retry succeeds" PHY
      behavior.

Human-readable trace: every MMIO access is tagged with its region
(DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...) so you can scan the log without
memorising the address map.

Usage:
    training_sim.py BLOB [--mode pass|bitflip] [--flip-count N]
                    [--flip-mask MASK] [--max-insn N] [--verbose]
                    [--limit-trace N] [--tripwire-out CSV]
"""

import argparse
import os
import sys

from unicorn import *
from unicorn.arm64_const import *

# Local modules
from mmio_regions import classify
from sim_tripwire import Capture as _TripwireCapture

# Emulator memory layout: the blob is written at BLOB_BASE inside the SRAM
# mapping; RET_STUB holds a single `brk #0` and is the emulation end address.
SRAM_BASE = 0xFF000000
SRAM_SIZE = 0x00100000
BLOB_BASE = 0xFF001000
STACK_BASE = 0x00400000
STACK_SIZE = 0x00100000
RET_STUB = 0x00800000
RET_SIZE = 0x00001000

# (base, size) windows that are mapped up front and hooked for MMIO
# reads and writes.
MMIO = [
    (0xFD580000, 0x00020000),
    (0xFD5F0000, 0x00010000),
    (0xFD7C0000, 0x00040000),
    (0xFD800000, 0x00010000),
    (0xFD8C0000, 0x00010000),
    (0xFE010000, 0x00020000),
    (0xFE030000, 0x00010000),
    (0xFE050000, 0x00010000),
    (0xFE0C0000, 0x00040000),
    (0xFE400000, 0x00010000),
    (0xFECC0000, 0x00010000),
    (0xFEB50000, 0x00010000),
    (0xFF100000, 0x00010000),
    (0xF7000000, 0x00040000),
    (0xF8000000, 0x00040000),
    (0xF9000000, 0x00040000),
    (0xFA000000, 0x00040000),
]

# Per-address pass values — copied from mmio_diff.ABS_STUB.
ABS_PASS = {
    0xFE0500E0: 0x00000000,
    0xFE050054: 0x00000001,
    0xFE0500E4: 0x00000000,
    0xFEB50014: 0x00000060,
    0xFEB5007C: 0x00000002,
}

# DDRPHY training-status stubs. Tuple: (base, end, mask, offset, pass_value).
# Copied from mmio_diff.REGION_OFF. REGION_OFF = [ (0xFE0C0000, 0xFE100000, 0xFFF, 0xA24, 0x00000002), (0xFE0C0000, 0xFE100000, 0xFFF, 0x684, 0x00000000), (0xFE0C0000, 0xFE100000, 0xFFF, 0x090, 0x00000000), (0xFE0C0000, 0xFE100000, 0xFFF, 0x080, 0x00000000), (0xFE0C0000, 0xFE100000, 0xFFF, 0x514, 0x00000000), (0xFE0C0000, 0xFE100000, 0xFFF, 0x3CC, 0x00000001), (0xFE0C0000, 0xFE100000, 0xFFF, 0x0B4, 0x00040000), (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10C84, 0x00000001), (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10090, 0x00010000), (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10014, 0x00000001), ] REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)] # Addresses that are *training status* — these are the ones bitflip mode # perturbs. Anything else keeps the pass value even in bitflip mode so # the test is focused on training retry paths, not boot-infrastructure # noise. def is_training_status(addr): # Env-var override for bitflip_sweep.py: when set to an address, # only that exact address is considered "training status" and thus # bitflippable. Lets us flip one register at a time. only = os.environ.get("BITFLIP_ONLY") if only: return addr == int(only, 0) if 0xFE0C0000 <= addr < 0xFE100000: off = addr & 0xFFF return off in (0x080, 0x090, 0x0B4, 0x3CC, 0x514, 0x684, 0xA24) if 0xF7000000 <= addr < 0xFB000000: off = addr & 0xFFFFFF return off in (0x10014, 0x10090, 0x10C84, 0x10514) return False def pass_value(addr): """Return the 'all-good' stub value for a status address.""" if addr in ABS_PASS: return ABS_PASS[addr] for rbase, rend, mask, off_val, rv in REGION_OFF: if rbase <= addr < rend and (addr & mask) == off_val: return rv for rbase, rend, rv in REGION_CONST: if rbase <= addr < rend: return rv # SWSTAT-like toggle: ch+0x10514 alternates per read (preserves # fn_29f4 two-poll-opposite-polarity expectation). 
return None # caller applies its own fallback class TrainingSim: def __init__(self, mode, flip_count, flip_mask, limit_trace, verbose): self.mode = mode # "pass" or "bitflip" self.flip_count = flip_count # how many first reads return flipped self.flip_mask = flip_mask # XOR mask for bitflip self.limit_trace = limit_trace self.verbose = verbose # Per-address read counters (used both for bitflip semantics # and for the SWSTAT toggle that the existing harness needs). self._reads = {} self._swstat_toggle = {} # Traces self.access_log = [] # (kind, pc, addr, size, val, region) self.training_log = [] # (n, pc, addr, stub_val, flipped?) def read_value(self, addr, size): count = self._reads.get(addr, 0) self._reads[addr] = count + 1 # Fast path — pass value if defined pv = pass_value(addr) if pv is None: # SWSTAT-like toggle at ch+0x10514 if 0xF7000000 <= addr < 0xFB000000 and (addr & 0xFFFFFF) == 0x10514: n = self._swstat_toggle.get(addr, 0) self._swstat_toggle[addr] = n + 1 pv = 1 if (n & 1) else 0 else: pv = 0 # Apply bitflip to training-status only, first N reads per addr flipped = False if self.mode == "bitflip" and is_training_status(addr): if count < self.flip_count: pv ^= self.flip_mask flipped = True self.training_log.append((count, addr, pv, True)) else: self.training_log.append((count, addr, pv, False)) return pv & ((1 << (size * 8)) - 1) def log(self, kind, pc, addr, size, val): if len(self.access_log) < self.limit_trace: self.access_log.append((kind, pc, addr, size, val, classify(addr))) def run(blob_path, sim, max_insn, tripwire=None): blob = open(blob_path, "rb").read() uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM) uc.mem_map(SRAM_BASE, SRAM_SIZE, UC_PROT_ALL) uc.mem_write(BLOB_BASE, blob) uc.mem_map(STACK_BASE, STACK_SIZE, UC_PROT_ALL) uc.mem_map(RET_STUB, RET_SIZE, UC_PROT_ALL) uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4") # brk #0 for base, sz in MMIO: uc.mem_map(base, sz, UC_PROT_ALL) state = {"count": 0, "last_pc": 0, "same_pc": 0, "max_pc": 0, "writes": 0} def 
hook_code(uc, addr, size, ud): state["count"] += 1 if addr == state["last_pc"]: state["same_pc"] += 1 if state["same_pc"] > 10000: uc.emu_stop() else: state["same_pc"] = 0 state["last_pc"] = addr if addr > state["max_pc"]: state["max_pc"] = addr if state["count"] >= max_insn: uc.emu_stop() def hook_read(uc, typ, addr, size, val, ud): v = sim.read_value(addr, size) uc.mem_write(addr, v.to_bytes(size, "little")) pc = uc.reg_read(UC_ARM64_REG_PC) sim.log("rd", pc, addr, size, v) if tripwire is not None: tripwire.rd(pc, addr, size, v, state["count"]) uart_buf = bytearray() def hook_write(uc, typ, addr, size, val, ud): pc = uc.reg_read(UC_ARM64_REG_PC) state["writes"] += 1 sim.log("wr", pc, addr, size, val) if tripwire is not None: tripwire.wr(pc, addr, size, val, state["count"]) if addr == 0xFEB50000: c = val & 0xFF uart_buf.append(c) def hook_unmapped(uc, typ, addr, size, val, ud): page = addr & ~0xFFFF try: uc.mem_map(page, 0x10000, UC_PROT_ALL) except UcError: pass if typ == UC_MEM_READ_UNMAPPED: v = sim.read_value(addr, size) uc.mem_write(addr, v.to_bytes(size, "little")) pc = uc.reg_read(UC_ARM64_REG_PC) sim.log("rd", pc, addr, size, v) if tripwire is not None: tripwire.rd(pc, addr, size, v, state["count"]) elif typ == UC_MEM_WRITE_UNMAPPED: pc = uc.reg_read(UC_ARM64_REG_PC) state["writes"] += 1 sim.log("wr", pc, addr, size, val) if tripwire is not None: tripwire.wr(pc, addr, size, val, state["count"]) return True uc.hook_add(UC_HOOK_CODE, hook_code) for base, sz in MMIO: uc.hook_add(UC_HOOK_MEM_READ, hook_read, begin=base, end=base + sz) uc.hook_add(UC_HOOK_MEM_WRITE, hook_write, begin=base, end=base + sz) uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped) uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + STACK_SIZE - 16) uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40) XREG = [getattr(__import__("unicorn.arm64_const", fromlist=["X"]), f"UC_ARM64_REG_X{i}") for i in range(31)] pc = BLOB_BASE remaining = max_insn while remaining > 0: try: uc.emu_start(pc, RET_STUB, 
count=remaining) break except UcError as e: pc = uc.reg_read(UC_ARM64_REG_PC) try: insn = int.from_bytes(uc.mem_read(pc, 4), "little") except UcError: break if (insn >> 20) == 0xD53: rt = insn & 0x1F if rt < 31: uc.reg_write(XREG[rt], 0) pc += 4 uc.reg_write(UC_ARM64_REG_PC, pc) remaining -= 1 continue if (insn >> 20) in (0xD51, 0xD50): pc += 4 uc.reg_write(UC_ARM64_REG_PC, pc) remaining -= 1 continue break return state, uart_buf def print_summary(sim, state, uart_buf, region_hist): print(f"# training_sim mode={sim.mode} flip_count={sim.flip_count} " f"flip_mask=0x{sim.flip_mask:x}") print(f"insns: {state['count']} writes: {state['writes']} " f"max_pc: 0x{state['max_pc']:x}") print() print("# region histogram (access count by region):") for region, (rd, wr) in sorted(region_hist.items(), key=lambda x: -(x[1][0] + x[1][1])): print(f" {region:12s} rd={rd:6d} wr={wr:6d}") if sim.training_log: print() print(f"# training-status reads ({len(sim.training_log)}):") for count, addr, val, flipped in sim.training_log[:20]: tag = "FLIP" if flipped else " " print(f" [{count}] {tag} {classify(addr):10s} " f"0x{addr:08x} -> 0x{val:08x}") if len(sim.training_log) > 20: print(f" ... 
+{len(sim.training_log)-20} more") if uart_buf: print() print(f"# UART TX ({len(uart_buf)} bytes):") print(uart_buf.decode('utf-8', errors='replace')) def main(): ap = argparse.ArgumentParser() ap.add_argument("blob") ap.add_argument("--mode", choices=("pass", "bitflip"), default="pass", help="pass = always answer training positively; " "bitflip = flip returned bits for N reads, then pass") ap.add_argument("--flip-count", type=int, default=1, help="N: how many flipped reads per status address before " "reverting to pass (default 1)") ap.add_argument("--flip-mask", default="0xFFFFFFFF", help="XOR mask applied to training status (default: " "invert all bits, which usually reads as 'not done')") ap.add_argument("--max-insn", type=int, default=500_000) ap.add_argument("--limit-trace", type=int, default=200, help="cap on per-access trace rows stored (no I/O cost)") ap.add_argument("--verbose", action="store_true", help="print full per-access trace (may be very long)") ap.add_argument("--tripwire-out", default=None, metavar="CSV", help="write full PC-resolved access trace to this CSV") args = ap.parse_args() sim = TrainingSim( mode=args.mode, flip_count=args.flip_count, flip_mask=int(args.flip_mask, 0), limit_trace=args.limit_trace if not args.verbose else 10**9, verbose=args.verbose, ) tripwire = _TripwireCapture() if args.tripwire_out else None state, uart_buf = run(args.blob, sim, args.max_insn, tripwire=tripwire) if tripwire is not None: tripwire.emit_csv(args.tripwire_out) print(f"# tripwire: {len(tripwire.records)} records -> " f"{args.tripwire_out}") # Region histogram from the trace (capped by limit_trace). 
region_hist = {} for kind, pc, addr, size, val, region in sim.access_log: rd, wr = region_hist.get(region, (0, 0)) if kind == "rd": region_hist[region] = (rd + 1, wr) else: region_hist[region] = (rd, wr + 1) if args.verbose: for kind, pc, addr, size, val, region in sim.access_log: print(f" PC=0x{pc:08x} [{region:10s}] {kind} " f"0x{addr:08x} sz={size} val=0x{val:x}") print_summary(sim, state, uart_buf, region_hist) return 0 if __name__ == "__main__": sys.exit(main())