# rslogger-merger/main.py

import re
import csv
import tkinter as tk
from tkinter import filedialog, messagebox
import subprocess
import os
from datetime import datetime
import platform
import requests
from typing import List, Union
try:
    from openpyxl import load_workbook
    XLSX_AVAILABLE = True
except ImportError:
    XLSX_AVAILABLE = False


# RSLogger Downloader Classes
class RSLoggerDownloader:
    def __init__(self, base_url: str):
        """
        Initialize downloader for RS Logger device

        Args:
            base_url: Base URL like "http://rslogger" or "http://192.168.1.100"
        """
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()

    def get_config(self) -> dict:
        """Get logger configuration parameters"""
        url = f"{self.base_url}/logc.xml"
        response = self.session.get(url, timeout=5)
        response.raise_for_status()
        parts = response.text.split('#')
        config = {
            'date_from': parts[0],
            'date_to': parts[1],
            'timestamp': int(parts[2]),
            'file_mode': int(parts[3]),
            'data_format': int(parts[4]),
            'timestamp_char': chr(int(parts[5])),
            'time_format': int(parts[6])
        }
        return config
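
    # The device answers with a single '#'-separated string. A hypothetical
    # response (field values invented for illustration) would parse as:
    #   "01/01/25#01/15/25#1#0#0#9#0"
    #   -> date_from='01/01/25', date_to='01/15/25', timestamp=1,
    #      file_mode=0, data_format=0, timestamp_char='\t' (chr(9)),
    #      time_format=0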

    def _get_progress(self, data: bytearray) -> int:
        """Extract the progress percentage byte that precedes the trailing CR/LF pairs"""
        length = len(data)
        while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
            length -= 2
        if length > 0:
            return data[length - 1]
        return 0

    def _clear_endofline(self, data: bytearray) -> int:
        """Strip trailing CR/LF pairs and return the remaining data length"""
        length = len(data)
        while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
            length -= 2
        return length

    def decode_control_characters(self, data: bytes) -> str:
        """
        Decode the <13>, <10>, etc. control character sequences back to actual characters.

        Args:
            data: Raw bytes with <NN> sequences

        Returns:
            String with control characters properly decoded
        """
        # Convert bytes to string
        text = data.decode('ascii', errors='replace')

        # Replace <NN> patterns with actual characters
        def replace_control(match):
            code = int(match.group(1))
            return chr(code)

        # Match <digits> pattern and replace with the actual character
        decoded = re.sub(r'<(\d+)>', replace_control, text)
        return decoded
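
    # Doctest-style sketch of the decoding (input bytes invented):
    #   >>> RSLoggerDownloader("http://rslogger").decode_control_characters(
    #   ...     b"GROSS   920 lb<13><10>")
    #   'GROSS   920 lb\r\n'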

    def parse_log_data(self, chunks: List[bytearray], config: dict, channel: int = 3) -> bytearray:
        """
        Parse raw log data chunks into readable format

        Args:
            chunks: List of raw data chunks
            config: Configuration dictionary from get_config()
            channel: Channel to parse (1=A, 2=B, 3=both)

        Returns:
            Parsed data as bytearray
        """
        result = bytearray()
        # State variables for parsing
        state = {
            'last_ch': 0,
            'last_char': 0,
            'day_stamp': True,
            'last_ta': 0,
            'last_tb': 0,
            'h': 0,
            'd': 0,
            'm': 0,
            'y': 0
        }
        timestamp = config['timestamp']
        data_format = config['data_format']
        ts_char = config['timestamp_char']
        time_format = config['time_format']
        # Determine time interval
        t_interval = 0
        if timestamp >= 50000:
            t_interval = timestamp - 50000
        elif timestamp > 1000:
            t_interval = timestamp - 1000
        elif timestamp > 2:
            timestamp = 2
        for chunk_idx, chunk in enumerate(chunks):
            length = self._clear_endofline(chunk)
            if length <= 0:
                continue
            # Remove progress byte at end
            length -= 1
            if length <= 0:
                continue
            parsed = self._parse_chunk(
                chunk, length, channel, timestamp, t_interval,
                data_format, ts_char, time_format, state
            )
            result.extend(parsed)
        return result
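
    # Record layout as reconstructed from _parse_chunk below (4 bytes each):
    #   byte0 bit7 set   -> date record: byte0 & 0x7F = hour, byte1 = day,
    #                       byte2 = month, byte3 + 2000 = year
    #   byte0 bit7 clear -> data record: bit6 = channel (0=A, 1=B),
    #                       bits 0-5 = minute, byte1 = logged data byte,
    #                       byte2 | byte3<<8: low 10 bits = milliseconds,
    #                       bits 10-15 = seconds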

    def _parse_chunk(self, data: bytearray, length: int, channel: int,
                     timestamp: int, t_interval: int, data_format: int,
                     ts_char: str, time_format: int, state: dict) -> bytearray:
        """Parse a single chunk of data"""
        result = bytearray()
        index = 0
        while index < length:
            if (length - index) < 4:
                break
            byte0 = data[index]
            byte1 = data[index + 1]
            byte2 = data[index + 2]
            byte3 = data[index + 3]
            # Check if this is a timestamp marker (high bit set)
            if byte0 & 0x80:
                # Date/time record
                state['h'] = byte0 & 0x7F
                dtmp = byte1
                mtmp = byte2
                ytmp = byte3 + 2000
                if state['d'] != dtmp or state['m'] != mtmp or state['y'] != ytmp:
                    state['d'] = dtmp
                    state['m'] = mtmp
                    state['y'] = ytmp
                    if timestamp != 0:
                        if len(result) > 0:
                            result.extend(b'\r\n')
                        date_str = f"[{ytmp}-{mtmp:02d}-{dtmp:02d}]"
                        result.extend(date_str.encode('ascii'))
                    state['last_ta'] = 0
                    state['last_tb'] = 0
                    state['last_ch'] = 0
                    state['day_stamp'] = True
            else:
                # Data record
                ch = 'B' if (byte0 & 0x40) else 'A'
                ch_mask = 2 if ch == 'B' else 1
                minute = byte0 & 0x3F
                data_byte = byte1
                tu16 = byte2 | (byte3 << 8)
                ms = tu16 & 0x3FF
                s = (byte3 >> 2) & 0x3F
                if (channel & ch_mask):
                    # Format time string
                    if time_format == 0:
                        h_str = f"{state['h']:2d}"
                    else:
                        if state['h'] == 0:
                            h_str = "A12"
                        elif state['h'] < 12:
                            h_str = f"A{state['h']:2d}"
                        elif state['h'] == 12:
                            h_str = "P12"
                        else:
                            h_str = f"P{state['h']-12:2d}"
                    time_str = f"{h_str}:{minute:02d}:{s:02d}.{ms:03d}"
                    if channel == 3:
                        time_str += ch
                    time_str += ts_char
                    # Decide if we need to add timestamp based on config
                    add_time = self._should_add_timestamp(
                        timestamp, t_interval, ch_mask, s, minute, state['h'],
                        state, channel
                    )
                    if add_time:
                        result.extend(b'\r\n')
                        result.extend(time_str.encode('ascii'))
                    # Add the data byte
                    if data_format == 1:
                        # Hex format
                        result.extend(ts_char.encode('ascii'))
                        result.extend(f"{data_byte:x}".encode('ascii'))
                    else:
                        # ASCII format
                        if data_byte < 32:
                            result.extend(f"<{data_byte}>".encode('ascii'))
                        else:
                            result.append(data_byte)
                    state['day_stamp'] = False
                    state['last_ch'] = ch_mask
                    state['last_char'] = data_byte
            index += 4
        return result

    def _should_add_timestamp(self, timestamp: int, t_interval: int,
                              ch_mask: int, s: int, minute: int, h: int,
                              state: dict, channel: int) -> bool:
        """Determine if timestamp should be added based on configuration"""
        if timestamp == 1:
            return True
        if timestamp >= 50000:
            if timestamp < 50256:
                if t_interval == state['last_char']:
                    return True
        elif timestamp == 2 or t_interval > 0:
            t = s + 60 * minute + 3600 * h
            should_add = False
            if state['last_ch'] != ch_mask:
                should_add = True
                if t_interval > 0 and channel != 3:
                    should_add = False
            if t_interval > 0:
                t_last = state['last_ta'] if ch_mask == 1 else state['last_tb']
                if (t - t_last) > t_interval:
                    should_add = True
            if should_add:
                if channel == 3:
                    state['last_ta'] = t
                    state['last_tb'] = t
                elif ch_mask == 1:
                    state['last_ta'] = t
                else:
                    state['last_tb'] = t
                return True
        return False
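
    # Reconstructed meaning of the 'timestamp' config values, as used above
    # and in parse_log_data:
    #   0      -> no timestamps at all
    #   1      -> stamp every sample
    #   2      -> stamp on channel change (values 3-1000 are clamped to 2)
    #   1001+  -> stamp when more than (timestamp - 1000) seconds have elapsed
    #   50000+ -> stamp the sample that follows a trigger byte
    #             (trigger code = timestamp - 50000)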

    def download_via_pages(self, output_file: str = None, decode_controls: bool = True) -> Union[bytes, str, tuple]:
        """
        Download data by fetching pages (like the preview does).
        This bypasses the date range issue entirely.

        Args:
            output_file: Optional filename to save to
            decode_controls: If True, decode <13>, <10> etc. to actual control characters

        Returns:
            Parsed log data as bytes or string (if decoded); a
            (channel A, channel B) tuple when the logger's file_mode is 2
        """
        print("Getting configuration...")
        config = self.get_config()
        print(f"Config: {config}")
        all_data = bytearray()
        # Get first page
        print("\nFetching first page...")
        url = f"{self.base_url}/page.xml?Page=0"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        first_page = bytearray(response.content)
        print(f"First page: {len(first_page)} bytes")
        if len(first_page) > 4:
            all_data.extend(first_page)
        # Get last page to see total size
        print("Fetching last page...")
        url = f"{self.base_url}/page.xml?Page=1"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        last_page = bytearray(response.content)
        print(f"Last page: {len(last_page)} bytes")
        # Now keep getting next pages until we get back to the last page
        print("\nFetching all pages...")
        current_page = first_page
        page_count = 1
        max_pages = 1000  # Safety limit
        while page_count < max_pages:
            url = f"{self.base_url}/page.xml?Page=3"  # 3 = next page
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            next_page = bytearray(response.content)
            # Check if we've reached the end (data repeats)
            if next_page == current_page or next_page == last_page:
                print(f"Reached end after {page_count} pages")
                break
            if len(next_page) > 4:
                all_data.extend(next_page)
            page_count += 1
            if page_count % 10 == 0:
                print(f"Fetched {page_count} pages, {len(all_data)} bytes total...")
            current_page = next_page
        print(f"\nTotal raw data collected: {len(all_data)} bytes from {page_count} pages")
        # Parse the data
        print("Parsing data...")
        file_mode = config['file_mode']
        # Treat all_data as a single chunk
        chunks = [all_data]
        if file_mode == 2:
            # Separate files for channel A and B
            data_a = self.parse_log_data(chunks, config, channel=1)
            data_b = self.parse_log_data(chunks, config, channel=2)
            # Decode control characters if requested
            if decode_controls:
                data_a_decoded = self.decode_control_characters(data_a)
                data_b_decoded = self.decode_control_characters(data_b)
                if output_file:
                    with open(f"{output_file}_A.txt", 'w', encoding='utf-8') as f:
                        f.write(data_a_decoded)
                    with open(f"{output_file}_B.txt", 'w', encoding='utf-8') as f:
                        f.write(data_b_decoded)
                    print(f"\nSaved decoded data to {output_file}_A.txt and {output_file}_B.txt")
                return data_a_decoded, data_b_decoded
            else:
                if output_file:
                    with open(f"{output_file}_A.txt", 'wb') as f:
                        f.write(data_a)
                    with open(f"{output_file}_B.txt", 'wb') as f:
                        f.write(data_b)
                    print(f"\nSaved raw data to {output_file}_A.txt and {output_file}_B.txt")
                return data_a, data_b
        else:
            # Combined file
            data = self.parse_log_data(chunks, config, channel=3)
            # Decode control characters if requested
            if decode_controls:
                data_decoded = self.decode_control_characters(data)
                if output_file:
                    with open(f"{output_file}.txt", 'w', encoding='utf-8') as f:
                        f.write(data_decoded)
                    print(f"\nSaved {len(data_decoded)} characters to {output_file}.txt")
                return data_decoded
            else:
                if output_file:
                    with open(f"{output_file}.txt", 'wb') as f:
                        f.write(data)
                    print(f"\nSaved {len(data)} bytes to {output_file}.txt")
                return data
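

# Minimal standalone usage sketch (the hostname matches what
# download_and_convert() uses below; the filename "mylog" is invented):
#   downloader = RSLoggerDownloader("http://rslogger")
#   text = downloader.download_via_pages(output_file="mylog", decode_controls=True)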

# Log parsing and conversion functions
def parse_logs(log_text):
    """Extract (weight, units, excel_timestamp, datetime) tuples from raw log text"""
    lines = log_text.splitlines()
    if lines and '=~' in lines[0]:
        lines = lines[1:]
    pairs = []
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        if not line:
            i += 1
            continue
        # Updated regex to capture any units (lb, kg, g, etc.)
        match = re.match(r'^GROSS\s+(\d+)\s*([a-zA-Z]+)$', line)
        if match:
            weight = int(match.group(1))
            units = match.group(2)
            i += 2  # Skip to timestamp line
            if i < len(lines):
                ts_line = lines[i].strip()
                if re.match(r'^\d{2}:\d{2} (AM|PM) \d{2}/\d{2}/\d{2}$', ts_line):
                    # Convert timestamp to Excel-friendly 12-hour format
                    try:
                        dt = datetime.strptime(ts_line, '%I:%M %p %m/%d/%y')
                        excel_ts = dt.strftime('%m/%d/%Y %I:%M %p')
                        pairs.append((weight, units, excel_ts, dt))
                        print((weight, units, excel_ts))  # debug trace of each parsed entry
                    except ValueError:
                        i += 1
                        continue
                i += 1  # Move to next weight entry
            else:
                i += 1
        else:
            i += 1
    return pairs
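
# Hypothetical log excerpt in the shape parse_logs() expects (the line
# between the GROSS reading and its timestamp is skipped by the i += 2):
#   GROSS   920 lb
#   (one intermediate line, ignored)
#   02:35 PM 10/15/25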


def remove_duplicates(pairs):
    """Remove duplicate entries based on weight (within 20) and timestamp (within 1 minute)
    Returns list of tuples where each tuple contains all the duplicate entries"""
    if not pairs:
        return []
    # Group entries that are duplicates
    deduplicated = []
    i = 0
    while i < len(pairs):
        weight, units, excel_ts, dt = pairs[i]
        duplicate_group = [(weight, units, excel_ts)]
        # Look ahead for duplicates
        j = i + 1
        while j < len(pairs):
            next_weight, next_units, next_excel_ts, next_dt = pairs[j]
            # Check if it's a duplicate:
            # - Weight within 20
            # - Timestamp within 1 minute
            weight_diff = abs(weight - next_weight)
            time_diff = abs((dt - next_dt).total_seconds())
            if weight_diff <= 20 and time_diff <= 60:
                # It's a duplicate, add to group
                duplicate_group.append((next_weight, next_units, next_excel_ts))
                j += 1
            else:
                break
        # Add the group as a tuple
        deduplicated.append(tuple(duplicate_group))
        i = j if j > i + 1 else i + 1
    return deduplicated
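
# Sketch of the grouping with invented readings: with the 20-unit / 60-second
# window above, consecutive entries
#   (920, 'lb', ..., 14:35:00) and (918, 'lb', ..., 14:35:30)
# collapse into one group, while (500, 'lb', ..., 15:10:00) starts a new one:
#   [((920, 'lb', ...), (918, 'lb', ...)), ((500, 'lb', ...),)]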


def write_csv1(pairs, filename):
    """Write sequential CSV with duplicates pushed to the right"""
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        # Find max number of duplicates to determine column count
        max_dups = max(len(group) for group in pairs) if pairs else 1
        # Create headers: WEIGHT, UNITS, TIME repeated for each duplicate
        headers = []
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'WEIGHT{suffix}', f'UNITS{suffix}', f'TIME{suffix}'])
        writer.writerow(headers)
        # Write data rows
        for group in pairs:
            row = []
            for weight, units, timestamp in group:
                row.extend([weight, units, timestamp])
            # Pad with empty strings if this group has fewer duplicates than max
            while len(row) < max_dups * 3:
                row.append('')
            writer.writerow(row)


def write_csv2(pairs, filename):
    """Write joined CSV with duplicates pushed to the right"""
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        # Find max number of duplicates to determine column count
        max_dups = max(len(group) for group in pairs) if pairs else 1
        # Create headers for gross and tare, repeated for duplicates
        headers = []
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'GROSS_WT{suffix}', f'GROSS_UNITS{suffix}', f'GROSS_T{suffix}'])
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'TARE_WT{suffix}', f'TARE_UNITS{suffix}', f'TARE_T{suffix}'])
        # Add NET columns at the end
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'NET_WT{suffix}', f'NET_UNITS{suffix}'])
        writer.writerow(headers)
        # Process pairs (gross/tare)
        for j in range(0, len(pairs), 2):
            row = []
            # Write all GROSS entries
            if j < len(pairs):
                gross_group = pairs[j]
                for weight, units, timestamp in gross_group:
                    row.extend([weight, units, timestamp])
            # Pad gross to max_dups
            while len(row) < max_dups * 3:
                row.append('')
            # Write all TARE entries
            if j + 1 < len(pairs):
                tare_group = pairs[j + 1]
                for weight, units, timestamp in tare_group:
                    row.extend([weight, units, timestamp])
                # Pad tare to max_dups
                while len(row) < max_dups * 6:
                    row.append('')
                # Calculate NET for each duplicate pair
                gross_group = pairs[j]
                for k in range(max_dups):
                    if k < len(gross_group) and k < len(tare_group):
                        gross_weight = gross_group[k][0]
                        gross_units = gross_group[k][1]
                        tare_weight = tare_group[k][0]
                        tare_units = tare_group[k][1]
                        if gross_units == tare_units:
                            net = gross_weight - tare_weight
                            net_units = tare_units
                        else:
                            net = 'UNIT MISMATCH'
                            net_units = 'MISMATCH'
                        row.extend([net, net_units])
                    else:
                        row.extend(['', ''])
            else:
                # Odd number of items, pad tare and net
                while len(row) < max_dups * 6:
                    row.append('')
                for k in range(max_dups):
                    row.extend(['', ''])
            writer.writerow(row)
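
# Resulting write_csv2 header layout when max_dups == 2:
#   GROSS_WT, GROSS_UNITS, GROSS_T, GROSS_WT_2, GROSS_UNITS_2, GROSS_T_2,
#   TARE_WT,  TARE_UNITS,  TARE_T,  TARE_WT_2,  TARE_UNITS_2,  TARE_T_2,
#   NET_WT,   NET_UNITS,   NET_WT_2, NET_UNITS_2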


def write_xlsx(pairs, output_filename, template_path='template.xlsx'):
    """Write sequential data to XLSX file using template with duplicates pushed to the right"""
    if not XLSX_AVAILABLE:
        raise ImportError("openpyxl is required for XLSX export. Install with: pip install openpyxl")
    if not os.path.exists(template_path):
        raise FileNotFoundError(f"Template file not found: {template_path}")
    # Load the template workbook (data_only=False preserves formulas)
    wb = load_workbook(template_path, data_only=False)
    # Check if SEQUENTIAL sheet exists
    if 'SEQUENTIAL' not in wb.sheetnames:
        raise ValueError("Template does not contain a 'SEQUENTIAL' sheet")
    # Get the SEQUENTIAL sheet
    ws = wb['SEQUENTIAL']
    # Clear existing data (starting from row 2, assuming row 1 has headers)
    # First, delete all rows below the header
    if ws.max_row > 1:
        ws.delete_rows(2, ws.max_row - 1)
    # Find max number of duplicates
    max_dups = max(len(group) for group in pairs) if pairs else 1
    # Update headers if needed (row 1)
    col = 1
    for i in range(max_dups):
        suffix = f"_{i+1}" if i > 0 else ""
        ws.cell(row=1, column=col, value=f'WEIGHT{suffix}')
        ws.cell(row=1, column=col+1, value=f'UNITS{suffix}')
        ws.cell(row=1, column=col+2, value=f'TIME{suffix}')
        col += 3
    # Write data starting from row 2
    for row_idx, group in enumerate(pairs, start=2):
        col = 1
        for weight, units, timestamp in group:
            ws.cell(row=row_idx, column=col, value=weight)
            ws.cell(row=row_idx, column=col+1, value=units)
            ws.cell(row=row_idx, column=col+2, value=timestamp)
            col += 3
    # Set COMBINED as the active sheet (default sheet when opened)
    if 'COMBINED' in wb.sheetnames:
        wb.active = wb.sheetnames.index('COMBINED')
    # Save to new filename (never overwrite template)
    wb.save(output_filename)


def show_files_in_explorer(files):
    """Open file explorer and highlight the generated files"""
    if not files:
        return
    # Get the directory of the first file
    directory = os.path.dirname(os.path.abspath(files[0]))
    system = platform.system()
    if system == 'Windows':
        # On Windows, use explorer with /select to highlight the file
        # If multiple files, just select the first one
        subprocess.run(['explorer', '/select,', os.path.abspath(files[0])])
    elif system == 'Darwin':  # macOS
        # On Mac, use 'open' with -R to reveal in Finder
        subprocess.run(['open', '-R', os.path.abspath(files[0])])
    else:  # Linux and others
        # On Linux, just open the directory
        subprocess.run(['xdg-open', directory])


def run_update_script():
    try:
        # Run the update.sh script from the installation directory
        result = subprocess.run(['sh', 'C:\\Program Files\\rslogger-merger\\update.sh'],
                                capture_output=True, text=True, cwd=os.getcwd())
        if result.returncode == 0:
            messagebox.showinfo("Success", f"update.sh executed successfully!\n\nOutput:\n{result.stdout}")
        else:
            messagebox.showerror("Error", f"update.sh failed with return code {result.returncode}\n\nError:\n{result.stderr}")
    except FileNotFoundError:
        messagebox.showerror("Error", "update.sh (or the sh interpreter) was not found")
    except Exception as e:
        messagebox.showerror("Error", f"Failed to run update.sh: {str(e)}")


def convert_local_file():
    """Convert a local file selected by the user"""
    input_file = filedialog.askopenfilename(
        title="Select Log File to Convert",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )
    if not input_file:
        return  # User cancelled
    generate_csv = csv_var.get()
    generate_xlsx = xlsx_var.get()
    if not generate_csv and not generate_xlsx:
        messagebox.showerror("Error", "Please select at least one output format (CSV or XLSX).")
        return
    # Auto-generate output filenames
    base = os.path.splitext(input_file)[0]
    output_file1 = f"{base}.sequential.csv"
    output_file2 = f"{base}.joined.csv"
    output_xlsx = f"{base}.xlsx"
    generated_files = []
    error_messages = []
    try:
        with open(input_file, 'r') as f:
            log_text = f.read()
        pairs = parse_logs(log_text)
        # Remove duplicates
        pairs = remove_duplicates(pairs)
        # Generate CSV files if checked
        if generate_csv:
            try:
                write_csv1(pairs, output_file1)
                generated_files.append(output_file1)
                write_csv2(pairs, output_file2)
                generated_files.append(output_file2)
            except Exception as e:
                error_messages.append(f"CSV export failed: {str(e)}")
        # Generate XLSX file if checked
        if generate_xlsx:
            try:
                write_xlsx(pairs, output_xlsx)
                generated_files.append(output_xlsx)
            except ImportError as e:
                error_messages.append(f"XLSX export skipped: {str(e)}")
            except Exception as e:
                error_messages.append(f"XLSX export failed: {str(e)}")
        # Build success message
        if generated_files:
            files_list = "\n".join(generated_files)
            error_info = ""
            if error_messages:
                error_info = "\n\nWarnings:\n" + "\n".join(error_messages)
            # Custom dialog with "Show the files" button
            response = messagebox.askquestion("Success",
                f"Files generated:\n{files_list}{error_info}\n\nShow the files?",
                icon='info')
            if response == 'yes':
                show_files_in_explorer(generated_files)
        else:
            messagebox.showerror("Error", "No files were generated.\n\n" + "\n".join(error_messages))
    except Exception as e:
        messagebox.showerror("Error", f"An error occurred: {str(e)}")


def download_and_convert():
    """Download from RS Logger and convert"""
    # Ask for destination file
    output_file = filedialog.asksaveasfilename(
        title="Save Downloaded Log As",
        defaultextension=".txt",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )
    if not output_file:
        return  # User cancelled
    generate_csv = csv_var.get()
    generate_xlsx = xlsx_var.get()
    if not generate_csv and not generate_xlsx:
        messagebox.showerror("Error", "Please select at least one output format (CSV or XLSX).")
        return
    try:
        # Download from RS Logger
        messagebox.showinfo("Downloading", "Connecting to RS Logger at http://rslogger\nThis may take a moment...")
        downloader = RSLoggerDownloader("http://rslogger")
        log_text = downloader.download_via_pages(
            output_file=os.path.splitext(output_file)[0],
            decode_controls=True
        )
        if isinstance(log_text, tuple):
            # file_mode == 2 returns separate channel A/B strings; join them for parsing
            log_text = "\n".join(log_text)
        if not log_text:
            messagebox.showerror("Error", "No data was downloaded from RS Logger")
            return
        # Now convert the downloaded data
        base = os.path.splitext(output_file)[0]
        output_file1 = f"{base}.sequential.csv"
        output_file2 = f"{base}.joined.csv"
        output_xlsx = f"{base}.xlsx"
        generated_files = [f"{base}.txt"]  # The downloaded txt file
        error_messages = []
        pairs = parse_logs(log_text)
        # Remove duplicates
        pairs = remove_duplicates(pairs)
        # Generate CSV files if checked
        if generate_csv:
            try:
                write_csv1(pairs, output_file1)
                generated_files.append(output_file1)
                write_csv2(pairs, output_file2)
                generated_files.append(output_file2)
            except Exception as e:
                error_messages.append(f"CSV export failed: {str(e)}")
        # Generate XLSX file if checked
        if generate_xlsx:
            try:
                write_xlsx(pairs, output_xlsx)
                generated_files.append(output_xlsx)
            except ImportError as e:
                error_messages.append(f"XLSX export skipped: {str(e)}")
            except Exception as e:
                error_messages.append(f"XLSX export failed: {str(e)}")
        # Build success message
        if generated_files:
            files_list = "\n".join(generated_files)
            error_info = ""
            if error_messages:
                error_info = "\n\nWarnings:\n" + "\n".join(error_messages)
            # Custom dialog with "Show the files" button
            response = messagebox.askquestion("Success",
                f"Downloaded and converted!\n\nFiles generated:\n{files_list}{error_info}\n\nShow the files?",
                icon='info')
            if response == 'yes':
                show_files_in_explorer(generated_files)
        else:
            messagebox.showerror("Error", "No files were generated.\n\n" + "\n".join(error_messages))
    except Exception as e:
        messagebox.showerror("Error", f"Download failed: {str(e)}")

# GUI Setup
root = tk.Tk()
root.title("RS Logger Converter")
root.geometry("500x220")
csv_var = tk.BooleanVar(value=False) # CSV unchecked by default
xlsx_var = tk.BooleanVar(value=True) # XLSX checked by default
# Title label
title_label = tk.Label(root, text="RS Logger Data Converter", font=("Arial", 16, "bold"))
title_label.pack(pady=10)
# Output format checkboxes
format_frame = tk.Frame(root)
format_frame.pack(pady=5, padx=10, fill='x')
tk.Label(format_frame, text="Output Formats:").pack(side='left')
tk.Checkbutton(format_frame, text="CSV", variable=csv_var).pack(side='left', padx=10)
tk.Checkbutton(format_frame, text="XLSX", variable=xlsx_var).pack(side='left', padx=10)
# Convert local file button
tk.Button(root, text="CONVERT LOCAL FILE", command=convert_local_file,
          font=("Arial", 12), relief="raised", bg="#2196F3", fg="white").pack(pady=5, padx=10, fill='x')

# Download and convert button
tk.Button(root, text="DOWNLOAD FROM RS LOGGER", command=download_and_convert,
          font=("Arial", 12), relief="raised", bg="#4CAF50", fg="white").pack(pady=5, padx=10, fill='x')

# Update script button
tk.Button(root, text="Update this Application", command=run_update_script,
          font=("Arial", 10), relief="raised").pack(pady=5, padx=10, fill='x')

root.mainloop()