Files
SC-F001/logtool/rtc_test.py
Thaddeus Hughes f4077e5e26 rtc craziness
2026-03-11 09:01:32 -05:00

291 lines
9.0 KiB
Python

#!/usr/bin/env python3
"""
rtc_test.py -- SC-F001 RTC timekeeping monitor
1. Reset board and wait for boot
2. Sync device RTC to host clock
3. Stream UART continuously -- no sleep commands issued
4. Every time the firmware logs a TIME line, record host time vs device time and print diff
5. Ctrl+C to stop
The device will sleep naturally after 180s of inactivity and wake every 120s,
producing a TIME line on each wakeup. This script catches each one.
Firmware emits parseable lines of the form:
I (uptime) RTC: TIME unix=<seconds> src=<SYNC|SLEEP|CRASH> uptime=<seconds>s
Generates two log files automatically:
rtc_raw_TIMESTAMP.txt -- every UART byte in/out, wall-clock timestamped
rtc_analysis_TIMESTAMP.txt -- clean parsed comparison table, no ANSI codes
Usage:
pip install pyserial
python logtool/rtc_test.py COM3
python logtool/rtc_test.py COM3 --verbose # also print raw device output
"""
import serial
import time
import re
import sys
import argparse
from datetime import datetime, timezone
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
BAUD = 115200
BOOT_TIMEOUT = 35 # seconds to wait for prompt after reset
_ANSI = re.compile(r"\x1b\[[0-9;]*[A-Za-z]")
# Matches: I (123) RTC: TIME unix=1773167687 src=SLEEP uptime=5s
_TIME_RE = re.compile(r"TIME unix=(\d+) src=(\S+) uptime=(\d+)s")
def _strip(s):
return _ANSI.sub("", s)
def _ts():
return datetime.now().strftime("%H:%M:%S.%f")[:-3]
def _utc(unix_s):
return datetime.fromtimestamp(int(unix_s), tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
# ---------------------------------------------------------------------------
# Log handles
# ---------------------------------------------------------------------------
_raw_log = None
_analysis_log = None
def _rawlog(direction, text):
if not _raw_log:
return
for line in text.splitlines(keepends=True):
_raw_log.write(f"[{_ts()} {direction}] {_strip(line)}")
if text and not text.endswith("\n"):
_raw_log.write("\n")
_raw_log.flush()
def aprint(*args, **kwargs):
"""Print to console AND analysis log (ANSI stripped in file)."""
end = kwargs.get("end", "\n")
text = " ".join(str(a) for a in args) + end
sys.stdout.write(text)
sys.stdout.flush()
if _analysis_log:
_analysis_log.write(_strip(text))
_analysis_log.flush()
# ---------------------------------------------------------------------------
# Serial helpers
# ---------------------------------------------------------------------------
def reset_board(ser):
"""Pulse EN via RTS; keep DTR=False so GPIO0 stays HIGH (normal boot)."""
ser.dtr = False
ser.rts = True
time.sleep(0.1)
ser.rts = False
time.sleep(0.05)
def wait_for_boot(ser, timeout=BOOT_TIMEOUT, verbose=False):
"""
Read until the UART prompt (\\n> ) appears anywhere in the buffer.
Nudges the device every 3s in case the prompt is buried under log spam.
"""
buf = ""
deadline = time.time() + timeout
last_nudge = time.time()
prompt_seen_at = None
while time.time() < deadline:
chunk = ser.read(256).decode(errors="replace")
if chunk:
buf += chunk
_rawlog("RX", chunk)
if verbose:
sys.stdout.write(chunk)
sys.stdout.flush()
if prompt_seen_at is None and ("\n> " in buf or "\r\n> " in buf):
prompt_seen_at = time.time()
if prompt_seen_at is not None and time.time() - prompt_seen_at >= 0.20:
return buf
if time.time() - last_nudge > 3.0:
ser.write(b"\r\n")
_rawlog("TX", "\r\n")
last_nudge = time.time()
time.sleep(0.05)
raise TimeoutError(f"Boot prompt not seen within {timeout}s")
def send_cmd(ser, cmd, timeout=10.0, verbose=False):
"""Send a command and wait for the next prompt."""
ser.read_all()
ser.write((cmd + "\r\n").encode())
_rawlog("TX", cmd + "\r\n")
buf = ""
deadline = time.time() + timeout
prompt_seen_at = None
while time.time() < deadline:
chunk = ser.read(256).decode(errors="replace")
if chunk:
buf += chunk
_rawlog("RX", chunk)
if verbose:
sys.stdout.write(chunk)
sys.stdout.flush()
if prompt_seen_at is None and ("\n> " in buf or "\r\n> " in buf):
prompt_seen_at = time.time()
if prompt_seen_at is not None and time.time() - prompt_seen_at >= 0.20:
return buf
time.sleep(0.05)
return buf # return whatever we got even on timeout
# ---------------------------------------------------------------------------
# Comparison table
# ---------------------------------------------------------------------------
_HDR = (
"\n"
" # Wall clock (host) Device time (UTC) Diff Source Uptime\n"
" -- ------------------- ------------------- ------ -------- ------"
)
_ROW = " {n:>2} {ht:<19} {dt:<19} {diff:>+6}s {src:<8} {up}s"
def _table_row(n, host_ts, device_unix, src, uptime_s):
diff = device_unix - host_ts
return _ROW.format(
n = n,
ht = _utc(host_ts),
dt = _utc(device_unix),
diff = diff,
src = src,
up = uptime_s,
)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
global _raw_log, _analysis_log
parser = argparse.ArgumentParser(description="SC-F001 RTC timekeeping monitor")
parser.add_argument("port", help="Serial port, e.g. COM3 or /dev/ttyUSB0")
parser.add_argument("--boot-timeout", type=float, default=BOOT_TIMEOUT)
parser.add_argument("--verbose", action="store_true",
help="Print raw device output to console (always in raw log)")
args = parser.parse_args()
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
raw_path = f"logtool/rtc_raw_{stamp}.txt"
analysis_path = f"logtool/rtc_analysis_{stamp}.txt"
_raw_log = open(raw_path, "w", encoding="utf-8", errors="replace")
_analysis_log = open(analysis_path, "w", encoding="utf-8", errors="replace")
aprint(f"Raw log : {raw_path}")
aprint(f"Analysis log : {analysis_path}")
aprint(f"Port : {args.port} {BAUD} baud")
aprint(f"Ctrl+C to stop.\n")
# ---- Open port -------------------------------------------------------- #
ser = serial.Serial(args.port, BAUD, timeout=0.2)
ser.dtr = False
ser.rts = False
time.sleep(0.2)
ser.read_all()
# ---- Reset and wait for boot ------------------------------------------ #
aprint(f"[{_ts()}] Resetting board...")
reset_board(ser)
try:
wait_for_boot(ser, timeout=args.boot_timeout, verbose=args.verbose)
except TimeoutError as e:
aprint(f"ERROR: {e}")
ser.close(); sys.exit(1)
aprint(f"[{_ts()}] Boot complete.\n")
# ---- Sync RTC --------------------------------------------------------- #
host_sync = int(time.time())
aprint(f"[{_ts()}] Syncing device RTC to host: {host_sync} ({_utc(host_sync)} UTC)...")
resp = send_cmd(ser, f'POST: {{"time": {host_sync}}}', verbose=args.verbose)
if "ok" in resp.lower():
aprint(f"[{_ts()}] Sync OK.\n")
else:
aprint(f"[{_ts()}] WARNING: unexpected response: {_strip(resp.strip())}\n")
# ---- Table header ----------------------------------------------------- #
aprint(_HDR)
# ---- Stream loop ------------------------------------------------------ #
event_n = 0
line_buf = ""
aprint()
aprint("Streaming... (device will sleep after 180s inactivity, wake every 120s)")
aprint()
try:
while True:
chunk = ser.read(256).decode(errors="replace")
if not chunk:
continue
_rawlog("RX", chunk)
if args.verbose:
sys.stdout.write(chunk)
sys.stdout.flush()
line_buf += chunk
# Process complete lines
while "\n" in line_buf:
line, line_buf = line_buf.split("\n", 1)
line = line.rstrip("\r")
m = _TIME_RE.search(_strip(line))
if not m:
continue
# Got a TIME event
host_ts = int(time.time())
device_unix = int(m.group(1))
src = m.group(2)
uptime_s = m.group(3)
event_n += 1
row = _table_row(event_n, host_ts, device_unix, src, uptime_s)
aprint(row)
except KeyboardInterrupt:
aprint(f"\n\n[{_ts()}] Stopped by user.")
if event_n == 0:
aprint("No TIME events captured.")
else:
aprint(f"{event_n} event(s) logged to {analysis_path}")
ser.close()
_raw_log.close()
_analysis_log.close()
if __name__ == "__main__":
main()