#!/usr/bin/env python3
"""mmio_diff.py — log MMIO writes from vendor + rebuilt, diff sequences.

MMIO writes are the externally observable behavior. If the rebuilt writes
the same address/value/order as vendor, it is behaviorally equivalent —
register-level differences from compiler reg-alloc are irrelevant.

First divergent write identifies the function that generates wrong output.

Usage: mmio_diff.py VENDOR REBUILT [--max N]
"""
import argparse
import os
import sys
from collections import Counter

# Star imports are kept as-is: the emulator code below relies on the many
# UC_* constants (and Uc / UcError) they bring into scope.
from unicorn import *
from unicorn import arm64_const as _arm64_const
from unicorn.arm64_const import *

# Region classifier — stamps each address with a human-readable tag
# (DDRCTL:SW, DDRPHY:TR, SRAM, UART, ...). Keeps diff output scannable
# by readers who don't have the address map memorised.
try:
    from mmio_regions import classify as _classify_region
except ImportError:
    def _classify_region(addr):
        """Fallback classifier used when mmio_regions is not importable."""
        return "?"

# Tripwire module — full PC-resolved access capture for CSV emit.
try:
    from sim_tripwire import Capture as _TripwireCapture
except ImportError:
    _TripwireCapture = None

SRAM_BASE = 0xFF000000
BLOB_BASE = 0xFF001000
STACK_BASE = 0x00400000
RET_STUB = 0x00800000

# Size of the emulator-scratch stack mapping that starts at STACK_BASE.
_STACK_SIZE = 0x100000

# (base, size) windows that are mapped up front and hooked for read stubbing
# and write logging.
MMIO = [
    (0xFD580000, 0x00020000),
    (0xFD5F0000, 0x00010000),
    (0xFD7C0000, 0x00040000),
    (0xFD800000, 0x00010000),
    (0xFD8C0000, 0x00010000),
    (0xFE010000, 0x00020000),
    (0xFE030000, 0x00010000),
    (0xFE050000, 0x00010000),
    (0xFE0C0000, 0x00040000),
    (0xFE400000, 0x00010000),
    (0xFECC0000, 0x00010000),
    (0xFEB50000, 0x00010000),
    (0xFF100000, 0x00010000),
    # DDR per-channel bases: ch0-ch3. ddrctl = ch+0x10000; MRCTRL0 at
    # ch+0x10080 (bit 31 = mr_wr trigger, hw auto-clears on completion);
    # MRSTAT at ch+0x10090 (bit 0 = busy). Stubs return 0 for those bits so
    # polls exit immediately. Vendor prod.bin NOPs the polls; rebuilt keeps
    # them.
    (0xF7000000, 0x00040000),
    (0xF8000000, 0x00040000),
    (0xF9000000, 0x00040000),
    (0xFA000000, 0x00040000),
]

# Exact address -> read value.
ABS_STUB = {
    0xFE0500E0: 0,
    0xFE050054: 1,
    0xFE0500E4: 0,
    0xFEB50014: 0x60,
    0xFEB5007C: 2,
}

# (range_start, range_end_exclusive, mask, masked_offset, read_value):
# a read of `addr` in [range_start, range_end) with
# `(addr & mask) == masked_offset` returns `read_value`.
REGION_OFF = [
    (0xFE0C0000, 0xFE100000, 0xFFF, 0xA24, 0x00000002),
    (0xFE0C0000, 0xFE100000, 0xFFF, 0x684, 0x00000000),
    (0xFE0C0000, 0xFE100000, 0xFFF, 0x090, 0x00000000),
    (0xFE0C0000, 0xFE100000, 0xFFF, 0x080, 0x00000000),
    (0xFE0C0000, 0xFE100000, 0xFFF, 0x514, 0x00000000),
    # DDRPHY +0x3cc bit 0 = training-step done (fn_8b40 post-fn_27e0 poll).
    (0xFE0C0000, 0xFE100000, 0xFFF, 0x3CC, 0x00000001),
    # DDRPHY +0x0b4 bit 18 = phy-training done (fn_8b40 final-pass poll).
    (0xFE0C0000, 0xFE100000, 0xFFF, 0x0B4, 0x00040000),
    # DDR per-channel DFISTAT (ch+0x10c84): bit 0 = dfi_init_complete.
    # fn_27e0 commits DDRCTL then polls tbz bit 0 → must return 1 to exit.
    (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10C84, 0x00000001),
    # DDR per-channel MRSTAT (ch+0x10090): bit 0 = mr_wr_busy (want 0 to
    # exit busy-poll), bit 16 = mr_rd_done (want 1 to exit mr_read done-poll).
    (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10090, 0x00010000),
    # DDRCTL STAT (ch+0x10014): bits [2:0] operating_mode. fn_8b40 polls
    # `(STAT & 7) == 1` after init — "normal" state.
    (0xF7000000, 0xFB000000, 0xFFFFFF, 0x10014, 0x00000001),
]

