Files
SC-F001/bringup/stages.py
2026-04-27 17:22:34 -05:00

471 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""One function per bring-up stage. Each is explicit and independently
runnable from the operator prompt — no implicit sequencing between them."""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import Callable
import fmt
from protocol import Link, Response, Event, parse_line
@dataclass
class Tally:
    """Running counters for the outcome of each bring-up check."""

    passed: int = 0
    failed: int = 0
    skipped: int = 0
    warnings: int = 0

    def note_pass(self) -> None:
        """Count one passed check."""
        self.passed = self.passed + 1

    def note_fail(self) -> None:
        """Count one failed check."""
        self.failed = self.failed + 1

    def note_skip(self) -> None:
        """Count one skipped check."""
        self.skipped = self.skipped + 1

    def note_warn(self) -> None:
        """Count one check that finished with a warning."""
        self.warnings = self.warnings + 1
def _prompt(msg: str) -> None:
    """Show *msg* and wait for the operator to press Enter.

    Ctrl-C is not intercepted here, so it still aborts the run.
    """
    lead = fmt.prompt(msg)
    hint = fmt.dim(" [Enter to continue]")
    input(lead + hint + ": ")
def _show_response(label: str, r: Response) -> None:
    """Print one summary line for *r*: status tag, *label*, then its fields.

    Fields are rendered as space-separated ``key=value`` pairs and passed
    through ``fmt.dim``.
    """
    pairs = [f"{key}={value}" for key, value in r.fields.items()]
    tag = fmt.status_tag(r.status)
    print(f" {tag} {label} {fmt.dim(' '.join(pairs))}")
# ---------------------------------------------------------------------------
# Stage 0 — Begin + identify
# ---------------------------------------------------------------------------
def stage_begin(link: Link, t: Tally) -> None:
    """Stage 0: wait for the device to accept BU.BEGIN, then query BU.INFO.

    Raises:
        SystemExit: if BU.BEGIN does not come back OK (a failure is noted
            on *t* first).

    The BU.INFO result is tallied as pass or fail, and its identifying
    fields are printed in either case.
    """
    print(fmt.stage("Stage 0 — Begin"))
    print(" Waiting for device to finish booting ...")
    begin = link.wait_ready("BU.BEGIN", per_attempt_s=1.5, overall_timeout_s=30.0)
    _show_response("begin", begin)
    if begin.status != "OK":
        t.note_fail()
        raise SystemExit("Device did not enter bring-up mode")

    info = link.request("BU.INFO")
    _show_response("info", info)
    if info.status == "OK":
        t.note_pass()
    else:
        t.note_fail()
    summary = (f" -> fw={info.get('fw')} board={info.get('board', '?')} "
               f"reset={info.get('reset')} heap={info.get('heap')}")
    print(summary)
# ---------------------------------------------------------------------------
# Stage 1 — Flash & persistence
# ---------------------------------------------------------------------------
def stage_flash(link: Link, t: Tally) -> None:
    """Stage 1: after operator confirmation, run BU.FLASH and tally the result."""
    print(fmt.stage("Stage 1 — Flash & storage"))
    _prompt(" Run flash roundtrip + log head/tail check")
    reply = link.request("BU.FLASH", overall_timeout_s=10)
    _show_response("flash", reply)
    if reply.status == "OK":
        t.note_pass()
        return
    t.note_fail()
# ---------------------------------------------------------------------------
# Stage 2 — I2C + LEDs
# ---------------------------------------------------------------------------
def stage_i2c_led(link: Link, t: Tally) -> None:
    """Stage 2: probe the I2C expander, then run an operator-judged LED check.

    A failed BU.I2C probe is tallied as a failure and ends the stage. On
    success, BU.LED.WATCH is streamed on a background thread while the
    operator answers a y/n question; the answer is tallied as a second
    pass/fail.
    """
    import threading

    print(fmt.stage("Stage 2 — I2C / TCA9555 / LEDs"))
    _prompt(" Probe TCA9555 and run LED check")
    probe = link.request("BU.I2C")
    _show_response("i2c", probe)
    if probe.status != "OK":
        t.note_fail()
        return
    t.note_pass()

    # Firmware-driven LED watch: all LEDs solid when the button is released,
    # fast waterfall while held. Operator actuates the button and confirms.
    def reader() -> None:
        """Echo button events until the stream's terminating Response."""
        try:
            stream = link.request_stream("BU.LED.WATCH", overall_timeout_s=3600)
            for item in stream:
                if isinstance(item, Event) and item.cmd == "led":
                    if item.fields.get("pressed") == "1":
                        label = "PRESSED"
                    else:
                        label = "released"
                    print(f" [led] button {label}")
                elif isinstance(item, Response):
                    return
        except Exception as e:  # pragma: no cover — defensive
            print(f" [led-reader] {e!r}")

    watcher = threading.Thread(target=reader, daemon=True)
    watcher.start()
    print(" LEDs should be SOLID (all on) when button is released.")
    print(" Press-and-hold the button: LEDs should WATERFALL at ~3× speed.")

    passed = None  # stays None until the operator gives a y/n answer
    try:
        while passed is None:
            reply = input(" LEDs behaved correctly? [y/n]: ").strip().lower()
            if reply.startswith("y"):
                passed = True
            elif reply.startswith("n"):
                passed = False
    finally:
        link.send("")  # any byte aborts BU.LED.WATCH
        watcher.join(timeout=3)

    if passed:
        t.note_pass()
        return
    print(f" {fmt.fail('LED visual check FAILED')}")
    t.note_fail()
