#!/usr/bin/env python3
"""
Stockcropper ESP32 Log Tool

- Convert .bin → .csv (with V and A)
- Generate beautiful multi-panel plots from a single .bin file
- Stream and visualize live data from ESP32
"""

import struct
import glob
import os
import sys
import argparse
from datetime import datetime, timezone
from pathlib import Path
import time

import numpy as np

# Plotting and HTTP support are optional: plain .bin → .csv conversion must
# keep working on machines where these packages are not installed.
try:
    import matplotlib.pyplot as plt
    from matplotlib.patches import Rectangle
except ImportError:
    plt = None
    Rectangle = None

try:
    import requests
except ImportError:
    requests = None


HEADER_SIZE = 8   # two big-endian int32 values: (tail, head)
ENTRY_SIZE = 32   # size of one packed log entry, also stored in its first byte

# One log entry: size byte, uint64 timestamp (ms), int32 battery (mV),
# three int32 currents (mA), int32 drive counter, then three uint8 values
# (drive sensor, jack sensor, FSM state). Big-endian, no padding → 32 bytes.
_ENTRY_STRUCT = struct.Struct(">BQiiiiiBBB")

FSM_STATE_NAMES = {
    0: "IDLE",
    1: "MOVE_START_DELAY",
    2: "JACK_UP",
    3: "DRIVE_START_DELAY",
    4: "DRIVE",
    5: "DRIVE_END_DELAY",
    6: "JACK_DOWN",
    7: "UNDO_JACK",
    8: "UNDO_JACK_START",
}

STATE_COLORS = {
    0: "#808080",  # gray
    1: "#d0d0d0",
    2: "#3399ff",  # blue
    3: "#99ffaa",  # light green
    4: "#00ff00",  # green
    5: "#aaff99",  # light green
    6: "#aa55ee",
    7: "#ff0000",  # bright red
    8: "#ff9999",  # light red
}

_CSV_HEADER = ("Timestamp_ms,Datetime_UTC,Battery_V,Current_Drive_A,Current_Jack_A,Current_Aux_A,"
               "Drive_Counter,Sensor_Drive,Sensor_Jack,FSM_State_ID,FSM_State\n")

_DEFAULT_VOLTAGE_YLIM = (0, 15.0)


def fetch_log(ip, tail=None):
    """Fetch raw log bytes from the ESP32 at ``http://<ip>/log``.

    Args:
        ip: Host name or IP address of the device.
        tail: Optional log position. When given it is POSTed as a JSON
            integer; otherwise a plain GET is issued.

    Returns:
        The response body as ``bytes`` on HTTP 200, otherwise ``None``.
        Failures are printed, never raised, so a polling loop can retry.
    """
    url = f"http://{ip}/log"
    # `is not None` so that a legitimate position of 0 still shows up.
    print(f"Fetching from {url}" + (f" (tail={tail})" if tail is not None else ""))

    if requests is None:
        print("Error fetching log: the 'requests' package is not installed")
        return None

    try:
        if tail is not None:
            resp = requests.post(url, json=int(tail), timeout=5)
        else:
            resp = requests.get(url, timeout=5)
    except Exception as e:
        # Deliberately broad: this is the best-effort boundary of the
        # streaming loop, which must survive any transient failure.
        print(f"Error fetching log: {e}")
        return None

    if resp.status_code == 200:
        print(f"Received {len(resp.content)} bytes")
        return resp.content

    print(f"Failed to fetch log: {resp.status_code} {resp.text}")
    return None


def _to_datetime(ts_ms):
    """Convert a millisecond timestamp to an aware UTC datetime, or None if out of range."""
    try:
        return datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    except (OverflowError, OSError, ValueError):
        return None