# (range_start, range_end_exclusive, read_value): every read in the range
# returns `read_value`.
REGION_CONST = [(0xFD8C0000, 0xFD8D0000, 0x00000001)]

# Unicorn register ids for X0..X30, indexed by register number.
XREG = [getattr(_arm64_const, f"UC_ARM64_REG_X{i}") for i in range(31)]

# Per-address read counter backing the SWSTAT toggle in stub_value().
_SWSTAT_TOGGLE_COUNT = {}


def reset_stub_state():
    """Forget all per-run stub state so each emulation starts identically."""
    _SWSTAT_TOGGLE_COUNT.clear()


def stub_value(addr):
    """Return the value a read of MMIO address `addr` should observe.

    Lookup order: exact match in ABS_STUB, first matching REGION_OFF entry,
    first matching REGION_CONST entry, the per-channel SWSTAT toggle, and
    finally 0. Only the toggle path mutates state (its read counter).
    """
    if addr in ABS_STUB:
        return ABS_STUB[addr]

    for lo, hi, mask, offset, value in REGION_OFF:
        if lo <= addr < hi and (addr & mask) == offset:
            return value

    for lo, hi, value in REGION_CONST:
        if lo <= addr < hi:
            return value

    # SWSTAT-like toggle: per-channel ch+0x10514 alternates 0/1 per read.
    # fn_29f4 has two back-to-back polls at this reg with OPPOSITE polarity
    # (first waits CLEAR, second waits SET). Real HW reflects SWCTL writes;
    # the toggle gives each poll one "correct" iteration to exit.
    if 0xF7000000 <= addr < 0xFB000000 and (addr & 0xFFFFFF) == 0x10514:
        reads_so_far = _SWSTAT_TOGGLE_COUNT.get(addr, 0)
        _SWSTAT_TOGGLE_COUNT[addr] = reads_so_far + 1
        return reads_so_far & 1

    return 0


