Add logtool.py

This commit is contained in:
2026-02-06 07:30:32 -06:00
commit 48762c614f

503
logtool.py Normal file
View File

@@ -0,0 +1,503 @@
#!/usr/bin/env python3
"""
Stockcropper ESP32 Log Tool
- Convert .bin → .csv (with V and A)
- Generate beautiful multi-panel plots from a single .bin file
- Stream and visualize live data from ESP32
"""
import struct
import glob
import os
import sys
import argparse
from datetime import datetime, timezone
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import requests
import time
def fetch_log(ip, tail=None):
"""Fetch log data from ESP32 device"""
url = f"http://{ip}/log"
print(f"Fetching from {url}" + (f" (tail={tail})" if tail else ""))
try:
if tail is not None:
resp = requests.post(url, json=int(tail), timeout=5)
else:
resp = requests.get(url, timeout=5)
if resp.status_code == 200:
print(f"Received {len(resp.content)} bytes")
return resp.content
else:
print(f"Failed to fetch log: {resp.status_code} {resp.text}")
return None
except Exception as e:
print(f"Error fetching log: {e}")
return None
ENTRY_SIZE = 32
FSM_STATE_NAMES = {
0: "IDLE",
1: "MOVE_START_DELAY",
2: "JACK_UP",
3: "DRIVE_START_DELAY",
4: "DRIVE",
5: "DRIVE_END_DELAY",
6: "JACK_DOWN",
7: "UNDO_JACK",
8: "UNDO_JACK_START",
}
STATE_COLORS = {
0: "#808080", # gray
1: "#d0d0d0",
2: "#3399ff", # blue
3: "#99ffaa", # light green
4: "#00ff00", # green
5: "#aaff99", # light green
6: "#aa55ee",
7: "#ff0000", # bright red
8: "#ff9999", # light red
}
def parse_log_data(raw):
"""
Parse binary log data and return (head, records)
Returns:
tuple: (head position, list of record dicts)
"""
if len(raw) < 8:
print("Data too short for header")
return None, []
(tail, head) = struct.unpack(">ll", raw[0:8])
print(f"Log header: tail={tail}, head={head}")
offset = 8
records = []
while offset + ENTRY_SIZE <= len(raw):
entry = raw[offset:offset + ENTRY_SIZE]
if entry[0] != ENTRY_SIZE:
# Hit the end of valid data
break
try:
fields = struct.unpack(">BQiiiiiBBB", entry)
except:
# Corrupt or incomplete entry
break
ts_ms = fields[1]
battery_mv = fields[2]
i_drive = fields[3] / 1000.0 # mA → A
i_jack = fields[4] / 1000.0
i_aux = fields[5] / 1000.0
counter = fields[6]
sens_drive = fields[7]
sens_jack = fields[8]
state = fields[9]
# Try to parse datetime
dt = None
try:
dt = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
except:
pass
records.append({
"timestamp_ms": ts_ms,
"datetime": dt,
"battery_V": battery_mv / 1000.0,
"current_drive_A": i_drive,
"current_jack_A": i_jack,
"current_aux_A": i_aux,
"drive_counter": counter,
"sensor_drive": sens_drive,
"sensor_jack": sens_jack,
"fsm_state": state,
"fsm_name": FSM_STATE_NAMES.get(state, f"UNK({state})")
})
offset += ENTRY_SIZE
return head, records
def load_bin_file(bin_path):
"""Load and parse a .bin file"""
with open(bin_path, "rb") as f:
raw = f.read()
_, records = parse_log_data(raw)
return records
def bin_to_csv(bin_path, csv_path):
"""Convert a .bin file to CSV"""
records = load_bin_file(bin_path)
if not records:
print(f"No valid records in {bin_path}")
return
with open(csv_path, "w", encoding="utf-8") as f:
f.write("Timestamp_ms,Datetime_UTC,Battery_V,Current_Drive_A,Current_Jack_A,Current_Aux_A,"
"Drive_Counter,Sensor_Drive,Sensor_Jack,FSM_State_ID,FSM_State\n")
for r in records:
f.write(f"{r['timestamp_ms']},{r['datetime']},{r['battery_V']:.3f},"
f"{r['current_drive_A']:.3f},{r['current_jack_A']:.3f},{r['current_aux_A']:.3f},"
f"{r['drive_counter']},{r['sensor_drive']},{r['sensor_jack']},"
f"{r['fsm_state']},{r['fsm_name']}\n")
print(f"{len(records)} entries → {csv_path}")
def plot_log(records, title_suffix=""):
"""Generate a multi-panel plot from log records"""
if len(records) < 2:
print("Not enough data to plot.")
return
arr = np.array([
(r['timestamp_ms'],
r['battery_V'],
r['current_drive_A'],
r['current_jack_A'],
r['current_aux_A'],
r['drive_counter'],
r['sensor_drive'],
r['sensor_jack'],
r['fsm_state'])
for r in records
], dtype=np.float64)
t_ms = arr[:,0]
t_rel = (t_ms - t_ms[0]) / 1000.0 # seconds since start
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 10), sharex=True,
gridspec_kw={'height_ratios': [2, 1, 1.5]})
# --- Plot 1: Voltage + Currents ---
ax1.plot(t_rel, arr[:,1], label="Battery", color="black", marker='o', linewidth=2, markersize=4)
ax1.set_ylabel("Battery Voltage (V)")
ax1.grid(True, alpha=0.3)
ax1.legend(loc="upper left")
ax1.set_ylim(0, 15.0)
ax1b = ax1.twinx()
ax1b.plot(t_rel, arr[:,2], label="Drive", color="tab:blue")
ax1b.plot(t_rel, arr[:,3], label="Jack", color="tab:red")
ax1b.plot(t_rel, arr[:,4], label="Aux", color="tab:orange")
ax1b.set_ylabel("Current (A)")
ax1b.legend(loc="upper right")
ax1b.set_ylim(min(-1, np.min(arr[:,2:5])), max(20, np.max(arr[:,2:5])))
# --- Plot 2: FSM State (colored background) ---
ax2.set_ylim(-0.0, 1.0)
ax2.set_yticks([])
prev_t = t_rel[0]
prev_state = int(arr[0,8])
for i in range(1, len(t_rel)):
state = int(arr[i,8])
if state != prev_state or i == len(t_rel)-1:
color = STATE_COLORS.get(prev_state, "#cccccc")
width = t_rel[i-1] - prev_t if i < len(t_rel) else t_rel[-1] - prev_t
ax2.add_patch(Rectangle((prev_t, 0), width, 1,
facecolor=color, edgecolor="none", alpha=0.6))
mid = (prev_t + t_rel[i-1]) / 2
ax2.text(mid, 0.5, FSM_STATE_NAMES.get(prev_state, f"UNK{prev_state}"),
ha="center", va="center", fontsize=10, color="black")
prev_t = t_rel[i-1]
prev_state = state
ax2.set_ylabel("FSM State")
# --- Plot 3: Sensors + Counter ---
ax3.step(t_rel, arr[:,5], where='post', label="Drive Counter", color="purple")
ax3.set_ylabel("Drive Counter")
ax3.grid(True, alpha=0.3)
ax3b = ax3.twinx()
ax3b.plot(t_rel, arr[:,6], drawstyle="steps-post", label="Sensor Drive", color="green", linewidth=2)
ax3b.plot(t_rel, arr[:,7], drawstyle="steps-post", label="Sensor Jack", color="cyan", linewidth=2)
ax3b.set_ylim(-0.1, 1.1)
ax3b.set_yticks([0, 1])
ax3b.set_ylabel("Sensors (0/1)")
ax3b.legend(loc="upper right")
ax3.set_xlabel("Time (seconds since start)")
# Format title
if records[0]['datetime'] and records[-1]['datetime']:
time_str = (f"{records[0]['datetime'].strftime('%Y-%m-%d %H:%M:%S')} UTC → "
f"{records[-1]['datetime'].strftime('%H:%M:%S')} UTC")
else:
time_str = f"{records[0]['timestamp_ms']}{records[-1]['timestamp_ms']}"
plt.suptitle(f"Stockcropper Log{title_suffix} | {len(records)} samples | {time_str}",
fontsize=14)
plt.tight_layout()
return fig
def plot_file(bin_path, save=False):
"""Plot data from a .bin file"""
records = load_bin_file(bin_path)
if not records:
print(f"No data to plot in {bin_path}")
return
fig = plot_log(records)
if save:
plot_path = bin_path.replace(".bin", ".png")
plt.savefig(plot_path, dpi=150, bbox_inches='tight')
print(f"Plot saved → {plot_path}")
else:
plt.show()
def update_plot_live(fig, axes, records):
"""Update existing plot with new data, preserving view limits"""
if len(records) < 2:
return
ax1, ax2, ax3 = axes
ax1b = ax1.get_shared_x_axes().get_siblings(ax1)[0] if hasattr(ax1, 'get_shared_x_axes') else None
ax3b = ax3.get_shared_x_axes().get_siblings(ax3)[0] if hasattr(ax3, 'get_shared_x_axes') else None
# Get current view limits before clearing
xlim = ax1.get_xlim()
ylim1 = ax1.get_ylim()
ylim1b = ax1.lines[0].axes.get_ylim() if ax1.lines else None
# Find twin axes properly
for ax in fig.get_axes():
if ax != ax1 and ax != ax2 and ax != ax3 and ax.bbox.bounds == ax1.bbox.bounds:
ax1b = ax
ylim1b = ax1b.get_ylim()
break
# Prepare data
arr = np.array([
(r['timestamp_ms'], r['battery_V'], r['current_drive_A'], r['current_jack_A'],
r['current_aux_A'], r['drive_counter'], r['sensor_drive'], r['sensor_jack'], r['fsm_state'])
for r in records
], dtype=np.float64)
t_ms = arr[:,0]
t_rel = (t_ms - t_ms[0]) / 1000.0
# Clear axes but keep them
ax1.clear()
if ax1b:
ax1b.clear()
ax2.clear()
ax3.clear()
if ax3b:
for ax in fig.get_axes():
if ax != ax1 and ax != ax2 and ax != ax3 and ax.bbox.bounds == ax3.bbox.bounds:
ax.clear()
ax3b = ax
break
# Redraw plot 1
ax1.plot(t_rel, arr[:,1], label="Battery", color="black", marker='o', linewidth=2, markersize=4)
ax1.set_ylabel("Battery Voltage (V)")
ax1.grid(True, alpha=0.3)
ax1.legend(loc="upper left")
ax1.set_ylim(ylim1)
if ax1b:
ax1b.plot(t_rel, arr[:,2], label="Drive", color="tab:blue")
ax1b.plot(t_rel, arr[:,3], label="Jack", color="tab:red")
ax1b.plot(t_rel, arr[:,4], label="Aux", color="tab:orange")
ax1b.set_ylabel("Current (A)")
ax1b.legend(loc="upper right")
if ylim1b:
ax1b.set_ylim(ylim1b)
# Redraw plot 2 (FSM states)
ax2.set_ylim(-0.0, 1.0)
ax2.set_yticks([])
prev_t = t_rel[0]
prev_state = int(arr[0,8])
for i in range(1, len(t_rel)):
state = int(arr[i,8])
if state != prev_state or i == len(t_rel)-1:
color = STATE_COLORS.get(prev_state, "#cccccc")
width = t_rel[i-1] - prev_t if i < len(t_rel) else t_rel[-1] - prev_t
ax2.add_patch(Rectangle((prev_t, 0), width, 1, facecolor=color, edgecolor="none", alpha=0.6))
mid = (prev_t + t_rel[i-1]) / 2
ax2.text(mid, 0.5, FSM_STATE_NAMES.get(prev_state, f"UNK{prev_state}"),
ha="center", va="center", fontsize=10, color="black")
prev_t = t_rel[i-1]
prev_state = state
ax2.set_ylabel("FSM State")
# Redraw plot 3
ax3.step(t_rel, arr[:,5], where='post', label="Drive Counter", color="purple")
ax3.set_ylabel("Drive Counter")
ax3.grid(True, alpha=0.3)
if ax3b:
ax3b.plot(t_rel, arr[:,6], drawstyle="steps-post", label="Sensor Drive", color="green", linewidth=2)
ax3b.plot(t_rel, arr[:,7], drawstyle="steps-post", label="Sensor Jack", color="cyan", linewidth=2)
ax3b.set_ylim(-0.1, 1.1)
ax3b.set_yticks([0, 1])
ax3b.set_ylabel("Sensors (0/1)")
ax3b.legend(loc="upper right")
ax3.set_xlabel("Time (seconds since start)")
# Restore x limits
ax1.set_xlim(xlim)
# Update title
if records[0]['datetime'] and records[-1]['datetime']:
time_str = (f"{records[0]['datetime'].strftime('%Y-%m-%d %H:%M:%S')} UTC → "
f"{records[-1]['datetime'].strftime('%H:%M:%S')} UTC")
else:
time_str = f"{records[0]['timestamp_ms']}{records[-1]['timestamp_ms']}"
fig.suptitle(f"Stockcropper Log [LIVE] | {len(records)} samples | {time_str}", fontsize=14)
fig.canvas.draw_idle()
fig.canvas.flush_events()
def stream_data(ip, update_interval=1.0, max_points=1000):
"""
Stream data from ESP32 and display live plot
Args:
ip: IP address of ESP32
update_interval: seconds between fetches
max_points: maximum number of points to keep in buffer
"""
# Create session log file
session_time = datetime.now().strftime("%d%b%Y_%H%M").upper()
log_filename = f"stream_{session_time}.bin"
print(f"Starting live stream from {ip}")
print(f"Logging to: {log_filename}")
print("Press Ctrl+C to stop")
plt.ion()
all_records = []
all_raw_data = bytearray()
head = None
fig = None
axes = None
try:
while True:
# Fetch new data
raw = fetch_log(ip, head)
if raw is None:
print("Failed to fetch data, retrying...")
time.sleep(update_interval)
continue
# Save raw data to file
if raw:
all_raw_data.extend(raw)
with open(log_filename, "wb") as f:
f.write(all_raw_data)
# Parse and update head pointer
new_head, new_records = parse_log_data(raw)
if new_head is not None:
head = new_head
# Add new records
if new_records:
all_records.extend(new_records)
print(f"Added {len(new_records)} new records (total: {len(all_records)})")
# Keep only most recent data in memory
if len(all_records) > max_points:
all_records = all_records[-max_points:]
# Create or update plot
if len(all_records) >= 2:
if fig is None:
# Create initial plot
fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True,
gridspec_kw={'height_ratios': [2, 1, 1.5]})
ax1, ax2, ax3 = axes
# Create twin axes
ax1.twinx()
ax3.twinx()
plt.tight_layout()
plt.show(block=False)
# Update existing plot
update_plot_live(fig, axes, all_records)
# Wait before next fetch
plt.pause(update_interval)
except KeyboardInterrupt:
print(f"\nStopping stream... Data saved to {log_filename}")
plt.ioff()
if fig and len(all_records) >= 2:
print("Keeping final plot open...")
plt.show()
def main():
parser = argparse.ArgumentParser(description="Stockcropper Log → CSV + Plot Tool")
parser.add_argument("files", nargs="*", help="log000.bin (for plot) or pattern like log???.bin")
parser.add_argument("--merge", action="store_true", help="Merge all .bin files into one CSV")
parser.add_argument("--plot", action="store_true", help="Generate plot from a single .bin file")
parser.add_argument("--save", action="store_true", help="Save plot to PNG instead of displaying")
parser.add_argument("--stream", action="store_true", help="Stream live data from ESP32 IP address")
parser.add_argument("--interval", type=float, default=1.0, help="Stream update interval (seconds)")
parser.add_argument("--maxpoints", type=int, default=1000, help="Maximum points to keep in stream buffer")
args = parser.parse_args()
# Stream mode
if args.stream:
if len(args.files) != 1:
print("Usage: --stream <IP_ADDRESS>")
print("Example: --stream 192.168.1.100")
sys.exit(1)
stream_data(args.files[0], update_interval=args.interval, max_points=args.maxpoints)
return
# Plot mode
if args.plot:
if len(args.files) != 1 or not args.files[0].endswith(".bin"):
print("Usage: --plot logXXX.bin")
sys.exit(1)
plot_file(args.files[0], save=args.save)
return
# CSV conversion (default mode)
pattern = args.files[0] if args.files else "log???.bin"
bin_files = sorted(glob.glob(pattern))
if not bin_files:
print(f"No .bin files found matching '{pattern}'")
return
if args.merge:
out_csv = "stockcropper_all_merged.csv"
with open(out_csv, "w", encoding="utf-8") as out:
out.write("Source_File,Timestamp_ms,Datetime_UTC,Battery_V,Current_Drive_A,Current_Jack_A,Current_Aux_A,"
"Drive_Counter,Sensor_Drive,Sensor_Jack,FSM_State_ID,FSM_State\n")
for bin_path in bin_files:
records = load_bin_file(bin_path)
for r in records:
out.write(f"{os.path.basename(bin_path)},{r['timestamp_ms']},{r['datetime']},"
f"{r['battery_V']:.3f},{r['current_drive_A']:.3f},{r['current_jack_A']:.3f},"
f"{r['current_aux_A']:.3f},{r['drive_counter']},{r['sensor_drive']},"
f"{r['sensor_jack']},{r['fsm_state']},{r['fsm_name']}\n")
print(f"All {len(bin_files)} files merged → {out_csv}")
else:
for bin_path in bin_files:
csv_path = bin_path.replace(".bin", ".csv")
bin_to_csv(bin_path, csv_path)
if __name__ == "__main__":
main()