This commit is contained in:
Thaddeus Hughes
2026-03-04 17:41:58 -06:00
parent 6ce5dae3a4
commit e2451fce78
21 changed files with 9013 additions and 7 deletions

BIN
logtool/04MAR2026_1430.bin Normal file

Binary file not shown.

1926
logtool/04MAR2026_1430.txt Normal file

File diff suppressed because it is too large Load Diff

BIN
logtool/04MAR2026_1449.bin Normal file

Binary file not shown.

1935
logtool/04MAR2026_1449.txt Normal file

File diff suppressed because it is too large Load Diff

View File

View File

@@ -0,0 +1,4 @@
Streaming from http://sc.local/log (Ctrl-C to stop)
Time State Bat(V) Drive(A) Jack(A) Aux(A) Counter Stable Raw DrHeat JkHeat AxHeat
---- ----- ------ -------- ------- ------ ------- ------ --- ------ ------ ------

4129
logtool/04MAR2026_1505.txt Normal file

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

157
logtool/cli_view.py Normal file
View File

@@ -0,0 +1,157 @@
"""
CLI table output for SC-F001 logtool.
"""
from parser import LOG_TYPE_BAT, LOG_TYPE_CRASH, LOG_TYPE_BOOT, LOG_TYPE_TIME_SET
try:
from tabulate import tabulate
_TABULATE_OK = True
except ImportError:
_TABULATE_OK = False
_SENSOR_BITS = ['SAFETY', 'JACK', 'DRIVE', 'AUX2']
def _sensor_str(nibble: int) -> str:
active = [_SENSOR_BITS[i] for i in range(4) if (nibble >> i) & 1]
return '+'.join(active) if active else '-'
def _row(e: dict) -> list:
t = e.get('entry_type', -1)
name = e.get('state_name', '?')
if 0 <= t <= 12:
return [
e.get('time_str', ''),
name,
f"{e.get('bat_V', 0):.3f}",
f"{e.get('drive_A', 0):.2f}",
f"{e.get('jack_A', 0):.2f}",
f"{e.get('aux_A', 0):.2f}",
str(e.get('counter', 0)),
_sensor_str(e.get('sensors_stable', 0)),
_sensor_str(e.get('sensors_raw', 0)),
f"{e.get('drive_heat', 0):.1f}",
f"{e.get('jack_heat', 0):.1f}",
f"{e.get('aux_heat', 0):.1f}",
]
elif t == LOG_TYPE_BAT:
return [
e.get('time_str', ''),
'BAT',
f"{e.get('bat_V', 0):.3f}",
'', '', '', '', '', '', '', '', '',
]
elif t == LOG_TYPE_CRASH:
return [
e.get('time_str', ''),
f"*** CRASH: {e.get('reason_str', '?')}",
'', '', '', '', '', '', '', '', '', '',
]
elif t == LOG_TYPE_BOOT:
return [
e.get('time_str', ''),
f"BOOT rst={e.get('reason_str', '?')} wake={e.get('wake_str', '?')}",
'', '', '', '', '', '', '', '', '', '',
]
elif t == LOG_TYPE_TIME_SET:
return [
e.get('time_str', ''),
'TIME_SET',
'', '', '', '', '', '', '', '', '', '',
]
else:
return [
e.get('time_str', ''),
name,
'', '', '', '', '', '', '', '', '', '',
]
_HEADERS = ['Time', 'State', 'Bat(V)', 'Drive(A)', 'Jack(A)', 'Aux(A)',
'Counter', 'Stable', 'Raw', 'DrHeat', 'JkHeat', 'AxHeat']
def print_table(entries: list, type_filter: str = None):
"""Print a tabulate table of log entries to stdout."""
if type_filter:
tf = type_filter.lower()
if tf == 'fsm':
entries = [e for e in entries if 0 <= e.get('entry_type', -1) <= 12]
elif tf == 'bat':
entries = [e for e in entries if e.get('entry_type') == LOG_TYPE_BAT]
elif tf == 'crash':
entries = [e for e in entries if e.get('entry_type') == LOG_TYPE_CRASH]
rows = [_row(e) for e in entries]
if not rows:
print("(no entries)")
return
if _TABULATE_OK:
print(tabulate(rows, headers=_HEADERS, tablefmt='simple'))
else:
# Manual fallback
widths = [max(len(str(r[i])) for r in [_HEADERS] + rows) for i in range(len(_HEADERS))]
fmt = ' '.join(f'{{:<{w}}}' for w in widths)
print(fmt.format(*_HEADERS))
print(' '.join('-' * w for w in widths))
for row in rows:
print(fmt.format(*row))
def print_summary(entries: list):
"""Print a brief summary: time range, entry counts, voltage range."""
if not entries:
print("(empty log)")
return
fsm_entries = [e for e in entries if 0 <= e.get('entry_type', -1) <= 12]
bat_entries = [e for e in entries if e.get('entry_type') == LOG_TYPE_BAT]
crash_entries = [e for e in entries if e.get('entry_type') == LOG_TYPE_CRASH]
boot_entries = [e for e in entries if e.get('entry_type') == LOG_TYPE_BOOT]
time_set_entries = [e for e in entries if e.get('entry_type') == LOG_TYPE_TIME_SET]
all_ts = [e.get('ts_ms', 0) for e in entries if e.get('ts_ms')]
ts_min = min(all_ts) if all_ts else 0
ts_max = max(all_ts) if all_ts else 0
all_bat = [e['bat_V'] for e in entries if 'bat_V' in e]
print(f"Entries : {len(entries)} total "
f"({len(fsm_entries)} FSM, {len(bat_entries)} BAT, "
f"{len(crash_entries)} CRASH, {len(boot_entries)} BOOT, "
f"{len(time_set_entries)} TIME_SET)")
if all_ts:
from parser import _ts_to_str
print(f"Time : {_ts_to_str(ts_min)}{_ts_to_str(ts_max)}")
dur_s = (ts_max - ts_min) / 1000
print(f"Duration: {dur_s:.1f} s ({dur_s/60:.1f} min)")
if all_bat:
print(f"Battery : {min(all_bat):.3f} V {max(all_bat):.3f} V")
if boot_entries:
print(f"\nBOOT events:")
for e in boot_entries:
print(f" {e.get('time_str', '?')} rst={e.get('reason_str', '?')} wake={e.get('wake_str', '?')}")
if crash_entries:
print(f"\nCRASH events:")
for e in crash_entries:
print(f" {e.get('time_str', '?')} reason={e.get('reason_str', '?')}")
if time_set_entries:
print(f"\nTIME_SET events:")
for e in time_set_entries:
print(f" {e.get('time_str', '?')}")
def append_rows(new_entries: list):
"""Print new rows without header (for streaming append mode)."""
for e in new_entries:
row = _row(e)
if _TABULATE_OK:
print(tabulate([row], tablefmt='plain'))
else:
print(' '.join(str(c) for c in row))