def run_and_log_writes(blob_path, max_insn, tripwire=None,
                       capture_stack_writes=False):
    """Run blob under Unicorn, return list of (write_idx, pc, addr, size, val).

    The blob is loaded at BLOB_BASE and executed from there for at most
    `max_insn` instructions. Reads from the MMIO windows are answered by
    stub_value(); writes to them are recorded in order.

    If `tripwire` is a sim_tripwire.Capture, every MMIO read and write is
    also appended to it for CSV emit.

    If `capture_stack_writes` is True *and* a tripwire is supplied, writes to
    the emulator-scratch stack region (0x00400000..0x00500000) are appended
    to the tripwire as well (never to the returned list) — useful for
    bisecting divergences in stack-allocated buffers like fn_de40's
    param_2[].

    Raises whatever open() raises for an unreadable `blob_path`, and UcError
    if the initial memory setup fails.
    """
    reset_stub_state()
    with open(blob_path, "rb") as blob_file:
        blob = blob_file.read()

    uc = Uc(UC_ARCH_ARM64, UC_MODE_ARM)
    uc.mem_map(SRAM_BASE, 0x100000, UC_PROT_ALL)
    uc.mem_write(BLOB_BASE, blob)
    uc.mem_map(STACK_BASE, _STACK_SIZE, UC_PROT_ALL)
    uc.mem_map(RET_STUB, 0x1000, UC_PROT_ALL)
    uc.mem_write(RET_STUB, b"\x00\x00\x20\xd4")  # AArch64 `brk #0`
    for base, size in MMIO:
        uc.mem_map(base, size, UC_PROT_ALL)

    writes = []
    state = {"count": 0, "same_pc": 0, "prev_pc": None}

    def serve_read(uc, addr, size):
        # Plant the stub value in memory so the pending load observes it.
        value = stub_value(addr) & ((1 << (size * 8)) - 1)
        uc.mem_write(addr, value.to_bytes(size, "little"))
        if tripwire is not None:
            pc = uc.reg_read(UC_ARM64_REG_PC)
            tripwire.rd(pc, addr, size, value, state["count"])

    def log_write(uc, addr, size, val):
        # Normalise to an unsigned value of the access width; the Python
        # binding hands `val` over as a signed 64-bit integer, which would
        # otherwise print as "0x-..." for stores with the top bit set.
        val &= (1 << (size * 8)) - 1
        pc = uc.reg_read(UC_ARM64_REG_PC)
        writes.append((len(writes), pc, addr, size, val))
        if tripwire is not None:
            tripwire.wr(pc, addr, size, val, state["count"])

    def hook_code(uc, addr, size, ud):
        state["count"] += 1
        if addr == state["prev_pc"]:
            # Same instruction executing back-to-back: bail out of what is
            # effectively a branch-to-self after 10000 repeats.
            state["same_pc"] += 1
            if state["same_pc"] > 10000:
                uc.emu_stop()
        else:
            state["same_pc"] = 0
            state["prev_pc"] = addr
        if state["count"] >= max_insn:
            uc.emu_stop()

    def hook_mmio_read(uc, typ, addr, size, val, ud):
        serve_read(uc, addr, size)

    def hook_mmio_write(uc, typ, addr, size, val, ud):
        log_write(uc, addr, size, val)

    def hook_unmapped(uc, typ, addr, size, val, ud):
        # Lazily back the faulting 64 KiB page so the access can be retried.
        # NOTE(review): once mapped, later accesses to this page no longer
        # fault and are not covered by the MMIO hooks, so only the first
        # access per page is stubbed/logged here.
        page = addr & ~0xFFFF
        try:
            uc.mem_map(page, 0x10000, UC_PROT_ALL)
        except UcError:
            # Best effort: keep going even if the mapping is rejected.
            pass
        if typ == UC_MEM_READ_UNMAPPED:
            serve_read(uc, addr, size)
        elif typ == UC_MEM_WRITE_UNMAPPED:
            log_write(uc, addr, size, val)
        return True  # tell Unicorn the fault was handled

    uc.hook_add(UC_HOOK_CODE, hook_code)
    for base, size in MMIO:
        # Unicorn treats `end` as INCLUSIVE. Passing base + size would make
        # the hook fire for the first byte of the next window too, and since
        # some windows are adjacent (e.g. 0xFE010000+0x20000 == 0xFE030000)
        # that access would be logged twice.
        last = base + size - 1
        uc.hook_add(UC_HOOK_MEM_READ, hook_mmio_read, begin=base, end=last)
        uc.hook_add(UC_HOOK_MEM_WRITE, hook_mmio_write, begin=base, end=last)
    uc.hook_add(UC_HOOK_MEM_UNMAPPED, hook_unmapped)

    if capture_stack_writes and tripwire is not None:
        def hook_stack_write(uc, typ, addr, size, val, ud):
            pc = uc.reg_read(UC_ARM64_REG_PC)
            tripwire.wr(pc, addr, size, val, state["count"])

        uc.hook_add(UC_HOOK_MEM_WRITE, hook_stack_write,
                    begin=STACK_BASE, end=STACK_BASE + _STACK_SIZE - 1)

    uc.reg_write(UC_ARM64_REG_SP, STACK_BASE + 0xF0000)
    uc.reg_write(UC_ARM64_REG_X30, BLOB_BASE + 0x40)

    pc = BLOB_BASE
    remaining = max_insn
    while remaining > 0:
        try:
            uc.emu_start(pc, RET_STUB, count=remaining)
        except UcError:
            # Emulation faulted: inspect the instruction at the current PC
            # and step over it if it is a system instruction.
            pc = uc.reg_read(UC_ARM64_REG_PC)
            try:
                insn = int.from_bytes(uc.mem_read(pc, 4), "little")
            except UcError:
                break
            top12 = insn >> 20
            if top12 == 0xD53:
                # System-register read encoding: pretend it read 0 into Rt
                # (Rt == 31 is the zero register, nothing to write).
                rt = insn & 0x1F
                if rt < 31:
                    uc.reg_write(XREG[rt], 0)
            elif top12 not in (0xD51, 0xD50):
                # Not a system instruction we know how to skip — give up.
                break
            pc += 4
            uc.reg_write(UC_ARM64_REG_PC, pc)
            remaining -= 1
        else:
            # emu_start returned normally (end address, count, or emu_stop).
            break

    return writes


def _region_histogram(records):
    """Return [(region, count), ...] for write records, most frequent first.

    Ties keep first-seen order.
    """
    return Counter(_classify_region(rec[2]) for rec in records).most_common()