def parse_log_data(raw):
    """
    Parse binary log data and return (head, records)

    The data starts with an 8-byte header (tail, head) followed by 32-byte
    entries. Parsing stops at the first entry whose leading size byte is not
    ``ENTRY_SIZE``, or when fewer than ``ENTRY_SIZE`` bytes remain.

    Returns:
        tuple: (head position, list of record dicts); ``(None, [])`` when
        the data is too short to contain the header.
    """
    if len(raw) < HEADER_SIZE:
        print("Data too short for header")
        return None, []

    tail, head = struct.unpack(">ll", raw[:HEADER_SIZE])
    print(f"Log header: tail={tail}, head={head}")

    records = []
    offset = HEADER_SIZE
    while offset + ENTRY_SIZE <= len(raw):
        if raw[offset] != ENTRY_SIZE:
            # Hit the end of valid data
            break

        # The loop condition guarantees a full entry, so unpacking cannot fail.
        (_, ts_ms, battery_mv, drive_ma, jack_ma, aux_ma,
         counter, sens_drive, sens_jack, state) = _ENTRY_STRUCT.unpack_from(raw, offset)

        records.append({
            "timestamp_ms": ts_ms,
            "datetime": _to_datetime(ts_ms),
            "battery_V": battery_mv / 1000.0,      # mV → V
            "current_drive_A": drive_ma / 1000.0,  # mA → A
            "current_jack_A": jack_ma / 1000.0,
            "current_aux_A": aux_ma / 1000.0,
            "drive_counter": counter,
            "sensor_drive": sens_drive,
            "sensor_jack": sens_jack,
            "fsm_state": state,
            "fsm_name": FSM_STATE_NAMES.get(state, f"UNK({state})"),
        })
        offset += ENTRY_SIZE

    return head, records


def load_bin_file(bin_path):
    """Read a .bin log file and return its list of record dicts."""
    with open(bin_path, "rb") as f:
        raw = f.read()
    _, records = parse_log_data(raw)
    return records


def _csv_row(r):
    """Format one record dict as a CSV line (with trailing newline)."""
    return (f"{r['timestamp_ms']},{r['datetime']},{r['battery_V']:.3f},"
            f"{r['current_drive_A']:.3f},{r['current_jack_A']:.3f},{r['current_aux_A']:.3f},"
            f"{r['drive_counter']},{r['sensor_drive']},{r['sensor_jack']},"
            f"{r['fsm_state']},{r['fsm_name']}\n")


def _replace_suffix(path, new_suffix):
    """Return *path* with its final extension swapped for *new_suffix*.

    Unlike ``str.replace(".bin", ...)`` this touches only the extension and
    always yields a name different from an input that lacks ``new_suffix``.
    """
    return os.path.splitext(path)[0] + new_suffix


def bin_to_csv(bin_path, csv_path):
    """Convert a .bin file to CSV; prints a notice and writes nothing if it has no records."""
    records = load_bin_file(bin_path)
    if not records:
        print(f"No valid records in {bin_path}")
        return

    with open(csv_path, "w", encoding="utf-8") as f:
        f.write(_CSV_HEADER)
        f.writelines(_csv_row(r) for r in records)

    print(f"→ {len(records)} entries → {csv_path}")


def fsm_segments(t_rel, states):
    """Group consecutive samples with the same FSM state into time spans.

    A state change observed at sample ``i`` is placed at ``t_rel[i-1]``
    (the last sample of the previous state). The final span always extends
    to ``t_rel[-1]`` so the most recent state is included.

    Args:
        t_rel: Sample times (sequence or 1-D array).
        states: FSM state per sample, same length as ``t_rel``.

    Returns:
        List of ``(start_time, end_time, state)`` tuples; empty for no samples.
    """
    segments = []
    if len(t_rel) == 0:
        return segments

    seg_start = t_rel[0]
    seg_state = int(states[0])
    for i in range(1, len(t_rel)):
        state = int(states[i])
        if state != seg_state:
            boundary = t_rel[i - 1]
            segments.append((seg_start, boundary, seg_state))
            seg_start = boundary
            seg_state = state
    segments.append((seg_start, t_rel[-1], seg_state))
    return segments


def _require_matplotlib():
    """Raise a clear error when a plotting entry point is used without matplotlib."""
    if plt is None:
        raise RuntimeError("matplotlib is required for plotting (pip install matplotlib)")


def _records_to_array(records):
    """Pack the numeric fields of the records into an (N, 9) float64 array."""
    return np.array([
        (r['timestamp_ms'], r['battery_V'], r['current_drive_A'], r['current_jack_A'],
         r['current_aux_A'], r['drive_counter'], r['sensor_drive'], r['sensor_jack'],
         r['fsm_state'])
        for r in records
    ], dtype=np.float64)


def _default_current_ylim(arr):
    """Current-axis limits: at least -1..20 A, widened to cover the data."""
    currents = arr[:, 2:5]
    return min(-1, np.min(currents)), max(20, np.max(currents))


def _time_span_label(records):
    """Describe the time range of the records for the figure title."""
    first, last = records[0], records[-1]
    if first['datetime'] and last['datetime']:
        return (f"{first['datetime'].strftime('%Y-%m-%d %H:%M:%S')} UTC → "
                f"{last['datetime'].strftime('%H:%M:%S')} UTC")
    return f"{first['timestamp_ms']} → {last['timestamp_ms']}"