207
logtool/gui_view.py Normal file
View File

@@ -0,0 +1,207 @@
"""
Matplotlib GUI for SC-F001 logtool.
"""
from parser import LOG_TYPE_BAT, LOG_TYPE_CRASH, _ts_to_str
try:
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.animation import FuncAnimation
import numpy as np
_MPL_OK = True
except ImportError:
_MPL_OK = False
def _check_mpl():
if not _MPL_OK:
raise ImportError("'matplotlib' and 'numpy' required for GUI mode. Install: pip install matplotlib")
def _entries_to_arrays(entries: list) -> dict:
"""Split entries into typed arrays for plotting."""
fsm = [e for e in entries if 0 <= e.get('entry_type', -1) <= 12]
bat = [e for e in entries if e.get('entry_type') == LOG_TYPE_BAT]
crash = [e for e in entries if e.get('entry_type') == LOG_TYPE_CRASH]
return {'fsm': fsm, 'bat': bat, 'crash': crash}
def _ts_arr(entries, key='ts_ms'):
import numpy as np
return np.array([e.get(key, 0) / 1000.0 for e in entries])
def _val_arr(entries, key):
import numpy as np
return np.array([e.get(key, float('nan')) for e in entries])
def show_plots(entries: list, title: str = "SC-F001 Log"):
_check_mpl()
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
from datetime import datetime
arrays = _entries_to_arrays(entries)
fsm = arrays['fsm']
bat = arrays['bat']
crash = arrays['crash']
crash_ts = [e.get('ts_ms', 0) / 1000.0 for e in crash]
fig, axes = plt.subplots(4, 1, figsize=(14, 10), sharex=True)
fig.suptitle(title)
def add_crash_lines(ax):
for ts in crash_ts:
ax.axvline(x=datetime.utcfromtimestamp(ts), color='red',
linestyle='--', linewidth=1.0, alpha=0.7)
def to_dt(ts_arr):
return [datetime.utcfromtimestamp(t) for t in ts_arr]
# 1. Battery voltage
ax0 = axes[0]
ax0.set_ylabel('Battery (V)')
all_bat_entries = fsm + bat
all_bat_entries.sort(key=lambda e: e.get('ts_ms', 0))
if all_bat_entries:
ts = to_dt(_ts_arr(all_bat_entries))
vs = _val_arr(all_bat_entries, 'bat_V')
ax0.plot(ts, vs, color='green', linewidth=1)
add_crash_lines(ax0)
ax0.grid(True, alpha=0.3)
# 2. Currents
ax1 = axes[1]
ax1.set_ylabel('Current (A)')
if fsm:
ts = to_dt(_ts_arr(fsm))
ax1.plot(ts, _val_arr(fsm, 'drive_A'), label='Drive', linewidth=1)
ax1.plot(ts, _val_arr(fsm, 'jack_A'), label='Jack', linewidth=1)
ax1.plot(ts, _val_arr(fsm, 'aux_A'), label='Aux', linewidth=1)
ax1.legend(fontsize=8, loc='upper right')
add_crash_lines(ax1)
ax1.grid(True, alpha=0.3)
# 3. FSM state
ax2 = axes[2]
ax2.set_ylabel('FSM State')
if fsm:
ts = to_dt(_ts_arr(fsm))
states = _val_arr(fsm, 'entry_type')
ax2.step(ts, states, where='post', linewidth=1, color='navy')
# y-tick labels: use state names from first entry if available
state_map = {}
for e in fsm:
state_map[e['entry_type']] = e.get('state_name', str(e['entry_type']))
yticks = sorted(state_map.keys())
ax2.set_yticks(yticks)
ax2.set_yticklabels([state_map[k] for k in yticks], fontsize=7)
add_crash_lines(ax2)
ax2.grid(True, alpha=0.3)
# 4. Thermal accumulators
ax3 = axes[3]
ax3.set_ylabel('Heat (I²t)')
if fsm:
ts = to_dt(_ts_arr(fsm))
ax3.plot(ts, _val_arr(fsm, 'drive_heat'), label='Drive', linewidth=1)
ax3.plot(ts, _val_arr(fsm, 'jack_heat'), label='Jack', linewidth=1)
ax3.plot(ts, _val_arr(fsm, 'aux_heat'), label='Aux', linewidth=1)
ax3.legend(fontsize=8, loc='upper right')
add_crash_lines(ax3)
ax3.grid(True, alpha=0.3)
ax3.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
fig.autofmt_xdate()
plt.tight_layout()
plt.show()
def live_plot(url: str, interval_s: float = 2.0):
"""Live-updating matplotlib plot using FuncAnimation."""
_check_mpl()
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.animation import FuncAnimation
from datetime import datetime
import source as src
import parser as prs
all_entries = []
fig, axes = plt.subplots(4, 1, figsize=(14, 10), sharex=True)
fig.suptitle("SC-F001 Live Log")
labels = ['Battery (V)', 'Current (A)', 'FSM State', 'Heat (I²t)']
for ax, lbl in zip(axes, labels):
ax.set_ylabel(lbl)
ax.grid(True, alpha=0.3)
lines = {
'bat': axes[0].plot([], [], color='green', linewidth=1)[0],
'drive': axes[1].plot([], [], label='Drive', linewidth=1)[0],
'jack': axes[1].plot([], [], label='Jack', linewidth=1)[0],
'aux': axes[1].plot([], [], label='Aux', linewidth=1)[0],
'state': axes[2].step([], [], where='post', linewidth=1, color='navy')[0],
'drheat': axes[3].plot([], [], label='Drive', linewidth=1)[0],
'jkheat': axes[3].plot([], [], label='Jack', linewidth=1)[0],
'axheat': axes[3].plot([], [], label='Aux', linewidth=1)[0],
}
axes[1].legend(fontsize=8, loc='upper right')
axes[3].legend(fontsize=8, loc='upper right')
axes[3].xaxis.set_major_formatter(mdates.DateFormatter('%H:%M:%S'))
state = {'current_tail': 0, 'first': True}
def to_dt(ts_list):
return [datetime.utcfromtimestamp(t / 1000.0) for t in ts_list]
def update(_frame):
try:
if state['first']:
blob = src.fetch_full(url)
meta, tail, head, new_entries = prs.autodetect_and_parse(blob)
state['current_tail'] = head or 0
state['first'] = False
else:
binary, new_head = src.fetch_incremental(url, state['current_tail'])
new_entries = prs.parse_entries(binary) if binary else []
state['current_tail'] = new_head
all_entries.extend(new_entries)
except Exception as exc:
print(f"[live_plot] fetch error: {exc}")
return
fsm = [e for e in all_entries if 0 <= e.get('entry_type', -1) <= 12]
bat = [e for e in all_entries if e.get('entry_type') in (LOG_TYPE_BAT,) or 'bat_V' in e]
crash = [e for e in all_entries if e.get('entry_type') == LOG_TYPE_CRASH]
if fsm:
ts = to_dt([e['ts_ms'] for e in fsm])
lines['drive'].set_data(ts, [e.get('drive_A', 0) for e in fsm])
lines['jack'].set_data( ts, [e.get('jack_A', 0) for e in fsm])
lines['aux'].set_data( ts, [e.get('aux_A', 0) for e in fsm])
lines['state'].set_data(ts, [e.get('entry_type', 0) for e in fsm])
lines['drheat'].set_data(ts, [e.get('drive_heat', 0) for e in fsm])
lines['jkheat'].set_data(ts, [e.get('jack_heat', 0) for e in fsm])
lines['axheat'].set_data(ts, [e.get('aux_heat', 0) for e in fsm])
all_bat = sorted(
[e for e in all_entries if 'bat_V' in e],
key=lambda e: e.get('ts_ms', 0)
)
if all_bat:
ts = to_dt([e['ts_ms'] for e in all_bat])
lines['bat'].set_data(ts, [e['bat_V'] for e in all_bat])
for ax in axes:
ax.relim()
ax.autoscale_view()
fig.autofmt_xdate()
ani = FuncAnimation(fig, update, interval=interval_s * 1000, cache_frame_data=False)
plt.tight_layout()
plt.show()