# ---------------------------------------------------------------------------
# Stage 3 — ADC + battery calibration
# ---------------------------------------------------------------------------
def stage_adc(link: Link, t: Tally, calibrate: bool = True) -> None:
    """Stage 3: take one ADC snapshot and optionally calibrate the battery sense.

    Args:
        link: device link used to issue BU.ADC.
        t: tally updated with the snapshot result.
        calibrate: when true (default), continue into the battery-voltage
            calibration after a successful snapshot.
    """
    print(fmt.stage("Stage 3 — Analog front-end"))
    _prompt(" Read ADC snapshot (battery / motor current)")
    snapshot = link.request("BU.ADC")
    _show_response("adc", snapshot)
    if snapshot.status != "OK":
        t.note_fail()
        return
    t.note_pass()

    volts = snapshot.getf("bat_V", 0.0)
    print(f" -> battery reports {volts:.3f} V")
    # VOC and FAULT pins on V5 are unusable (wired direct to input-only
    # ESP32 GPIOs, no external resistors — see README "V5 hardware caveats").
    # They're intentionally ignored here.
    if calibrate:
        _run_battery_cal(link, t)
def _run_battery_cal(link: Link, t: Tally) -> None:
    """Single-point battery-voltage calibration against an operator DMM reading.

    Sequence: read V_SENS_K and the raw ADC millivolts, ask the operator for
    the true voltage, compute a new V_SENS_OFFSET, write it, read it back,
    then re-read the ADC and grade the residual error.

    Exactly one outcome is recorded on *t*: skip (operator input is not a
    number), warn (residual between 20 and 50 mV), pass (residual under
    20 mV), or fail (anything else, including any device request that does
    not return OK).
    """
    from calibrate import single_point_offset, verify

    print(fmt.section("Battery voltage calibration"))
    # Read current K and raw mV.
    k_r = link.request("BU.PARAM GET V_SENS_K")
    if k_r.status != "OK":
        print(" Could not read V_SENS_K"); t.note_fail(); return
    k = k_r.getf("value")
    adc_r = link.request("BU.ADC")
    # A failed request carries no trustworthy bat_mv; report it as a read
    # failure rather than letting it masquerade as a bogus ADC value.
    if adc_r.status != "OK":
        print(" Could not read ADC"); t.note_fail(); return
    bat_mv = adc_r.getf("bat_mv")
    # ADC noise rarely lands on exactly 0; check against a small range so a
    # near-floor reading still flags as bogus.
    if bat_mv < 50:
        print(f" ADC read looks bogus (mv={bat_mv:.0f})"); t.note_fail(); return
    raw_ans = input(" Measure the battery at the board terminals with a DMM.\n"
                    " Enter true voltage (V): ").strip()
    try:
        v_true = float(raw_ans)
    except ValueError:
        print(" Not a number — skipping cal"); t.note_skip(); return
    # Sanity-check the operator-supplied true voltage. The system runs on a
    # nominal 12-24 V battery; values outside 5..30 V are almost certainly a
    # typo or DMM unit mistake (e.g. mV instead of V).
    if not (5.0 <= v_true <= 30.0):
        print(f" v_true={v_true:.3f} V is outside plausible 5..30 V range")
        t.note_fail(); return
    cal = single_point_offset(bat_mv, v_true, k)
    predicted = verify(bat_mv, cal)
    print(f" bat_mv={bat_mv:.0f} K={k:.10f} new OFFSET={cal.offset:+.6f} V")
    print(f" predicted V_bat after cal = {predicted:.3f} (true = {v_true:.3f})")
    # Sanity-check the computed offset. Default is 0.4 V; |offset| > 2 V means
    # something else is wrong (broken divider, wrong K, ADC ref off).
    if abs(cal.offset) > 2.0:
        print(f" {fmt.fail('FAIL')}: |offset|={abs(cal.offset):.3f} V exceeds 2 V — "
              f"check divider / K / DMM units")
        t.note_fail(); return
    wr = link.request(f"BU.PARAM SET V_SENS_OFFSET {cal.offset:.6f}")
    _show_response("param.set", wr)
    if wr.status != "OK":
        t.note_fail(); return
    # Read it back to confirm storage actually persisted what we sent.
    rb = link.request("BU.PARAM GET V_SENS_OFFSET")
    if rb.status != "OK":
        print(" Could not read back V_SENS_OFFSET"); t.note_fail(); return
    stored = rb.getf("value")
    if abs(stored - cal.offset) > 1e-4:
        print(f" {fmt.fail('FAIL')}: readback {stored:+.6f} != written {cal.offset:+.6f}")
        t.note_fail(); return
    # Verify by re-reading the ADC. Firmware's cmd_adc_once now bypasses the
    # EMA, so bat_V here reflects the new offset immediately.
    check = link.request("BU.ADC")
    # Without this check a failed re-read would be graded as a calibration
    # residual, pointing the operator at the wrong problem.
    if check.status != "OK":
        print(" Could not re-read ADC after calibration"); t.note_fail(); return
    new_V = check.getf("bat_V")
    err = new_V - v_true
    print(f" Post-cal bat_V = {new_V:.3f} (err {err*1000:+.1f} mV)")
    abs_err = abs(err)
    if abs_err < 0.020:
        print(f" {fmt.pass_('PASS')}: cal residual within ±20 mV")
        t.note_pass()
    elif abs_err < 0.050:
        print(f" {fmt.warn('WARN')}: residual {err*1000:+.1f} mV (>20, <50 mV)")
        t.note_warn()
    else:
        print(f" {fmt.fail('FAIL')}: residual {err*1000:+.1f} mV exceeds 50 mV")
        t.note_fail()
