"""One function per bring-up stage. Each is explicit and independently runnable from the operator prompt — no implicit sequencing between them.""" from __future__ import annotations import time from dataclasses import dataclass from typing import Callable import fmt from protocol import Link, Response, Event, parse_line @dataclass class Tally: passed: int = 0 failed: int = 0 skipped: int = 0 warnings: int = 0 def note_pass(self) -> None: self.passed += 1 def note_fail(self) -> None: self.failed += 1 def note_skip(self) -> None: self.skipped += 1 def note_warn(self) -> None: self.warnings += 1 def _prompt(msg: str) -> None: """Block until operator presses Enter (Ctrl-C still aborts).""" input(fmt.prompt(msg) + fmt.dim(" [Enter to continue]") + ": ") def _show_response(label: str, r: Response) -> None: bag = " ".join(f"{k}={v}" for k, v in r.fields.items()) print(f" {fmt.status_tag(r.status)} {label} {fmt.dim(bag)}") # --------------------------------------------------------------------------- # Stage 0 — Begin + identify # --------------------------------------------------------------------------- def stage_begin(link: Link, t: Tally) -> None: print(fmt.stage("Stage 0 — Begin")) print(" Waiting for device to finish booting ...") r = link.wait_ready("BU.BEGIN", per_attempt_s=1.5, overall_timeout_s=30.0) _show_response("begin", r) if r.status != "OK": t.note_fail(); raise SystemExit("Device did not enter bring-up mode") r = link.request("BU.INFO") _show_response("info", r) (t.note_pass if r.status == "OK" else t.note_fail)() print(f" -> fw={r.get('fw')} board={r.get('board', '?')} reset={r.get('reset')} " f"heap={r.get('heap')}") # --------------------------------------------------------------------------- # Stage 1 — Flash & persistence # --------------------------------------------------------------------------- def stage_flash(link: Link, t: Tally) -> None: print(fmt.stage("Stage 1 — Flash & storage")) _prompt(" Run flash roundtrip + log head/tail check") r = link.request("BU.FLASH", overall_timeout_s=10) _show_response("flash", r) (t.note_pass if r.status == "OK" else t.note_fail)() # --------------------------------------------------------------------------- # Stage 2 — I2C + LEDs # --------------------------------------------------------------------------- def stage_i2c_led(link: Link, t: Tally) -> None: import threading print(fmt.stage("Stage 2 — I2C / TCA9555 / LEDs")) _prompt(" Probe TCA9555 and run LED check") r = link.request("BU.I2C") _show_response("i2c", r) if r.status != "OK": t.note_fail(); return t.note_pass() # Firmware-driven LED watch: all LEDs solid when the button is released, # fast waterfall while held. Operator actuates the button and confirms. def reader() -> None: try: for item in link.request_stream("BU.LED.WATCH", overall_timeout_s=3600): if isinstance(item, Event) and item.cmd == "led": state = "PRESSED" if item.fields.get("pressed") == "1" else "released" print(f" [led] button {state}") elif isinstance(item, Response): return except Exception as e: # pragma: no cover — defensive print(f" [led-reader] {e!r}") th = threading.Thread(target=reader, daemon=True) th.start() print(" LEDs should be SOLID (all on) when button is released.") print(" Press-and-hold the button: LEDs should WATERFALL at ~3× speed.") try: while True: ans = input(" LEDs behaved correctly? [y/n]: ").strip().lower() if ans.startswith("y"): verdict = "pass"; break if ans.startswith("n"): verdict = "fail"; break finally: link.send("") # any byte aborts BU.LED.WATCH th.join(timeout=3) if verdict == "pass": t.note_pass() else: print(f" {fmt.fail('LED visual check FAILED')}") t.note_fail() # --------------------------------------------------------------------------- # Stage 3 — ADC + battery calibration # --------------------------------------------------------------------------- def stage_adc(link: Link, t: Tally, calibrate: bool = True) -> None: print(fmt.stage("Stage 3 — Analog front-end")) _prompt(" Read ADC snapshot (battery / motor current)") r = link.request("BU.ADC") _show_response("adc", r) if r.status != "OK": t.note_fail(); return t.note_pass() bat_V = r.getf("bat_V", 0.0) print(f" -> battery reports {bat_V:.3f} V") # VOC and FAULT pins on V5 are unusable (wired direct to input-only # ESP32 GPIOs, no external resistors — see README "V5 hardware caveats"). # They're intentionally ignored here. if not calibrate: return _run_battery_cal(link, t) def _run_battery_cal(link: Link, t: Tally) -> None: from calibrate import single_point_offset, verify print(fmt.section("Battery voltage calibration")) # Read current K and raw mV. k_r = link.request("BU.PARAM GET V_SENS_K") if k_r.status != "OK": print(" Could not read V_SENS_K"); t.note_fail(); return k = k_r.getf("value") adc_r = link.request("BU.ADC") bat_mv = adc_r.getf("bat_mv") # ADC noise rarely lands on exactly 0; check against a small range so a # near-floor reading still flags as bogus. if bat_mv < 50: print(f" ADC read looks bogus (mv={bat_mv:.0f})"); t.note_fail(); return raw_ans = input(" Measure the battery at the board terminals with a DMM.\n" " Enter true voltage (V): ").strip() try: v_true = float(raw_ans) except ValueError: print(" Not a number — skipping cal"); t.note_skip(); return # Sanity-check the operator-supplied true voltage. The system runs on a # nominal 12-24 V battery; values outside 5..30 V are almost certainly a # typo or DMM unit mistake (e.g. mV instead of V). if not (5.0 <= v_true <= 30.0): print(f" v_true={v_true:.3f} V is outside plausible 5..30 V range") t.note_fail(); return cal = single_point_offset(bat_mv, v_true, k) predicted = verify(bat_mv, cal) print(f" bat_mv={bat_mv:.0f} K={k:.10f} new OFFSET={cal.offset:+.6f} V") print(f" predicted V_bat after cal = {predicted:.3f} (true = {v_true:.3f})") # Sanity-check the computed offset. Default is 0.4 V; |offset| > 2 V means # something else is wrong (broken divider, wrong K, ADC ref off). if abs(cal.offset) > 2.0: print(f" {fmt.fail('FAIL')}: |offset|={abs(cal.offset):.3f} V exceeds 2 V — " f"check divider / K / DMM units") t.note_fail(); return wr = link.request(f"BU.PARAM SET V_SENS_OFFSET {cal.offset:.6f}") _show_response("param.set", wr) if wr.status != "OK": t.note_fail(); return # Read it back to confirm storage actually persisted what we sent. rb = link.request("BU.PARAM GET V_SENS_OFFSET") if rb.status != "OK": print(" Could not read back V_SENS_OFFSET"); t.note_fail(); return stored = rb.getf("value") if abs(stored - cal.offset) > 1e-4: print(f" {fmt.fail('FAIL')}: readback {stored:+.6f} != written {cal.offset:+.6f}") t.note_fail(); return # Verify by re-reading the ADC. Firmware's cmd_adc_once now bypasses the # EMA, so bat_V here reflects the new offset immediately. check = link.request("BU.ADC") new_V = check.getf("bat_V") err = new_V - v_true print(f" Post-cal bat_V = {new_V:.3f} (err {err*1000:+.1f} mV)") abs_err = abs(err) if abs_err < 0.040: print(f" {fmt.pass_('PASS')}: cal residual within ±40 mV") t.note_pass() elif abs_err < 0.100: print(f" {fmt.warn('WARN')}: residual {err*1000:+.1f} mV (>40, <100 mV)") t.note_warn() else: print(f" {fmt.fail('FAIL')}: residual {err*1000:+.1f} mV exceeds 100 mV") t.note_fail() # --------------------------------------------------------------------------- # Stage 4 — Discrete sensors (mandatory edges) # --------------------------------------------------------------------------- SENSOR_NAMES = ["SAFETY"] # JACK and DRIVE are checked via the relay pulse stage. def stage_sensors(link: Link, t: Tally) -> None: """Live-print safety-sensor edges until operator presses Enter. Drive and jack sensors are encoder-style and only trip while the motor runs — they're verified as a side effect of Stage 5 relay pulses. """ import threading print(fmt.stage("Stage 4 — Sensor live view")) print(" Live state of all 4 sensor pins is printed below when any one") print(" changes. Per-edge events also print as they arrive.") print(" Poke each sensor by hand / magnet / jumper to verify it responds.") print(" SAFETY must show both break and make to pass; the others are") print(" diagnostic only (drive/jack are properly tested in Stage 5).") print(" Press Enter when you're satisfied.") state = {"make": False, "break": False} last_state_line = {"v": ""} def reader() -> None: try: for item in link.request_stream("BU.SENSORS.WATCH 0", overall_timeout_s=3600): if isinstance(item, Event): if item.cmd == "sensor": name = item.fields.get("name") edge = item.fields.get("edge") if name == "SAFETY" and edge in state: state[edge] = True print(f" [{name}] {edge}") elif item.cmd == "state": # Live snapshot of all four sensors. Only print when # the level line changes, so steady state doesn't spam. f = item.fields line = (f"SAFETY={f.get('SAFETY','?')} " f"DRIVE={f.get('DRIVE','?')} " f"JACK={f.get('JACK','?')} " f"AUX={f.get('AUX','?')} " f"isr=(s={f.get('isr_s','?')} " f"d={f.get('isr_d','?')} " f"j={f.get('isr_j','?')} " f"a={f.get('isr_a','?')})") if line != last_state_line["v"]: print(f" [state] {line}") last_state_line["v"] = line elif isinstance(item, Response): # terminating OK after we aborted return except Exception as e: # pragma: no cover — defensive print(f" [reader] {e!r}") th = threading.Thread(target=reader, daemon=True) th.start() input(" Press Enter when SAFETY has been actuated: ") # Kick the firmware out of its watch loop: any byte aborts. link.send("") # just the \n th.join(timeout=3) if state["make"] and state["break"]: print(f" SAFETY: {fmt.pass_('PASS')} (saw break and make)") t.note_pass() else: missing = [k for k, v in state.items() if not v] print(f" SAFETY: {fmt.fail('FAIL')} — missed {missing}") t.note_fail() # --------------------------------------------------------------------------- # Stage 5 — Relay bridges # --------------------------------------------------------------------------- # (bridge, dir, ms, (dI_min, dI_max), check_edges) # check_edges → bridge has an encoder-style sensor; pulse must produce # at least one edge on it. RELAY_TESTS = [ ("SENSORS", "ON", 500, (0.0, 0.0), False), ("DRIVE", "FWD", 3000, (0.5, 25.0), True), ("DRIVE", "REV", 3000, (0.5, 25.0), True), ("JACK", "UP", 1200, (0.2, 25.0), True), ("JACK", "DOWN", 1200, (0.2, 25.0), True), ("AUX", "FWD", 150, (0.1, 25.0), False), ] def stage_relays(link: Link, t: Tally) -> None: print(fmt.stage("Stage 5 — Relay bridges")) print(" PRECONDITIONS:") print(" - Battery connected, fuse in place") print(" - Drive wheels off ground / disengaged") print(" - Safety interlock asserted (SAFETY sensor HIGH)") for bridge, direction, ms, (lo, hi), check_edges in RELAY_TESTS: _prompt(f" Pulse {bridge} {direction} for {ms} ms") r = link.request(f"BU.RELAY {bridge} {direction} {ms}", overall_timeout_s=ms / 1000.0 + 5.0) _show_response(f"{bridge}/{direction}", r) if r.status == "SKIP": print(" Device refused (safety?)"); t.note_skip(); continue if r.status != "OK": t.note_fail(); continue if bridge == "SENSORS": t.note_pass(); continue i_before = r.getf("I_before") i_mid = r.getf("I_mid") delta = abs(i_mid - i_before) tripped = r.geti("tripped") == 1 edges = r.geti("edges") stop = r.get("stop", "time") actual_ms = r.geti("actual_ms", ms) edge_str = f" edges={edges}" if check_edges else "" stop_str = f" stop={stop} ({actual_ms}/{ms} ms)" if stop != "time" else "" print(f" |ΔI| = {delta:.2f} A (expected {lo}-{hi}) " f"tripped={tripped}{edge_str}{stop_str}") if tripped: print(f" {fmt.fail('FAIL')}: efuse tripped"); t.note_fail(); continue if check_edges and edges <= 0: print(f" {fmt.fail('FAIL')}: {bridge} sensor saw no edges — motor not turning?") t.note_fail(); continue if lo <= delta <= hi: t.note_pass() else: print(f" {fmt.warn('WARN')}: ΔI outside nominal") t.note_warn() # --------------------------------------------------------------------------- # Stage 6 — Radio & connectivity # --------------------------------------------------------------------------- def stage_rf(link: Link, t: Tally) -> None: import threading print(fmt.stage("Stage 6a — RF 433 MHz")) _prompt(" Watch for RF remote codes") print(" Press buttons on the RF remote. Codes will print live.") print(" Press Enter to stop.") count = {"seen": 0} def reader() -> None: try: for item in link.request_stream("BU.RF.WATCH 0", overall_timeout_s=3600): if isinstance(item, Event) and item.cmd == "rf": code = item.fields.get("code", "?") print(f" rf code={code}") count["seen"] += 1 elif isinstance(item, Response): return except Exception as e: # pragma: no cover print(f" [reader] {e!r}") th = threading.Thread(target=reader, daemon=True) th.start() input(" Press Enter when done: ") link.send("") # abort the watch th.join(timeout=3) print(f" -> {count['seen']} code(s) captured") (t.note_pass if count["seen"] > 0 else t.note_warn)() def stage_wifi(link: Link, t: Tally) -> None: print(fmt.stage("Stage 6b — WiFi + web UI")) _prompt(" Start SoftAP and wait for a client to load the web UI") r = link.request("BU.WIFI.START", overall_timeout_s=20) _show_response("wifi.start", r) if r.status != "OK": t.note_fail(); return ssid = r.get("ssid", "?") print(f"\n Connect a device to WiFi SSID `{ssid}` and open http://192.168.4.1/") print(" Waiting for a client to associate and load the page... (Ctrl+C to abort)") try: for item in link.request_stream("BU.WIFI.WAIT", overall_timeout_s=3600): if isinstance(item, Event): print(f" {item.cmd} {' '.join(f'{k}={v}' for k,v in item.fields.items())}") elif isinstance(item, Response): _show_response("wifi.wait", item) (t.note_pass if item.status == "OK" else t.note_fail)() break except KeyboardInterrupt: print(" WiFi wait aborted by operator"); t.note_skip() # Push a byte so the firmware's cmd_wifi_wait breaks out of its # loop and unblocks the bring-up dispatcher; otherwise BU.END # never reaches the device and the reboot doesn't happen. # Then drain the resulting wifi.wait OK so it doesn't get # mistaken for the response to a later command. try: link.send("") deadline = time.monotonic() + 2.0 while time.monotonic() < deadline: line = link._readline(deadline) if line is None: break if isinstance(parse_line(line), Response): break except Exception: pass # --------------------------------------------------------------------------- # End # --------------------------------------------------------------------------- def stage_end(link: Link, t: Tally) -> None: print(fmt.stage("Stage — End")) _prompt(" Exit bring-up mode (device will reboot)") r = link.request("BU.END", overall_timeout_s=5) _show_response("end", r) # Device reboots; no further response expected. # --------------------------------------------------------------------------- # Top-level driver # --------------------------------------------------------------------------- Stage = Callable[[Link, Tally], None] def all_stages(skip_relays: bool = False, no_calibrate: bool = False) -> list[Stage]: stages: list[Stage] = [ stage_begin, stage_flash, stage_i2c_led, lambda link, t: stage_adc(link, t, calibrate=not no_calibrate), stage_sensors, ] if not skip_relays: stages.append(stage_relays) stages.extend([ stage_rf, stage_wifi, stage_end, ]) return stages