234
logtool/logtool.py Normal file
View File

@@ -0,0 +1,234 @@
#!/usr/bin/env python3
"""
SC-F001 Log Tool
Usage:
logtool.py <source> [options]
<source> *.bin file path → read local file
anything else → treated as hostname/URL:
sc.local → http://sc.local/log
192.168.4.1 → http://192.168.4.1/log
http://host/log → used as-is
Output files are always written:
<basename>.bin raw bytes received (HTTP response or file contents)
<basename>.txt stdout capture (table / summary)
Default basename: 04MAR2026_1052 (date+time of invocation).
Specify with --out <basename>.
Options:
--gui Show matplotlib plots instead of CLI table
--stream Poll for new entries (HTTP only)
--type fsm|bat|crash Filter entry type
--tail <offset> Start from a specific flash offset (HTTP POST mode)
--interval <s> Polling interval in seconds (default: 2.0)
--fw <path> Path to firmware main/ directory (for state names)
--summary Print summary statistics only
--out <basename> Output file basename (no extension)
"""
import sys
import io
import argparse
from datetime import datetime
from pathlib import Path
# Ensure logtool directory is on the path
sys.path.insert(0, str(Path(__file__).parent))
import parser as prs
import source as src
import cli_view
import gui_view
def _default_basename() -> str:
return datetime.now().strftime('%d%b%Y_%H%M').upper()
def _normalize_source(raw: str) -> tuple:
"""
Returns (is_http: bool, resolved: str).
*.bin → (False, path)
http(s)://... → (True, url as-is)
anything else → (True, http://<raw>/log)
"""
if raw.endswith('.bin'):
return False, raw
if raw.startswith('http://') or raw.startswith('https://'):
return True, raw
return True, f'http://{raw}/log'
class _Tee:
"""Write to two streams simultaneously (stdout + file)."""
def __init__(self, primary, secondary):
self.primary = primary
self.secondary = secondary
def write(self, data):
self.primary.write(data)
self.secondary.write(data)
return len(data)
def flush(self):
self.primary.flush()
self.secondary.flush()
def __getattr__(self, name):
return getattr(self.primary, name)
def main():
ap = argparse.ArgumentParser(
description='SC-F001 flash log viewer',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__
)
ap.add_argument('source', help='*.bin file, hostname, or full URL')
ap.add_argument('--gui', action='store_true', help='Show matplotlib GUI')
ap.add_argument('--stream', action='store_true', help='Live streaming mode (HTTP only)')
ap.add_argument('--type', choices=['fsm', 'bat', 'crash'], dest='entry_type',
help='Filter by entry type')
ap.add_argument('--tail', type=int, default=None,
help='Start from flash offset (HTTP only)')
ap.add_argument('--interval', type=float, default=2.0,
help='Polling interval in seconds (default: 2.0)')
ap.add_argument('--fw', default=None,
help='Path to firmware main/ directory (default: ../main)')
ap.add_argument('--summary', action='store_true',
help='Print summary statistics only')
ap.add_argument('--out', default=None, metavar='BASENAME',
help='Output file basename (default: 04MAR2026_1052 style)')
args = ap.parse_args()
# Resolve source
is_http, resolved = _normalize_source(args.source)
# Output file basename
basename = args.out or _default_basename()
bin_path = Path(basename + '.bin')
txt_path = Path(basename + '.txt')
# Tee stdout → .txt file
txt_file = txt_path.open('w', encoding='utf-8')
sys.stdout = _Tee(sys.__stdout__, txt_file)
try:
_run(args, is_http, resolved, bin_path, basename)
finally:
sys.stdout = sys.__stdout__
txt_file.close()
print(f"Saved: {bin_path} {txt_path}")
def _run(args, is_http, resolved, bin_path, basename):
# Load FSM state names from firmware source
fw_path = args.fw or (Path(__file__).parent.parent / 'main')
fsm_states = prs.load_fsm_states(fw_path)
# ── Streaming mode ────────────────────────────────────────────────────────
if args.stream:
if not is_http:
print("Error: --stream requires an HTTP source", file=sys.stderr)
sys.exit(1)
if args.gui:
print(f"Starting live GUI from {resolved} ...")
# GUI handles its own data fetching; raw bytes aren't easily capturable
bin_path.write_bytes(b'')
gui_view.live_plot(resolved, interval_s=args.interval)
return
print(f"Streaming from {resolved} (Ctrl-C to stop)\n")
print(' '.join(cli_view._HEADERS))
print(' '.join('-' * len(h) for h in cli_view._HEADERS))
accumulated_bin = io.BytesIO()
def on_batch(entries, meta, is_first):
if args.entry_type:
tf = args.entry_type.lower()
if tf == 'fsm':
entries = [e for e in entries if 0 <= e.get('entry_type', -1) <= 12]
elif tf == 'bat':
entries = [e for e in entries if e.get('entry_type') == prs.LOG_TYPE_BAT]
elif tf == 'crash':
entries = [e for e in entries if e.get('entry_type') == prs.LOG_TYPE_CRASH]
if entries:
cli_view.append_rows(entries)
def _patched_stream():
"""Like source.stream but also captures raw bytes."""
blob = src.fetch_full(resolved)
accumulated_bin.write(blob)
meta, tail, head, entries = prs.autodetect_and_parse(blob, fsm_states)
on_batch(entries, meta, is_first=True)
current_tail = head or 0
import time
while True:
time.sleep(args.interval)
try:
binary, new_head = src.fetch_incremental(resolved, current_tail)
if binary:
accumulated_bin.write(binary)
new_entries = prs.parse_entries(binary, fsm_states)
if new_entries:
on_batch(new_entries, None, is_first=False)
current_tail = new_head
except Exception as exc:
print(f"[stream] poll error: {exc}")
try:
_patched_stream()
except KeyboardInterrupt:
print("\nStopped.")
finally:
bin_path.write_bytes(accumulated_bin.getvalue())
return
# ── One-shot mode ─────────────────────────────────────────────────────────
if is_http:
print(f"Fetching {resolved} ...")
if args.tail is not None:
binary, new_head = src.fetch_incremental(resolved, args.tail)
blob = binary
entries = prs.parse_entries(binary, fsm_states)
meta = None
print(f"Incremental fetch: new_head={new_head} entries={len(entries)}")
else:
blob = src.fetch_full(resolved)
meta, tail, head, entries = prs.parse_response(blob, fsm_states)
print(f"Log offsets: tail={tail} head={head} entries={len(entries)}")
else:
print(f"Reading {resolved} ...")
blob = src.read_file(resolved)
meta, tail, head, entries = prs.autodetect_and_parse(blob, fsm_states)
if head is not None:
print(f"Log offsets: tail={tail} head={head}")
print(f"Parsed {len(entries)} entries")
# Save raw binary
bin_path.write_bytes(blob or b'')
if meta:
ver = meta.get('version', '?')
t = meta.get('time', '?')
print(f"Device: version={ver} time={t}")
if args.summary:
cli_view.print_summary(entries)
elif args.gui:
title = f"SC-F001 Log — {args.source}"
gui_view.show_plots(entries, title=title)
else:
cli_view.print_table(entries, type_filter=args.entry_type)
print()
cli_view.print_summary(entries)
if __name__ == '__main__':
main()