# ---------------------------------------------------------------------------
# Stage 4 — Discrete sensors (mandatory edges)
# ---------------------------------------------------------------------------
SENSOR_NAMES = ["SAFETY"]  # JACK and DRIVE are checked via the relay pulse stage.


def stage_sensors(link: Link, t: Tally) -> None:
    """Live-print sensor activity until the operator presses Enter.

    ``BU.SENSORS.WATCH 0`` is streamed on a background thread. Per-edge
    ``sensor`` events are echoed, and ``state`` snapshots are printed only
    when the rendered line changes. The stage passes only if SAFETY
    produced both a ``make`` and a ``break`` edge; otherwise it fails and
    lists what was missed.

    Drive and jack sensors are encoder-style and only trip while the motor
    runs — they're verified as a side effect of Stage 5 relay pulses.

    The abort byte is sent from a ``finally`` block, so an interrupted
    prompt (Ctrl-C / EOF) still releases the firmware from its watch loop
    before the exception propagates.
    """
    import threading

    print(fmt.stage("Stage 4 — Sensor live view"))
    print(" Live state of all 4 sensor pins is printed below when any one")
    print(" changes. Per-edge events also print as they arrive.")
    print(" Poke each sensor by hand / magnet / jumper to verify it responds.")
    print(" SAFETY must show both break and make to pass; the others are")
    print(" diagnostic only (drive/jack are properly tested in Stage 5).")
    print(" Press Enter when you're satisfied.")
    # Mutable containers so the reader thread can publish results to us.
    state = {"make": False, "break": False}
    last_state_line = {"v": ""}

    def reader() -> None:
        """Consume the watch stream until its terminating Response."""
        try:
            for item in link.request_stream("BU.SENSORS.WATCH 0",
                                            overall_timeout_s=3600):
                if isinstance(item, Event):
                    if item.cmd == "sensor":
                        name = item.fields.get("name")
                        edge = item.fields.get("edge")
                        if name == "SAFETY" and edge in state:
                            state[edge] = True
                        print(f" [{name}] {edge}")
                    elif item.cmd == "state":
                        # Live snapshot of all four sensors. Only print when
                        # the level line changes, so steady state doesn't spam.
                        f = item.fields
                        line = (f"SAFETY={f.get('SAFETY','?')} "
                                f"DRIVE={f.get('DRIVE','?')} "
                                f"JACK={f.get('JACK','?')} "
                                f"AUX={f.get('AUX','?')} "
                                f"isr=(s={f.get('isr_s','?')} "
                                f"d={f.get('isr_d','?')} "
                                f"j={f.get('isr_j','?')} "
                                f"a={f.get('isr_a','?')})")
                        if line != last_state_line["v"]:
                            print(f" [state] {line}")
                            last_state_line["v"] = line
                elif isinstance(item, Response):
                    # terminating OK after we aborted
                    return
        except Exception as e:  # pragma: no cover — defensive
            print(f" [reader] {e!r}")

    th = threading.Thread(target=reader, daemon=True)
    th.start()
    try:
        input(" Press Enter when SAFETY has been actuated: ")
    finally:
        # Kick the firmware out of its watch loop: any byte aborts. Runs even
        # when the prompt is interrupted, so the device is never left
        # streaming with a reader thread still attached to the link.
        link.send("")  # just the \n
        th.join(timeout=3)

    if state["make"] and state["break"]:
        print(f" SAFETY: {fmt.pass_('PASS')} (saw break and make)")
        t.note_pass()
    else:
        missing = [k for k, v in state.items() if not v]
        print(f" SAFETY: {fmt.fail('FAIL')} — missed {missing}")
        t.note_fail()