def _draw_power_panel(ax_volt, ax_curr, t_rel, arr):
    """Draw battery voltage on *ax_volt* and the three currents on its twin *ax_curr*."""
    ax_volt.plot(t_rel, arr[:, 1], label="Battery", color="black", marker='o',
                 linewidth=2, markersize=4)
    ax_volt.set_ylabel("Battery Voltage (V)")
    ax_volt.grid(True, alpha=0.3)
    ax_volt.legend(loc="upper left")

    ax_curr.plot(t_rel, arr[:, 2], label="Drive", color="tab:blue")
    ax_curr.plot(t_rel, arr[:, 3], label="Jack", color="tab:red")
    ax_curr.plot(t_rel, arr[:, 4], label="Aux", color="tab:orange")
    ax_curr.set_ylabel("Current (A)")
    ax_curr.legend(loc="upper right")


def _draw_fsm_panel(ax, t_rel, states):
    """Draw the FSM state timeline as labelled, colored bands."""
    ax.set_ylim(-0.0, 1.0)
    ax.set_yticks([])
    for start, end, state in fsm_segments(t_rel, states):
        ax.add_patch(Rectangle((start, 0), end - start, 1,
                               facecolor=STATE_COLORS.get(state, "#cccccc"),
                               edgecolor="none", alpha=0.6))
        # clip_on keeps labels of off-screen bands from spilling outside the axes.
        ax.text((start + end) / 2, 0.5, FSM_STATE_NAMES.get(state, f"UNK{state}"),
                ha="center", va="center", fontsize=10, color="black", clip_on=True)
    ax.set_ylabel("FSM State")


def _draw_sensor_panel(ax_counter, ax_sensors, t_rel, arr):
    """Draw the drive counter on *ax_counter* and both sensors on its twin *ax_sensors*."""
    ax_counter.step(t_rel, arr[:, 5], where='post', label="Drive Counter", color="purple")
    ax_counter.set_ylabel("Drive Counter")
    ax_counter.grid(True, alpha=0.3)

    ax_sensors.plot(t_rel, arr[:, 6], drawstyle="steps-post", label="Sensor Drive",
                    color="green", linewidth=2)
    ax_sensors.plot(t_rel, arr[:, 7], drawstyle="steps-post", label="Sensor Jack",
                    color="cyan", linewidth=2)
    ax_sensors.set_ylim(-0.1, 1.1)
    ax_sensors.set_yticks([0, 1])
    ax_sensors.set_ylabel("Sensors (0/1)")
    ax_sensors.legend(loc="upper right")

    ax_counter.set_xlabel("Time (seconds since start)")


def plot_log(records, title_suffix=""):
    """Generate a three-panel figure (power, FSM state, sensors) from log records.

    Args:
        records: Record dicts as produced by ``parse_log_data``.
        title_suffix: Text appended to "Stockcropper Log" in the title.

    Returns:
        The matplotlib figure, or ``None`` when fewer than two records exist.

    Raises:
        RuntimeError: If matplotlib is not installed.
    """
    if len(records) < 2:
        print("Not enough data to plot.")
        return None
    _require_matplotlib()

    arr = _records_to_array(records)
    t_rel = (arr[:, 0] - arr[0, 0]) / 1000.0  # seconds since start

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 10), sharex=True,
                                        gridspec_kw={'height_ratios': [2, 1, 1.5]})
    ax1b = ax1.twinx()
    ax3b = ax3.twinx()

    # --- Plot 1: Voltage + Currents ---
    _draw_power_panel(ax1, ax1b, t_rel, arr)
    ax1.set_ylim(*_DEFAULT_VOLTAGE_YLIM)
    ax1b.set_ylim(*_default_current_ylim(arr))

    # --- Plot 2: FSM State (colored background) ---
    _draw_fsm_panel(ax2, t_rel, arr[:, 8])

    # --- Plot 3: Sensors + Counter ---
    _draw_sensor_panel(ax3, ax3b, t_rel, arr)

    fig.suptitle(f"Stockcropper Log{title_suffix} | {len(records)} samples | "
                 f"{_time_span_label(records)}", fontsize=14)
    fig.tight_layout()
    return fig