302
logtool/parser.py Normal file
View File

@@ -0,0 +1,302 @@
"""
Binary log parser for SC-F001 flash logs.
On-disk entry format: [len u8][payload (len-1 bytes)][type u8] = len+1 total bytes
The firmware does len++ before writing, so stored len = payload_size + 1.
All values are little-endian.
"""
import struct
import json
import re
from pathlib import Path
from datetime import datetime
LOG_TYPE_BAT = 100
LOG_TYPE_CRASH = 101
LOG_TYPE_BOOT = 102
LOG_TYPE_TIME_SET = 103
# Fallback FSM state map (matches control_fsm.h)
_FALLBACK_FSM_STATES = {
0: "IDLE",
1: "MOVE_START_DELAY",
2: "JACK_UP_START",
3: "JACK_UP",
4: "DRIVE_START_DELAY",
5: "DRIVE",
6: "DRIVE_END_DELAY",
7: "JACK_DOWN",
8: "UNDO_JACK_START",
9: "CALIBRATE_JACK_DELAY",
10: "CALIBRATE_JACK_MOVE",
11: "CALIBRATE_DRIVE_DELAY",
12: "CALIBRATE_DRIVE_MOVE",
}
ESP_RESET_REASONS = {
0: "UNKNOWN",
1: "POWERON",
2: "EXT",
3: "SW",
4: "PANIC",
5: "INT_WDT",
6: "TASK_WDT",
7: "WDT",
8: "DEEPSLEEP",
9: "BROWNOUT",
10: "SDIO",
}
def load_fsm_states(fw_path=None) -> dict:
"""
Parse FSM state names from control_fsm.h.
Returns dict mapping int -> name string (e.g. {0: 'IDLE', ...}).
Falls back to hardcoded dict if the file can't be found or parsed.
"""
if fw_path is None:
# Default: sibling directory ../main relative to this file
fw_path = Path(__file__).parent.parent / "main"
header = Path(fw_path) / "control_fsm.h"
if not header.exists():
return dict(_FALLBACK_FSM_STATES)
try:
text = header.read_text()
# Find the fsm_state_t enum block
m = re.search(r'typedef\s+enum\s*\{([^}]+)\}\s*fsm_state_t\s*;', text, re.DOTALL)
if not m:
return dict(_FALLBACK_FSM_STATES)
states = {}
value = 0
for line in m.group(1).splitlines():
line = line.strip().rstrip(',')
if not line or line.startswith('//'):
continue
if '=' in line:
name, val = line.split('=', 1)
name = name.strip()
val = val.strip().split('//')[0].strip()
try:
value = int(val, 0)
except ValueError:
pass
else:
name = line.split('//')[0].strip()
if name:
# Strip STATE_ prefix for display brevity
display = name.removeprefix('STATE_') if hasattr(str, 'removeprefix') else (
name[6:] if name.startswith('STATE_') else name)
states[value] = display
value += 1
return states if states else dict(_FALLBACK_FSM_STATES)
except Exception:
return dict(_FALLBACK_FSM_STATES)
def _ts_to_str(ts_ms: int) -> str:
"""Convert ms-since-epoch (local-as-UTC) to display string."""
try:
dt = datetime.utcfromtimestamp(ts_ms / 1000.0)
return dt.strftime("%Y-%m-%d %H:%M:%S.") + f"{ts_ms % 1000:03d}"
except (OSError, ValueError):
return str(ts_ms)
def _unpack_fsm(payload: bytes, fsm_states: dict) -> dict:
if len(payload) < 39:
raise ValueError(f"FSM payload too short: {len(payload)} < 39")
ts_ms, bat_V, drive_A, jack_A, aux_A, counter, sensors, \
drive_heat, jack_heat, aux_heat = struct.unpack_from('<QffffhBfff', payload, 0)
return {
'ts_ms': ts_ms,
'time_str': _ts_to_str(ts_ms),
'bat_V': round(bat_V, 3),
'drive_A': round(drive_A, 3),
'jack_A': round(jack_A, 3),
'aux_A': round(aux_A, 3),
'counter': counter,
'sensors_stable': sensors & 0x0F,
'sensors_raw': (sensors >> 4) & 0x0F,
'drive_heat': round(drive_heat, 2),
'jack_heat': round(jack_heat, 2),
'aux_heat': round(aux_heat, 2),
}
def _unpack_bat(payload: bytes) -> dict:
if len(payload) < 12:
raise ValueError(f"BAT payload too short: {len(payload)} < 12")
ts_ms, bat_V = struct.unpack_from('<Qf', payload, 0)
return {
'ts_ms': ts_ms,
'time_str': _ts_to_str(ts_ms),
'bat_V': round(bat_V, 3),
}
def _unpack_crash(payload: bytes) -> dict:
if len(payload) < 9:
raise ValueError(f"CRASH payload too short: {len(payload)} < 9")
ts_ms, reason = struct.unpack_from('<QB', payload, 0)
return {
'ts_ms': ts_ms,
'time_str': _ts_to_str(ts_ms),
'reset_reason': reason,
'reason_str': ESP_RESET_REASONS.get(reason, f"UNKNOWN({reason})"),
}
ESP_WAKEUP_CAUSES = {
0: 'NORMAL',
2: 'EXT0',
4: 'TIMER',
5: 'ULP',
6: 'TOUCHPAD',
7: 'ULP',
}
def _unpack_boot(payload: bytes) -> dict:
if len(payload) < 9:
raise ValueError(f"BOOT payload too short: {len(payload)} < 9")
ts_ms, boot_info = struct.unpack_from('<QB', payload, 0)
reset_reason = boot_info & 0x0F
wake_cause = (boot_info >> 4) & 0x0F
return {
'ts_ms': ts_ms,
'time_str': _ts_to_str(ts_ms),
'reset_reason': reset_reason,
'reason_str': ESP_RESET_REASONS.get(reset_reason, f"UNKNOWN({reset_reason})"),
'wake_cause': wake_cause,
'wake_str': ESP_WAKEUP_CAUSES.get(wake_cause, f"UNKNOWN({wake_cause})"),
}
def _unpack_time_set(payload: bytes) -> dict:
if len(payload) < 8:
raise ValueError(f"TIME_SET payload too short: {len(payload)} < 8")
ts_ms, = struct.unpack_from('<Q', payload, 0)
return {
'ts_ms': ts_ms,
'time_str': _ts_to_str(ts_ms),
}
def parse_entries(data: bytes, fsm_states: dict = None) -> list:
"""
Parse a stream of raw binary log entries.
Returns list of dicts, each with 'entry_type' and type-specific fields.
"""
if fsm_states is None:
fsm_states = _FALLBACK_FSM_STATES
entries = []
i = 0
n = len(data)
while i < n:
b = data[i]
# Erased flash or sector padding → done or skip sector
if b == 0xFF:
break
if b == 0x00:
# Sector padding: skip to next 4096-byte boundary
sector_size = 4096
next_sector = ((i // sector_size) + 1) * sector_size
i = next_sector
continue
entry_len = b # stored len = payload_size + 1
payload_size = entry_len - 1
type_offset = i + 1 + payload_size # = i + entry_len
if type_offset >= n:
break # truncated
payload = data[i + 1 : i + 1 + payload_size]
entry_type = data[type_offset]
try:
if 0 <= entry_type <= 12:
e = _unpack_fsm(payload, fsm_states)
e['entry_type'] = entry_type
e['state_name'] = fsm_states.get(entry_type, f"STATE_{entry_type}")
elif entry_type == LOG_TYPE_BAT:
e = _unpack_bat(payload)
e['entry_type'] = LOG_TYPE_BAT
e['state_name'] = 'BAT'
elif entry_type == LOG_TYPE_CRASH:
e = _unpack_crash(payload)
e['entry_type'] = LOG_TYPE_CRASH
e['state_name'] = 'CRASH'
elif entry_type == LOG_TYPE_BOOT:
e = _unpack_boot(payload)
e['entry_type'] = LOG_TYPE_BOOT
e['state_name'] = 'BOOT'
elif entry_type == LOG_TYPE_TIME_SET:
e = _unpack_time_set(payload)
e['entry_type'] = LOG_TYPE_TIME_SET
e['state_name'] = 'TIME_SET'
else:
e = {
'entry_type': entry_type,
'state_name': f'UNK({entry_type:#04x})',
'raw': payload.hex(),
}
except Exception as exc:
e = {
'entry_type': entry_type,
'state_name': 'PARSE_ERR',
'error': str(exc),
'raw': payload.hex(),
}
entries.append(e)
i = type_offset + 1 # advance past type byte
return entries
def parse_response(blob: bytes, fsm_states: dict = None) -> tuple:
"""
Parse a full HTTP /log response blob.
Returns (json_meta: dict, tail: int, head: int, entries: list).
"""
if len(blob) < 8:
raise ValueError("Response too short")
json_len = struct.unpack_from('>I', blob, 0)[0]
if json_len > 65536 or len(blob) < 4 + json_len + 8:
raise ValueError(f"Invalid json_len {json_len}")
json_bytes = blob[4 : 4 + json_len]
meta = json.loads(json_bytes.decode('utf-8'))
tail, head = struct.unpack_from('>II', blob, 4 + json_len)
binary = blob[4 + json_len + 8:]
entries = parse_entries(binary, fsm_states)
return meta, tail, head, entries
def autodetect_and_parse(blob: bytes, fsm_states: dict = None) -> tuple:
"""
Auto-detect whether blob is HTTP response format or raw flash binary.
Returns (json_meta_or_None, tail_or_None, head_or_None, entries).
"""
# HTTP format: first 4 bytes = BE uint32 json_len, byte 4 should be '{'
if len(blob) >= 5:
candidate_len = struct.unpack_from('>I', blob, 0)[0]
if candidate_len < 8192 and blob[4:5] == b'{':
meta, tail, head, entries = parse_response(blob, fsm_states)
return meta, tail, head, entries
# Raw binary
entries = parse_entries(blob, fsm_states)
return None, None, None, entries

3
logtool/requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
requests
matplotlib
tabulate

91
logtool/source.py Normal file
View File

@@ -0,0 +1,91 @@
"""
Data sources for SC-F001 logtool.
Supports local .bin files and HTTP /log endpoint (full GET + incremental POST).
"""
import struct
import time
from pathlib import Path
try:
import requests
_REQUESTS_OK = True
except ImportError:
_REQUESTS_OK = False
def read_file(path: str) -> bytes:
return Path(path).read_bytes()
def _check_requests():
if not _REQUESTS_OK:
raise ImportError("'requests' package required for HTTP mode. Install: pip install requests")
def fetch_full(url: str, timeout: float = 10.0) -> bytes:
"""GET /log — returns full response blob."""
_check_requests()
resp = requests.get(url, timeout=timeout)
resp.raise_for_status()
return resp.content
def fetch_incremental(url: str, tail: int, timeout: float = 10.0) -> tuple:
"""
POST /log with tail offset.
Returns (raw_binary: bytes, new_head: int).
The raw_binary starts at the given tail and ends at new_head.
"""
_check_requests()
resp = requests.post(url, data=str(tail), timeout=timeout)
resp.raise_for_status()
blob = resp.content
# Response is the same format: [4B json_len BE][JSON][4B tail BE][4B head BE][binary]
import struct as _s
import json as _j
if len(blob) < 8:
return b'', tail
json_len = _s.unpack_from('>I', blob, 0)[0]
if json_len > 65536 or len(blob) < 4 + json_len + 8:
return b'', tail
new_tail, new_head = _s.unpack_from('>II', blob, 4 + json_len)
binary = blob[4 + json_len + 8:]
return binary, new_head
def stream(url: str, callback, interval_s: float = 2.0):
"""
Stream new log entries from an HTTP /log endpoint.
- Fetches the full log on first call (GET).
- Polls for new entries every `interval_s` seconds (POST with last head as new tail).
- Calls callback(entries: list, is_first: bool) for each batch.
- Runs until KeyboardInterrupt.
"""
from parser import autodetect_and_parse
_check_requests()
# Initial full fetch
blob = fetch_full(url)
meta, tail, head, entries = autodetect_and_parse(blob)
callback(entries, meta, is_first=True)
current_tail = head if head is not None else 0
# Poll for incremental updates
while True:
time.sleep(interval_s)
try:
binary, new_head = fetch_incremental(url, current_tail)
if binary:
from parser import parse_entries
new_entries = parse_entries(binary)
if new_entries:
callback(new_entries, None, is_first=False)
current_tail = new_head
except Exception as exc:
print(f"[stream] poll error: {exc}")