# ---------------------------------------------------------------------------
# Stage 5 — Relay bridges
# ---------------------------------------------------------------------------
# Relay pulse table, one row per pulse, executed in order.
# Row layout: (bridge, dir, ms, (dI_min, dI_max), check_edges)
#   ms          → requested pulse length in milliseconds
#   dI_min/max  → bounds for |ΔI| in amps
#   check_edges → bridge has an encoder-style sensor; pulse must produce
#                 at least one edge on it.
RELAY_TESTS = [
    ("SENSORS", "ON",    500, (0.0,  0.0), False),
    ("DRIVE",   "FWD",  3000, (0.5, 25.0), True),
    ("DRIVE",   "REV",  3000, (0.5, 25.0), True),
    ("JACK",    "UP",   1200, (0.2, 25.0), True),
    ("JACK",    "DOWN", 1200, (0.2, 25.0), True),
    ("AUX",     "FWD",   150, (0.1, 25.0), False),
]
def stage_relays(link: Link, t: Tally) -> None:
    """Stage 5: pulse every entry in RELAY_TESTS and grade each response.

    Per entry, after operator confirmation:
      * SKIP response       → noted as skipped
      * any other non-OK    → noted as failed
      * SENSORS bridge, OK  → noted as passed with no current check
      * efuse tripped, or no edges where edges are required → failed
      * |ΔI| inside the entry's window → passed, otherwise a warning
    """
    print(fmt.stage("Stage 5 — Relay bridges"))
    print(" PRECONDITIONS:")
    print(" - Battery connected, fuse in place")
    print(" - Drive wheels off ground / disengaged")
    print(" - Safety interlock asserted (SAFETY sensor HIGH)")

    for bridge, direction, ms, limits, check_edges in RELAY_TESTS:
        lo, hi = limits
        _prompt(f" Pulse {bridge} {direction} for {ms} ms")
        # Give the device the pulse duration plus 5 s to answer.
        reply = link.request(f"BU.RELAY {bridge} {direction} {ms}",
                             overall_timeout_s=ms / 1000.0 + 5.0)
        _show_response(f"{bridge}/{direction}", reply)

        if reply.status == "SKIP":
            print(" Device refused (safety?)")
            t.note_skip()
            continue
        if reply.status != "OK":
            t.note_fail()
            continue
        if bridge == "SENSORS":
            t.note_pass()
            continue

        i_before = reply.getf("I_before")
        i_mid = reply.getf("I_mid")
        delta = abs(i_mid - i_before)
        tripped = reply.geti("tripped") == 1
        edges = reply.geti("edges")
        stop = reply.get("stop", "time")
        actual_ms = reply.geti("actual_ms", ms)

        # Optional suffixes: edge count only where it is graded, stop reason
        # only when the pulse ended for a reason other than its timer.
        edge_str = ""
        if check_edges:
            edge_str = f" edges={edges}"
        stop_str = ""
        if stop != "time":
            stop_str = f" stop={stop} ({actual_ms}/{ms} ms)"
        print(f" |ΔI| = {delta:.2f} A (expected {lo}-{hi}) "
              f"tripped={tripped}{edge_str}{stop_str}")

        if tripped:
            print(f" {fmt.fail('FAIL')}: efuse tripped")
            t.note_fail()
            continue
        if check_edges and edges <= 0:
            print(f" {fmt.fail('FAIL')}: {bridge} sensor saw no edges — motor not turning?")
            t.note_fail()
            continue
        if not (lo <= delta <= hi):
            print(f" {fmt.warn('WARN')}: ΔI outside nominal")
            t.note_warn()
            continue
        t.note_pass()