def plot_file(bin_path, save=False):
    """Plot a .bin file; save it next to the input as .png when *save* is true, else show it."""
    records = load_bin_file(bin_path)
    if not records:
        print(f"No data to plot in {bin_path}")
        return

    fig = plot_log(records)
    if fig is None:
        # plot_log already reported that there is not enough data.
        return

    if save:
        plot_path = _replace_suffix(bin_path, ".png")
        fig.savefig(plot_path, dpi=150, bbox_inches='tight')
        print(f"Plot saved → {plot_path}")
    else:
        plt.show()


def _find_twin(fig, primary, main_axes):
    """Return the axes in *fig* that overlays *primary* (same bbox) and is not a main axes."""
    for ax in fig.get_axes():
        if any(ax is main for main in main_axes):
            continue
        if ax.bbox.bounds == primary.bbox.bounds:
            return ax
    return None


def _clear_twin(ax):
    """Clear a twinx axes and put its y ticks/label back on the right-hand side."""
    ax.clear()
    # clear() can reset the tick/label placement that twinx() configured.
    ax.yaxis.tick_right()
    ax.yaxis.set_label_position("right")


def update_plot_live(fig, axes, records):
    """Redraw the live figure with *records*, preserving the current view.

    On the first draw (axes still empty) the same default limits as
    ``plot_log`` are applied. On later draws the y limits are kept as they
    are, and the x limits are kept only if something (e.g. toolbar zoom/pan)
    has fixed them; otherwise the x axis keeps following the data.

    Args:
        fig: Figure created by ``stream_data``.
        axes: The three main axes (power, FSM, sensors) of *fig*.
        records: Record dicts to display; fewer than two is a no-op.
    """
    if len(records) < 2:
        return

    ax1, ax2, ax3 = axes
    ax1b = _find_twin(fig, ax1, axes)
    if ax1b is None:
        ax1b = ax1.twinx()
    ax3b = _find_twin(fig, ax3, axes)
    if ax3b is None:
        ax3b = ax3.twinx()

    # Capture the view before clearing. Limits of never-drawn axes are just
    # matplotlib's (0, 1) placeholders and must not be "preserved".
    first_draw = len(ax1.lines) == 0
    keep_xlim = not first_draw and not ax1.get_autoscalex_on()
    xlim = ax1.get_xlim()
    ylim1 = _DEFAULT_VOLTAGE_YLIM if first_draw else ax1.get_ylim()
    ylim1b = None if first_draw else ax1b.get_ylim()

    arr = _records_to_array(records)
    t_rel = (arr[:, 0] - arr[0, 0]) / 1000.0

    # Clear axes but keep them
    ax1.clear()
    ax2.clear()
    ax3.clear()
    _clear_twin(ax1b)
    _clear_twin(ax3b)

    _draw_power_panel(ax1, ax1b, t_rel, arr)
    ax1.set_ylim(ylim1)
    ax1b.set_ylim(ylim1b if ylim1b is not None else _default_current_ylim(arr))

    _draw_fsm_panel(ax2, t_rel, arr[:, 8])
    _draw_sensor_panel(ax3, ax3b, t_rel, arr)

    if keep_xlim:
        ax1.set_xlim(xlim)

    fig.suptitle(f"Stockcropper Log [LIVE] | {len(records)} samples | "
                 f"{_time_span_label(records)}", fontsize=14)

    fig.canvas.draw_idle()
    fig.canvas.flush_events()


def _append_stream_log(path, raw, n_records, first):
    """Append the valid entries of one response to the session .bin file.

    Only the first write includes the 8-byte header; later writes append
    entry bytes alone, so the file stays parseable by ``parse_log_data``.
    The stored header is therefore the one from the first response.
    """
    payload_end = HEADER_SIZE + n_records * ENTRY_SIZE
    if first:
        chunk, mode = raw[:payload_end], "wb"
    else:
        chunk, mode = raw[HEADER_SIZE:payload_end], "ab"
    with open(path, mode) as f:
        f.write(chunk)