def main():
    """CLI entry point: run both blobs, diff their MMIO write sequences.

    Returns 0 when every write matches, 1 on the first divergence or on a
    length mismatch.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("vendor")
    ap.add_argument("rebuilt")
    ap.add_argument("--max", type=int, default=500000)
    ap.add_argument("--ignore-pc", action="store_true",
                    help="ignore PC when comparing (only addr+val)")
    ap.add_argument("--show-regions", action="store_true",
                    help="print region histogram of vendor writes on success")
    ap.add_argument("--tripwire-out-vendor", default=None, metavar="CSV",
                    help="write PC-resolved access trace of the vendor run to CSV")
    ap.add_argument("--tripwire-out-rebuilt", default=None, metavar="CSV",
                    help="write PC-resolved access trace of the rebuilt run to CSV")
    ap.add_argument("--capture-stack-writes", action="store_true",
                    help="also capture writes to the emulator stack "
                         "(0x00400000..0x00500000) in tripwire CSVs")
    args = ap.parse_args()

    print(f"# MMIO-write diff {args.vendor} vs {args.rebuilt}")

    if _TripwireCapture is None and (args.tripwire_out_vendor
                                     or args.tripwire_out_rebuilt):
        # Don't silently drop a requested trace.
        print("# warning: sim_tripwire is not importable; "
              "--tripwire-out-* options are ignored", file=sys.stderr)

    tw_v = _TripwireCapture() if (args.tripwire_out_vendor and _TripwireCapture) else None
    tw_r = _TripwireCapture() if (args.tripwire_out_rebuilt and _TripwireCapture) else None

    vw = run_and_log_writes(args.vendor, args.max, tripwire=tw_v,
                            capture_stack_writes=args.capture_stack_writes)
    rw = run_and_log_writes(args.rebuilt, args.max, tripwire=tw_r,
                            capture_stack_writes=args.capture_stack_writes)

    if tw_v is not None:
        tw_v.emit_csv(args.tripwire_out_vendor)
        print(f"# tripwire(vendor): {len(tw_v.records)} records -> "
              f"{args.tripwire_out_vendor}")
    if tw_r is not None:
        tw_r.emit_csv(args.tripwire_out_rebuilt)
        print(f"# tripwire(rebuilt): {len(tw_r.records)} records -> "
              f"{args.tripwire_out_rebuilt}")

    print(f"vendor writes: {len(vw)} rebuilt writes: {len(rw)}")

    n = min(len(vw), len(rw))
    for i, (v_rec, r_rec) in enumerate(zip(vw, rw)):
        _, vp, va, vs, vv = v_rec
        _, rp, ra, rs, rv = r_rec
        if args.ignore_pc:
            same = (va, vs, vv) == (ra, rs, rv)
        else:
            same = (vp, va, vs, vv) == (rp, ra, rs, rv)
        if same:
            continue

        print(f"[write {i}] DIVERGE")
        print(f" vendor: pc=0x{vp:x} [{_classify_region(va):10s}] "
              f"addr=0x{va:x} sz={vs} val=0x{vv:x}")
        print(f" rebuilt: pc=0x{rp:x} [{_classify_region(ra):10s}] "
              f"addr=0x{ra:x} sz={rs} val=0x{rv:x}")
        # show context: last 3 matching writes
        for j in range(max(0, i - 3), i):
            _, p, a, s, v = vw[j]
            print(f" match [{j}]: pc=0x{p:x} [{_classify_region(a):10s}] "
                  f"addr=0x{a:x} val=0x{v:x}")
        return 1

    if len(vw) != len(rw):
        print(f"[diverge @ end] length mismatch: vendor={len(vw)} rebuilt={len(rw)}")
        # Region histogram of the longer side's tail — tells you which
        # subsystem our rebuild hasn't reached yet, or which one it
        # reached that vendor doesn't.
        if len(rw) > len(vw):
            longer, side = rw, "rebuilt"
        else:
            longer, side = vw, "vendor"
        tail_hist = _region_histogram(longer[n:])
        if tail_hist:
            print(f" extra-{side} region histogram:")
            for region, count in tail_hist:
                print(f" {region:12s} {count}")
        return 1

    print(f"[OK] all {n} MMIO writes match")
    if args.show_regions:
        print("# region histogram (vendor write counts):")
        for region, count in _region_histogram(vw):
            print(f" {region:12s} {count}")
    return 0


if __name__ == "__main__":
    sys.exit(main())