# ---------------------------------------------------------------------------
# Stage 6 — Radio & connectivity
# ---------------------------------------------------------------------------
def stage_rf(link: Link, t: Tally) -> None:
    """Stage 6a: stream RF remote codes until the operator presses Enter.

    ``BU.RF.WATCH 0`` is streamed on a background thread and every ``rf``
    event is printed and counted. At least one captured code is a pass;
    none is recorded as a warning.

    The abort byte is sent from a ``finally`` block, so an interrupted
    prompt (Ctrl-C / EOF) still stops the firmware watch before the
    exception propagates.
    """
    import threading

    print(fmt.stage("Stage 6a — RF 433 MHz"))
    _prompt(" Watch for RF remote codes")
    print(" Press buttons on the RF remote. Codes will print live.")
    print(" Press Enter to stop.")
    # Mutable container so the reader thread can report its count back.
    count = {"seen": 0}

    def reader() -> None:
        """Print and count rf events until the stream's terminating Response."""
        try:
            for item in link.request_stream("BU.RF.WATCH 0",
                                            overall_timeout_s=3600):
                if isinstance(item, Event) and item.cmd == "rf":
                    code = item.fields.get("code", "?")
                    print(f" rf code={code}")
                    count["seen"] += 1
                elif isinstance(item, Response):
                    return
        except Exception as e:  # pragma: no cover
            print(f" [reader] {e!r}")

    th = threading.Thread(target=reader, daemon=True)
    th.start()
    try:
        input(" Press Enter when done: ")
    finally:
        # Runs even when the prompt is interrupted, so the device is never
        # left streaming with a reader thread still attached to the link.
        link.send("")  # abort the watch
        th.join(timeout=3)

    print(f" -> {count['seen']} code(s) captured")
    (t.note_pass if count["seen"] > 0 else t.note_warn)()