def stream_data(ip, update_interval=1.0, max_points=1000):
    """
    Stream data from ESP32 and display live plot

    Every received record is also appended to a ``stream_<DDMONYYYY_HHMM>.bin``
    file in the current directory. Runs until interrupted with Ctrl+C, then
    keeps the final plot open.

    Args:
        ip: IP address of ESP32
        update_interval: seconds between fetches
        max_points: maximum number of points to keep in buffer

    Raises:
        RuntimeError: If matplotlib is not installed.
    """
    _require_matplotlib()

    # Create session log file
    session_time = datetime.now().strftime("%d%b%Y_%H%M").upper()
    log_filename = f"stream_{session_time}.bin"

    print(f"Starting live stream from {ip}")
    print(f"Logging to: {log_filename}")
    print("Press Ctrl+C to stop")

    plt.ion()

    all_records = []
    head = None
    fig = None
    axes = None
    log_started = False

    try:
        while True:
            raw = fetch_log(ip, head)
            if raw is None:
                print("Failed to fetch data, retrying...")
                # pause() keeps an open window responsive while waiting.
                plt.pause(update_interval)
                continue

            # Parse and update head pointer
            new_head, new_records = parse_log_data(raw)
            if new_head is not None:
                head = new_head

            if new_records:
                _append_stream_log(log_filename, raw, len(new_records), first=not log_started)
                log_started = True

                all_records.extend(new_records)
                print(f"Added {len(new_records)} new records (total: {len(all_records)})")

                # Keep only most recent data in memory
                if len(all_records) > max_points:
                    all_records = all_records[-max_points:]

            if len(all_records) >= 2:
                if fig is None:
                    fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True,
                                             gridspec_kw={'height_ratios': [2, 1, 1.5]})
                    # Twin axes for currents (panel 1) and sensors (panel 3)
                    axes[0].twinx()
                    axes[2].twinx()
                    plt.tight_layout()
                    plt.show(block=False)

                update_plot_live(fig, axes, all_records)

            # Wait before next fetch
            plt.pause(update_interval)

    except KeyboardInterrupt:
        print(f"\nStopping stream... Data saved to {log_filename}")

    plt.ioff()
    if fig and len(all_records) >= 2:
        print("Keeping final plot open...")
        plt.show()


def main():
    """Command-line entry point: stream, plot, or convert .bin logs to CSV."""
    parser = argparse.ArgumentParser(description="Stockcropper Log → CSV + Plot Tool")
    parser.add_argument("files", nargs="*",
                        help="log000.bin (for plot) or pattern like log???.bin")
    parser.add_argument("--merge", action="store_true",
                        help="Merge all .bin files into one CSV")
    parser.add_argument("--plot", action="store_true",
                        help="Generate plot from a single .bin file")
    parser.add_argument("--save", action="store_true",
                        help="Save plot to PNG instead of displaying")
    parser.add_argument("--stream", action="store_true",
                        help="Stream live data from ESP32 IP address")
    parser.add_argument("--interval", type=float, default=1.0,
                        help="Stream update interval (seconds)")
    parser.add_argument("--maxpoints", type=int, default=1000,
                        help="Maximum points to keep in stream buffer")
    args = parser.parse_args()

    # Fail early, with a readable message, when an optional package is missing.
    if (args.stream or args.plot) and plt is None:
        print("matplotlib is required for --plot and --stream")
        sys.exit(1)
    if args.stream and requests is None:
        print("requests is required for --stream")
        sys.exit(1)

    # Stream mode
    if args.stream:
        if len(args.files) != 1:
            print("Usage: --stream <ip-address>")
            print("Example: --stream 192.168.1.100")
            sys.exit(1)
        stream_data(args.files[0], update_interval=args.interval, max_points=args.maxpoints)
        return

    # Plot mode
    if args.plot:
        if len(args.files) != 1 or not args.files[0].endswith(".bin"):
            print("Usage: --plot logXXX.bin")
            sys.exit(1)
        plot_file(args.files[0], save=args.save)
        return

    # CSV conversion (default mode)
    pattern = args.files[0] if args.files else "log???.bin"
    bin_files = sorted(glob.glob(pattern))
    if not bin_files:
        print(f"No .bin files found matching '{pattern}'")
        return

    if args.merge:
        out_csv = "stockcropper_all_merged.csv"
        with open(out_csv, "w", encoding="utf-8") as out:
            out.write("Source_File," + _CSV_HEADER)
            for bin_path in bin_files:
                source = os.path.basename(bin_path)
                out.writelines(f"{source},{_csv_row(r)}" for r in load_bin_file(bin_path))
        print(f"All {len(bin_files)} files merged → {out_csv}")
    else:
        for bin_path in bin_files:
            csv_path = _replace_suffix(bin_path, ".csv")
            if csv_path == bin_path:
                # Never overwrite the input with its own conversion.
                print(f"Skipping {bin_path}: output would overwrite the input")
                continue
            bin_to_csv(bin_path, csv_path)


if __name__ == "__main__":
    main()