471 lines
18 KiB
Python
471 lines
18 KiB
Python
"""One function per bring-up stage. Each is explicit and independently
|
||
runnable from the operator prompt — no implicit sequencing between them."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import Callable
|
||
|
||
import fmt
|
||
from protocol import Link, Response, Event, parse_line
|
||
|
||
|
||
@dataclass
|
||
class Tally:
|
||
passed: int = 0
|
||
failed: int = 0
|
||
skipped: int = 0
|
||
warnings: int = 0
|
||
|
||
def note_pass(self) -> None: self.passed += 1
|
||
def note_fail(self) -> None: self.failed += 1
|
||
def note_skip(self) -> None: self.skipped += 1
|
||
def note_warn(self) -> None: self.warnings += 1
|
||
|
||
|
||
def _prompt(msg: str) -> None:
|
||
"""Block until operator presses Enter (Ctrl-C still aborts)."""
|
||
input(fmt.prompt(msg) + fmt.dim(" [Enter to continue]") + ": ")
|
||
|
||
|
||
def _show_response(label: str, r: Response) -> None:
|
||
bag = " ".join(f"{k}={v}" for k, v in r.fields.items())
|
||
print(f" {fmt.status_tag(r.status)} {label} {fmt.dim(bag)}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage 0 — Begin + identify
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def stage_begin(link: Link, t: Tally) -> None:
|
||
print(fmt.stage("Stage 0 — Begin"))
|
||
print(" Waiting for device to finish booting ...")
|
||
r = link.wait_ready("BU.BEGIN", per_attempt_s=1.5, overall_timeout_s=30.0)
|
||
_show_response("begin", r)
|
||
if r.status != "OK":
|
||
t.note_fail(); raise SystemExit("Device did not enter bring-up mode")
|
||
r = link.request("BU.INFO")
|
||
_show_response("info", r)
|
||
(t.note_pass if r.status == "OK" else t.note_fail)()
|
||
print(f" -> fw={r.get('fw')} board={r.get('board', '?')} reset={r.get('reset')} "
|
||
f"heap={r.get('heap')}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage 1 — Flash & persistence
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def stage_flash(link: Link, t: Tally) -> None:
|
||
print(fmt.stage("Stage 1 — Flash & storage"))
|
||
_prompt(" Run flash roundtrip + log head/tail check")
|
||
r = link.request("BU.FLASH", overall_timeout_s=10)
|
||
_show_response("flash", r)
|
||
(t.note_pass if r.status == "OK" else t.note_fail)()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage 2 — I2C + LEDs
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def stage_i2c_led(link: Link, t: Tally) -> None:
|
||
import threading
|
||
|
||
print(fmt.stage("Stage 2 — I2C / TCA9555 / LEDs"))
|
||
_prompt(" Probe TCA9555 and run LED check")
|
||
|
||
r = link.request("BU.I2C")
|
||
_show_response("i2c", r)
|
||
if r.status != "OK":
|
||
t.note_fail(); return
|
||
t.note_pass()
|
||
|
||
# Firmware-driven LED watch: all LEDs solid when the button is released,
|
||
# fast waterfall while held. Operator actuates the button and confirms.
|
||
def reader() -> None:
|
||
try:
|
||
for item in link.request_stream("BU.LED.WATCH",
|
||
overall_timeout_s=3600):
|
||
if isinstance(item, Event) and item.cmd == "led":
|
||
state = "PRESSED" if item.fields.get("pressed") == "1" else "released"
|
||
print(f" [led] button {state}")
|
||
elif isinstance(item, Response):
|
||
return
|
||
except Exception as e: # pragma: no cover — defensive
|
||
print(f" [led-reader] {e!r}")
|
||
|
||
th = threading.Thread(target=reader, daemon=True)
|
||
th.start()
|
||
|
||
print(" LEDs should be SOLID (all on) when button is released.")
|
||
print(" Press-and-hold the button: LEDs should WATERFALL at ~3× speed.")
|
||
try:
|
||
while True:
|
||
ans = input(" LEDs behaved correctly? [y/n]: ").strip().lower()
|
||
if ans.startswith("y"):
|
||
verdict = "pass"; break
|
||
if ans.startswith("n"):
|
||
verdict = "fail"; break
|
||
finally:
|
||
link.send("") # any byte aborts BU.LED.WATCH
|
||
th.join(timeout=3)
|
||
|
||
if verdict == "pass":
|
||
t.note_pass()
|
||
else:
|
||
print(f" {fmt.fail('LED visual check FAILED')}")
|
||
t.note_fail()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage 3 — ADC + battery calibration
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def stage_adc(link: Link, t: Tally, calibrate: bool = True) -> None:
|
||
print(fmt.stage("Stage 3 — Analog front-end"))
|
||
_prompt(" Read ADC snapshot (battery / motor current)")
|
||
|
||
r = link.request("BU.ADC")
|
||
_show_response("adc", r)
|
||
if r.status != "OK":
|
||
t.note_fail(); return
|
||
t.note_pass()
|
||
|
||
bat_V = r.getf("bat_V", 0.0)
|
||
print(f" -> battery reports {bat_V:.3f} V")
|
||
|
||
# VOC and FAULT pins on V5 are unusable (wired direct to input-only
|
||
# ESP32 GPIOs, no external resistors — see README "V5 hardware caveats").
|
||
# They're intentionally ignored here.
|
||
|
||
if not calibrate:
|
||
return
|
||
_run_battery_cal(link, t)
|
||
|
||
|
||
def _run_battery_cal(link: Link, t: Tally) -> None:
|
||
from calibrate import single_point_offset, verify
|
||
|
||
print(fmt.section("Battery voltage calibration"))
|
||
|
||
# Read current K and raw mV.
|
||
k_r = link.request("BU.PARAM GET V_SENS_K")
|
||
if k_r.status != "OK":
|
||
print(" Could not read V_SENS_K"); t.note_fail(); return
|
||
k = k_r.getf("value")
|
||
|
||
adc_r = link.request("BU.ADC")
|
||
bat_mv = adc_r.getf("bat_mv")
|
||
# ADC noise rarely lands on exactly 0; check against a small range so a
|
||
# near-floor reading still flags as bogus.
|
||
if bat_mv < 50:
|
||
print(f" ADC read looks bogus (mv={bat_mv:.0f})"); t.note_fail(); return
|
||
|
||
raw_ans = input(" Measure the battery at the board terminals with a DMM.\n"
|
||
" Enter true voltage (V): ").strip()
|
||
try:
|
||
v_true = float(raw_ans)
|
||
except ValueError:
|
||
print(" Not a number — skipping cal"); t.note_skip(); return
|
||
|
||
# Sanity-check the operator-supplied true voltage. The system runs on a
|
||
# nominal 12-24 V battery; values outside 5..30 V are almost certainly a
|
||
# typo or DMM unit mistake (e.g. mV instead of V).
|
||
if not (5.0 <= v_true <= 30.0):
|
||
print(f" v_true={v_true:.3f} V is outside plausible 5..30 V range")
|
||
t.note_fail(); return
|
||
|
||
cal = single_point_offset(bat_mv, v_true, k)
|
||
predicted = verify(bat_mv, cal)
|
||
print(f" bat_mv={bat_mv:.0f} K={k:.10f} new OFFSET={cal.offset:+.6f} V")
|
||
print(f" predicted V_bat after cal = {predicted:.3f} (true = {v_true:.3f})")
|
||
|
||
# Sanity-check the computed offset. Default is 0.4 V; |offset| > 2 V means
|
||
# something else is wrong (broken divider, wrong K, ADC ref off).
|
||
if abs(cal.offset) > 2.0:
|
||
print(f" {fmt.fail('FAIL')}: |offset|={abs(cal.offset):.3f} V exceeds 2 V — "
|
||
f"check divider / K / DMM units")
|
||
t.note_fail(); return
|
||
|
||
wr = link.request(f"BU.PARAM SET V_SENS_OFFSET {cal.offset:.6f}")
|
||
_show_response("param.set", wr)
|
||
if wr.status != "OK":
|
||
t.note_fail(); return
|
||
|
||
# Read it back to confirm storage actually persisted what we sent.
|
||
rb = link.request("BU.PARAM GET V_SENS_OFFSET")
|
||
if rb.status != "OK":
|
||
print(" Could not read back V_SENS_OFFSET"); t.note_fail(); return
|
||
stored = rb.getf("value")
|
||
if abs(stored - cal.offset) > 1e-4:
|
||
print(f" {fmt.fail('FAIL')}: readback {stored:+.6f} != written {cal.offset:+.6f}")
|
||
t.note_fail(); return
|
||
|
||
# Verify by re-reading the ADC. Firmware's cmd_adc_once now bypasses the
|
||
# EMA, so bat_V here reflects the new offset immediately.
|
||
check = link.request("BU.ADC")
|
||
new_V = check.getf("bat_V")
|
||
err = new_V - v_true
|
||
print(f" Post-cal bat_V = {new_V:.3f} (err {err*1000:+.1f} mV)")
|
||
abs_err = abs(err)
|
||
if abs_err < 0.040:
|
||
print(f" {fmt.pass_('PASS')}: cal residual within ±40 mV")
|
||
t.note_pass()
|
||
elif abs_err < 0.100:
|
||
print(f" {fmt.warn('WARN')}: residual {err*1000:+.1f} mV (>40, <100 mV)")
|
||
t.note_warn()
|
||
else:
|
||
print(f" {fmt.fail('FAIL')}: residual {err*1000:+.1f} mV exceeds 100 mV")
|
||
t.note_fail()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage 4 — Discrete sensors (mandatory edges)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
SENSOR_NAMES = ["SAFETY"] # JACK and DRIVE are checked via the relay pulse stage.
|
||
|
||
|
||
def stage_sensors(link: Link, t: Tally) -> None:
|
||
"""Live-print safety-sensor edges until operator presses Enter.
|
||
|
||
Drive and jack sensors are encoder-style and only trip while the motor
|
||
runs — they're verified as a side effect of Stage 5 relay pulses.
|
||
"""
|
||
import threading
|
||
|
||
print(fmt.stage("Stage 4 — Sensor live view"))
|
||
print(" Live state of all 4 sensor pins is printed below when any one")
|
||
print(" changes. Per-edge events also print as they arrive.")
|
||
print(" Poke each sensor by hand / magnet / jumper to verify it responds.")
|
||
print(" SAFETY must show both break and make to pass; the others are")
|
||
print(" diagnostic only (drive/jack are properly tested in Stage 5).")
|
||
print(" Press Enter when you're satisfied.")
|
||
|
||
state = {"make": False, "break": False}
|
||
|
||
last_state_line = {"v": ""}
|
||
|
||
def reader() -> None:
|
||
try:
|
||
for item in link.request_stream("BU.SENSORS.WATCH 0",
|
||
overall_timeout_s=3600):
|
||
if isinstance(item, Event):
|
||
if item.cmd == "sensor":
|
||
name = item.fields.get("name")
|
||
edge = item.fields.get("edge")
|
||
if name == "SAFETY" and edge in state:
|
||
state[edge] = True
|
||
print(f" [{name}] {edge}")
|
||
elif item.cmd == "state":
|
||
# Live snapshot of all four sensors. Only print when
|
||
# the level line changes, so steady state doesn't spam.
|
||
f = item.fields
|
||
line = (f"SAFETY={f.get('SAFETY','?')} "
|
||
f"DRIVE={f.get('DRIVE','?')} "
|
||
f"JACK={f.get('JACK','?')} "
|
||
f"AUX={f.get('AUX','?')} "
|
||
f"isr=(s={f.get('isr_s','?')} "
|
||
f"d={f.get('isr_d','?')} "
|
||
f"j={f.get('isr_j','?')} "
|
||
f"a={f.get('isr_a','?')})")
|
||
if line != last_state_line["v"]:
|
||
print(f" [state] {line}")
|
||
last_state_line["v"] = line
|
||
elif isinstance(item, Response):
|
||
# terminating OK after we aborted
|
||
return
|
||
except Exception as e: # pragma: no cover — defensive
|
||
print(f" [reader] {e!r}")
|
||
|
||
th = threading.Thread(target=reader, daemon=True)
|
||
th.start()
|
||
|
||
input(" Press Enter when SAFETY has been actuated: ")
|
||
|
||
# Kick the firmware out of its watch loop: any byte aborts.
|
||
link.send("") # just the \n
|
||
th.join(timeout=3)
|
||
|
||
if state["make"] and state["break"]:
|
||
print(f" SAFETY: {fmt.pass_('PASS')} (saw break and make)")
|
||
t.note_pass()
|
||
else:
|
||
missing = [k for k, v in state.items() if not v]
|
||
print(f" SAFETY: {fmt.fail('FAIL')} — missed {missing}")
|
||
t.note_fail()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage 5 — Relay bridges
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# (bridge, dir, ms, (dI_min, dI_max), check_edges)
|
||
# check_edges → bridge has an encoder-style sensor; pulse must produce
|
||
# at least one edge on it.
|
||
RELAY_TESTS = [
|
||
("SENSORS", "ON", 500, (0.0, 0.0), False),
|
||
("DRIVE", "FWD", 3000, (0.5, 25.0), True),
|
||
("DRIVE", "REV", 3000, (0.5, 25.0), True),
|
||
("JACK", "UP", 1200, (0.2, 25.0), True),
|
||
("JACK", "DOWN", 1200, (0.2, 25.0), True),
|
||
("AUX", "FWD", 150, (0.1, 25.0), False),
|
||
]
|
||
|
||
|
||
def stage_relays(link: Link, t: Tally) -> None:
|
||
print(fmt.stage("Stage 5 — Relay bridges"))
|
||
print(" PRECONDITIONS:")
|
||
print(" - Battery connected, fuse in place")
|
||
print(" - Drive wheels off ground / disengaged")
|
||
print(" - Safety interlock asserted (SAFETY sensor HIGH)")
|
||
|
||
for bridge, direction, ms, (lo, hi), check_edges in RELAY_TESTS:
|
||
_prompt(f" Pulse {bridge} {direction} for {ms} ms")
|
||
r = link.request(f"BU.RELAY {bridge} {direction} {ms}",
|
||
overall_timeout_s=ms / 1000.0 + 5.0)
|
||
_show_response(f"{bridge}/{direction}", r)
|
||
if r.status == "SKIP":
|
||
print(" Device refused (safety?)"); t.note_skip(); continue
|
||
if r.status != "OK":
|
||
t.note_fail(); continue
|
||
if bridge == "SENSORS":
|
||
t.note_pass(); continue
|
||
|
||
i_before = r.getf("I_before")
|
||
i_mid = r.getf("I_mid")
|
||
delta = abs(i_mid - i_before)
|
||
tripped = r.geti("tripped") == 1
|
||
edges = r.geti("edges")
|
||
stop = r.get("stop", "time")
|
||
actual_ms = r.geti("actual_ms", ms)
|
||
edge_str = f" edges={edges}" if check_edges else ""
|
||
stop_str = f" stop={stop} ({actual_ms}/{ms} ms)" if stop != "time" else ""
|
||
print(f" |ΔI| = {delta:.2f} A (expected {lo}-{hi}) "
|
||
f"tripped={tripped}{edge_str}{stop_str}")
|
||
|
||
if tripped:
|
||
print(f" {fmt.fail('FAIL')}: efuse tripped"); t.note_fail(); continue
|
||
if check_edges and edges <= 0:
|
||
print(f" {fmt.fail('FAIL')}: {bridge} sensor saw no edges — motor not turning?")
|
||
t.note_fail(); continue
|
||
if lo <= delta <= hi:
|
||
t.note_pass()
|
||
else:
|
||
print(f" {fmt.warn('WARN')}: ΔI outside nominal")
|
||
t.note_warn()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Stage 6 — Radio & connectivity
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def stage_rf(link: Link, t: Tally) -> None:
|
||
import threading
|
||
|
||
print(fmt.stage("Stage 6a — RF 433 MHz"))
|
||
_prompt(" Watch for RF remote codes")
|
||
|
||
print(" Press buttons on the RF remote. Codes will print live.")
|
||
print(" Press Enter to stop.")
|
||
|
||
count = {"seen": 0}
|
||
|
||
def reader() -> None:
|
||
try:
|
||
for item in link.request_stream("BU.RF.WATCH 0",
|
||
overall_timeout_s=3600):
|
||
if isinstance(item, Event) and item.cmd == "rf":
|
||
code = item.fields.get("code", "?")
|
||
print(f" rf code={code}")
|
||
count["seen"] += 1
|
||
elif isinstance(item, Response):
|
||
return
|
||
except Exception as e: # pragma: no cover
|
||
print(f" [reader] {e!r}")
|
||
|
||
th = threading.Thread(target=reader, daemon=True)
|
||
th.start()
|
||
|
||
input(" Press Enter when done: ")
|
||
link.send("") # abort the watch
|
||
th.join(timeout=3)
|
||
|
||
print(f" -> {count['seen']} code(s) captured")
|
||
(t.note_pass if count["seen"] > 0 else t.note_warn)()
|
||
|
||
|
||
def stage_wifi(link: Link, t: Tally) -> None:
|
||
print(fmt.stage("Stage 6b — WiFi + web UI"))
|
||
_prompt(" Start SoftAP and wait for a client to load the web UI")
|
||
r = link.request("BU.WIFI.START", overall_timeout_s=20)
|
||
_show_response("wifi.start", r)
|
||
if r.status != "OK":
|
||
t.note_fail(); return
|
||
ssid = r.get("ssid", "?")
|
||
print(f"\n Connect a device to WiFi SSID `{ssid}` and open http://192.168.4.1/")
|
||
print(" Waiting for a client to associate and load the page... (Ctrl+C to abort)")
|
||
try:
|
||
for item in link.request_stream("BU.WIFI.WAIT", overall_timeout_s=3600):
|
||
if isinstance(item, Event):
|
||
print(f" {item.cmd} {' '.join(f'{k}={v}' for k,v in item.fields.items())}")
|
||
elif isinstance(item, Response):
|
||
_show_response("wifi.wait", item)
|
||
(t.note_pass if item.status == "OK" else t.note_fail)()
|
||
break
|
||
except KeyboardInterrupt:
|
||
print(" WiFi wait aborted by operator"); t.note_skip()
|
||
# Push a byte so the firmware's cmd_wifi_wait breaks out of its
|
||
# loop and unblocks the bring-up dispatcher; otherwise BU.END
|
||
# never reaches the device and the reboot doesn't happen.
|
||
# Then drain the resulting wifi.wait OK so it doesn't get
|
||
# mistaken for the response to a later command.
|
||
try:
|
||
link.send("")
|
||
deadline = time.monotonic() + 2.0
|
||
while time.monotonic() < deadline:
|
||
line = link._readline(deadline)
|
||
if line is None:
|
||
break
|
||
if isinstance(parse_line(line), Response):
|
||
break
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# End
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def stage_end(link: Link, t: Tally) -> None:
|
||
print(fmt.stage("Stage — End"))
|
||
_prompt(" Exit bring-up mode (device will reboot)")
|
||
r = link.request("BU.END", overall_timeout_s=5)
|
||
_show_response("end", r)
|
||
# Device reboots; no further response expected.
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Top-level driver
|
||
# ---------------------------------------------------------------------------
|
||
|
||
Stage = Callable[[Link, Tally], None]
|
||
|
||
|
||
def all_stages(skip_relays: bool = False, no_calibrate: bool = False) -> list[Stage]:
|
||
stages: list[Stage] = [
|
||
stage_begin,
|
||
stage_flash,
|
||
stage_i2c_led,
|
||
lambda link, t: stage_adc(link, t, calibrate=not no_calibrate),
|
||
stage_sensors,
|
||
]
|
||
if not skip_relays:
|
||
stages.append(stage_relays)
|
||
stages.extend([
|
||
stage_rf,
|
||
stage_wifi,
|
||
stage_end,
|
||
])
|
||
return stages
|