def stage_wifi(link: Link, t: Tally) -> None:
    """Stage 6b: start the SoftAP and wait for a client to load the web UI.

    Outcomes recorded on *t*:
      * BU.WIFI.START not OK                     → fail
      * BU.WIFI.WAIT ends with a Response        → pass if OK, else fail
      * BU.WIFI.WAIT stream ends with no Response → fail
      * operator presses Ctrl-C while waiting    → skip; an abort byte is
        sent and the resulting response is drained on a best-effort basis
    """
    print(fmt.stage("Stage 6b — WiFi + web UI"))
    _prompt(" Start SoftAP and wait for a client to load the web UI")
    r = link.request("BU.WIFI.START", overall_timeout_s=20)
    _show_response("wifi.start", r)
    if r.status != "OK":
        t.note_fail(); return
    ssid = r.get("ssid", "?")
    print(f"\n Connect a device to WiFi SSID `{ssid}` and open http://192.168.4.1/")
    print(" Waiting for a client to associate and load the page... (Ctrl+C to abort)")
    try:
        for item in link.request_stream("BU.WIFI.WAIT", overall_timeout_s=3600):
            if isinstance(item, Event):
                bag = " ".join(f"{k}={v}" for k, v in item.fields.items())
                print(f" {item.cmd} {bag}")
            elif isinstance(item, Response):
                _show_response("wifi.wait", item)
                (t.note_pass if item.status == "OK" else t.note_fail)()
                break
        else:
            # Stream exhausted without a terminating Response: record it so
            # the stage never finishes with nothing on the tally.
            print(" WiFi wait ended without a response")
            t.note_fail()
    except KeyboardInterrupt:
        print(" WiFi wait aborted by operator"); t.note_skip()
        # Push a byte so the firmware's cmd_wifi_wait breaks out of its
        # loop and unblocks the bring-up dispatcher; otherwise BU.END
        # never reaches the device and the reboot doesn't happen.
        # Then drain the resulting wifi.wait OK so it doesn't get
        # mistaken for the response to a later command.
        try:
            link.send("")
            deadline = time.monotonic() + 2.0
            while time.monotonic() < deadline:
                line = link._readline(deadline)
                if line is None:
                    break
                if isinstance(parse_line(line), Response):
                    break
        except Exception as e:
            # Best-effort cleanup: never turn an operator abort into a
            # crash, but leave a trace instead of hiding the error.
            print(f" [wifi-drain] {e!r}")
# ---------------------------------------------------------------------------
# End
# ---------------------------------------------------------------------------
def stage_end(link: Link, t: Tally) -> None:
    """Final stage: after operator confirmation, send BU.END and show the reply.

    Nothing is recorded on *t*.
    """
    print(fmt.stage("Stage — End"))
    _prompt(" Exit bring-up mode (device will reboot)")
    # Device reboots; no further response expected.
    _show_response("end", link.request("BU.END", overall_timeout_s=5))
# ---------------------------------------------------------------------------
# Top-level driver
# ---------------------------------------------------------------------------
Stage = Callable[[Link, Tally], None]


def all_stages(skip_relays: bool = False, no_calibrate: bool = False) -> list[Stage]:
    """Return the bring-up stages in execution order.

    Args:
        skip_relays: leave the relay-bridge stage out of the list.
        no_calibrate: run the ADC stage with battery calibration disabled.
    """
    def adc_stage(link: Link, t: Tally) -> None:
        """Adapt stage_adc to the two-argument Stage signature."""
        stage_adc(link, t, calibrate=not no_calibrate)

    relay_part = [] if skip_relays else [stage_relays]
    return [
        stage_begin,
        stage_flash,
        stage_i2c_led,
        adc_stage,
        stage_sensors,
        *relay_part,
        stage_rf,
        stage_wifi,
        stage_end,
    ]