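"""RS Logger Converter.

Downloads weight logs from an RS Logger device over HTTP (its logc.xml and
page.xml endpoints), decodes the <NN> control-character markers, parses GROSS
weight readings, removes duplicate readings, and exports the results to CSV
and/or XLSX through a small Tkinter GUI.
"""
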
import re
import csv
import tkinter as tk
from tkinter import filedialog, messagebox
import subprocess
import os
from datetime import datetime
import platform
import requests
from typing import List, Union

try:
    from openpyxl import load_workbook
    XLSX_AVAILABLE = True
except ImportError:
    XLSX_AVAILABLE = False


# RSLogger Downloader Classes
class RSLoggerDownloader:
    def __init__(self, base_url: str):
        """
        Initialize downloader for RS Logger device

        Args:
            base_url: Base URL like "http://rslogger" or "http://192.168.1.100"
        """
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()

    def get_config(self) -> dict:
        """Get logger configuration parameters"""
        url = f"{self.base_url}/logc.xml"
        response = self.session.get(url, timeout=5)
        response.raise_for_status()

        parts = response.text.split('#')
        config = {
            'date_from': parts[0],
            'date_to': parts[1],
            'timestamp': int(parts[2]),
            'file_mode': int(parts[3]),
            'data_format': int(parts[4]),
            'timestamp_char': chr(int(parts[5])),
            'time_format': int(parts[6])
        }

        return config

    def _get_progress(self, data: bytearray) -> int:
        """Extract the progress percentage byte from the end of a data block"""
        length = len(data)
        while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
            length -= 2

        if length > 0:
            return data[length - 1]
        return 0

    def _clear_endofline(self, data: bytearray) -> int:
        """Strip trailing CR/LF pairs and return the remaining data length"""
        length = len(data)
        while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
            length -= 2
        return length

    def decode_control_characters(self, data: bytes) -> str:
        """
        Decode the <13>, <10>, etc. control character sequences back to actual characters.

        Args:
            data: Raw bytes with <NN> sequences

        Returns:
            String with control characters properly decoded
        """
        # Convert bytes to string
        text = data.decode('ascii', errors='replace')

        # Replace <NN> patterns with actual characters
        def replace_control(match):
            code = int(match.group(1))
            return chr(code)

        # Match <digits> pattern and replace with the actual character
        decoded = re.sub(r'<(\d+)>', replace_control, text)

        return decoded

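    # Illustrative example (sample data assumed, not from the device): decoding restores
    # the raw serial stream, so b"GROSS   100 lb<13><10>" becomes "GROSS   100 lb\r\n";
    # every <NN> token is replaced by chr(NN).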
    def parse_log_data(self, chunks: List[bytearray], config: dict, channel: int = 3) -> bytearray:
        """
        Parse raw log data chunks into readable format

        Args:
            chunks: List of raw data chunks
            config: Configuration dictionary from get_config()
            channel: Channel to parse (1=A, 2=B, 3=both)

        Returns:
            Parsed data as bytearray
        """
        result = bytearray()

        # State variables for parsing
        state = {
            'last_ch': 0,
            'last_char': 0,
            'day_stamp': True,
            'last_ta': 0,
            'last_tb': 0,
            'h': 0,
            'd': 0,
            'm': 0,
            'y': 0
        }

        timestamp = config['timestamp']
        data_format = config['data_format']
        ts_char = config['timestamp_char']
        time_format = config['time_format']

        # Determine time interval
        t_interval = 0
        if timestamp >= 50000:
            t_interval = timestamp - 50000
        elif timestamp > 1000:
            t_interval = timestamp - 1000
        elif timestamp > 2:
            timestamp = 2

        for chunk_idx, chunk in enumerate(chunks):
            length = self._clear_endofline(chunk)
            if length <= 0:
                continue

            # Remove progress byte at end
            length -= 1

            if length <= 0:
                continue

            parsed = self._parse_chunk(
                chunk, length, channel, timestamp, t_interval,
                data_format, ts_char, time_format, state
            )
            result.extend(parsed)

        return result

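    # Record layout as implied by the parsing below (each record is 4 bytes):
    #   byte0 bit7 set   -> date record: byte0 & 0x7F = hour, byte1 = day,
    #                       byte2 = month, byte3 + 2000 = year
    #   byte0 bit7 clear -> data record: bit6 selects channel (B if set, else A),
    #                       byte0 & 0x3F = minute, byte1 = data byte,
    #                       byte2 | byte3<<8 -> low 10 bits = milliseconds,
    #                       next 6 bits = seconds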
    def _parse_chunk(self, data: bytearray, length: int, channel: int,
                     timestamp: int, t_interval: int, data_format: int,
                     ts_char: str, time_format: int, state: dict) -> bytearray:
        """Parse a single chunk of data"""
        result = bytearray()
        index = 0

        while index < length:
            if (length - index) < 4:
                break

            byte0 = data[index]
            byte1 = data[index + 1]
            byte2 = data[index + 2]
            byte3 = data[index + 3]

            # Check if this is a timestamp marker (high bit set)
            if byte0 & 0x80:
                # Date/time record
                state['h'] = byte0 & 0x7F
                dtmp = byte1
                mtmp = byte2
                ytmp = byte3 + 2000

                if state['d'] != dtmp or state['m'] != mtmp or state['y'] != ytmp:
                    state['d'] = dtmp
                    state['m'] = mtmp
                    state['y'] = ytmp

                    if timestamp != 0:
                        if len(result) > 0:
                            result.extend(b'\r\n')
                        date_str = f"[{ytmp}-{mtmp:02d}-{dtmp:02d}]"
                        result.extend(date_str.encode('ascii'))

                    state['last_ta'] = 0
                    state['last_tb'] = 0
                    state['last_ch'] = 0
                    state['day_stamp'] = True
            else:
                # Data record
                ch = 'B' if (byte0 & 0x40) else 'A'
                ch_mask = 2 if ch == 'B' else 1

                minute = byte0 & 0x3F
                data_byte = byte1
                tu16 = byte2 | (byte3 << 8)
                ms = tu16 & 0x3FF
                s = (byte3 >> 2) & 0x3F

                if (channel & ch_mask):
                    # Format time string
                    if time_format == 0:
                        h_str = f"{state['h']:2d}"
                    else:
                        if state['h'] == 0:
                            h_str = "A12"
                        elif state['h'] < 12:
                            h_str = f"A{state['h']:2d}"
                        elif state['h'] == 12:
                            h_str = "P12"
                        else:
                            h_str = f"P{state['h']-12:2d}"

                    time_str = f"{h_str}:{minute:02d}:{s:02d}.{ms:03d}"
                    if channel == 3:
                        time_str += ch
                    time_str += ts_char

                    # Decide if we need to add timestamp based on config
                    add_time = self._should_add_timestamp(
                        timestamp, t_interval, ch_mask, s, minute, state['h'],
                        state, channel
                    )

                    if add_time:
                        result.extend(b'\r\n')
                        result.extend(time_str.encode('ascii'))

                    # Add the data byte
                    if data_format == 1:
                        # Hex format
                        result.extend(ts_char.encode('ascii'))
                        result.extend(f"{data_byte:x}".encode('ascii'))
                    else:
                        # ASCII format
                        if data_byte < 32:
                            result.extend(f"<{data_byte}>".encode('ascii'))
                        else:
                            result.append(data_byte)

                    state['day_stamp'] = False

                state['last_ch'] = ch_mask
                state['last_char'] = data_byte

            index += 4

        return result

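    # Timestamp modes (as derived from the get_config() values and the checks below):
    #   timestamp == 1                    -> stamp every reading
    #   50000 <= timestamp < 50256        -> stamp when the previous data byte equals the
    #                                        trigger character (timestamp - 50000)
    #   timestamp > 1000                  -> stamp at intervals of (timestamp - 1000) seconds
    #   timestamp == 2 (or interval mode) -> also stamp when the active channel changes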
    def _should_add_timestamp(self, timestamp: int, t_interval: int,
                              ch_mask: int, s: int, minute: int, h: int,
                              state: dict, channel: int) -> bool:
        """Determine if timestamp should be added based on configuration"""
        if timestamp == 1:
            return True

        if timestamp >= 50000:
            if timestamp < 50256:
                if t_interval == state['last_char']:
                    return True
        elif timestamp == 2 or t_interval > 0:
            t = s + 60 * minute + 3600 * h
            should_add = False

            if state['last_ch'] != ch_mask:
                should_add = True
                if t_interval > 0 and channel != 3:
                    should_add = False

            if t_interval > 0:
                t_last = state['last_ta'] if ch_mask == 1 else state['last_tb']
                if (t - t_last) > t_interval:
                    should_add = True

            if should_add:
                if channel == 3:
                    state['last_ta'] = t
                    state['last_tb'] = t
                elif ch_mask == 1:
                    state['last_ta'] = t
                else:
                    state['last_tb'] = t
                return True

        return False

    def download_via_pages(self, output_file: str = None, decode_controls: bool = True) -> Union[bytes, str]:
        """
        Download data by fetching pages (like the preview does).
        This bypasses the date range issue entirely.

        Args:
            output_file: Optional filename to save to
            decode_controls: If True, decode <13>, <10> etc. to actual control characters

        Returns:
            Parsed log data as bytes or string (if decoded)
        """
        print("Getting configuration...")
        config = self.get_config()
        print(f"Config: {config}")

        all_data = bytearray()

        # Get first page
        print("\nFetching first page...")
        url = f"{self.base_url}/page.xml?Page=0"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()

        first_page = bytearray(response.content)
        print(f"First page: {len(first_page)} bytes")

        if len(first_page) > 4:
            all_data.extend(first_page)

        # Get last page to see total size
        print("Fetching last page...")
        url = f"{self.base_url}/page.xml?Page=1"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()

        last_page = bytearray(response.content)
        print(f"Last page: {len(last_page)} bytes")

        # Now keep getting next pages until we get back to the last page
        print("\nFetching all pages...")
        current_page = first_page
        page_count = 1
        max_pages = 1000  # Safety limit

        while page_count < max_pages:
            url = f"{self.base_url}/page.xml?Page=3"  # 3 = next page
            response = self.session.get(url, timeout=10)
            response.raise_for_status()

            next_page = bytearray(response.content)

            # Check if we've reached the end (data repeats)
            if next_page == current_page or next_page == last_page:
                print(f"Reached end after {page_count} pages")
                break

            if len(next_page) > 4:
                all_data.extend(next_page)
                page_count += 1
                if page_count % 10 == 0:
                    print(f"Fetched {page_count} pages, {len(all_data)} bytes total...")

            current_page = next_page

        print(f"\nTotal raw data collected: {len(all_data)} bytes from {page_count} pages")

        # Parse the data
        print("Parsing data...")
        file_mode = config['file_mode']

        # Treat all_data as a single chunk
        chunks = [all_data]

        if file_mode == 2:
            # Separate files for channel A and B
            data_a = self.parse_log_data(chunks, config, channel=1)
            data_b = self.parse_log_data(chunks, config, channel=2)

            # Decode control characters if requested
            if decode_controls:
                data_a_decoded = self.decode_control_characters(data_a)
                data_b_decoded = self.decode_control_characters(data_b)

                if output_file:
                    with open(f"{output_file}_A.txt", 'w', encoding='utf-8') as f:
                        f.write(data_a_decoded)
                    with open(f"{output_file}_B.txt", 'w', encoding='utf-8') as f:
                        f.write(data_b_decoded)
                    print(f"\nSaved decoded data to {output_file}_A.txt and {output_file}_B.txt")

                return data_a_decoded, data_b_decoded
            else:
                if output_file:
                    with open(f"{output_file}_A.txt", 'wb') as f:
                        f.write(data_a)
                    with open(f"{output_file}_B.txt", 'wb') as f:
                        f.write(data_b)
                    print(f"\nSaved raw data to {output_file}_A.txt and {output_file}_B.txt")

                return data_a, data_b
        else:
            # Combined file
            data = self.parse_log_data(chunks, config, channel=3)

            # Decode control characters if requested
            if decode_controls:
                data_decoded = self.decode_control_characters(data)

                if output_file:
                    with open(f"{output_file}.txt", 'w', encoding='utf-8') as f:
                        f.write(data_decoded)
                    print(f"\nSaved {len(data_decoded)} characters to {output_file}.txt")

                return data_decoded
            else:
                if output_file:
                    with open(f"{output_file}.txt", 'wb') as f:
                        f.write(data)
                    print(f"\nSaved {len(data)} bytes to {output_file}.txt")

                return data


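# Illustrative standalone use of the downloader (not part of the GUI flow), assuming the
# device answers at http://rslogger and is in a combined-file configuration (file_mode != 2,
# so a single decoded string is returned rather than an A/B tuple):
#
#     downloader = RSLoggerDownloader("http://rslogger")
#     text = downloader.download_via_pages(output_file="capture", decode_controls=True)
#     print(text[:200])
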
# Log parsing and conversion functions
def parse_logs(log_text):
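    """Extract (weight, units, excel_timestamp, datetime) tuples from the raw log text.

    Expected record shape (per the regexes below): a "GROSS <number> <units>" line,
    one line that is skipped, then an "HH:MM AM/PM MM/DD/YY" timestamp line.
    """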
    lines = log_text.splitlines()
    if lines and '=~' in lines[0]:
        lines = lines[1:]

    pairs = []
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        if not line:
            i += 1
            continue

        # Updated regex to capture any units (lb, kg, g, etc.)
        match = re.match(r'^GROSS\s+(\d+)\s*([a-zA-Z]+)$', line)
        if match:
            weight = int(match.group(1))
            units = match.group(2)
            i += 2  # Skip to timestamp line
            if i < len(lines):
                ts_line = lines[i].strip()
                if re.match(r'^\d{2}:\d{2} (AM|PM) \d{2}/\d{2}/\d{2}$', ts_line):
                    # Convert timestamp to Excel-friendly 12-hour format
                    try:
                        dt = datetime.strptime(ts_line, '%I:%M %p %m/%d/%y')
                        excel_ts = dt.strftime('%m/%d/%Y %I:%M %p')
                        pairs.append((weight, units, excel_ts, dt))
                        print((weight, units, excel_ts))
                    except ValueError:
                        i += 1
                        continue
                    i += 1  # Move to next weight entry
                else:
                    i += 1
            else:
                i += 1
        else:
            i += 1

    return pairs


def remove_duplicates(pairs):
    """Remove duplicate entries based on weight (within 20) and timestamp (within 1 minute)

    Returns list of tuples where each tuple contains all the duplicate entries"""
    if not pairs:
        return []

    # Group entries that are duplicates
    deduplicated = []
    i = 0

    while i < len(pairs):
        weight, units, excel_ts, dt = pairs[i]
        duplicate_group = [(weight, units, excel_ts)]

        # Look ahead for duplicates
        j = i + 1
        while j < len(pairs):
            next_weight, next_units, next_excel_ts, next_dt = pairs[j]

            # Check if it's a duplicate:
            # - Weight within 20
            # - Timestamp within 1 minute
            weight_diff = abs(weight - next_weight)
            time_diff = abs((dt - next_dt).total_seconds())

            if weight_diff <= 20 and time_diff <= 60:
                # It's a duplicate, add to group
                duplicate_group.append((next_weight, next_units, next_excel_ts))
                j += 1
            else:
                break

        # Add the group as a tuple
        deduplicated.append(tuple(duplicate_group))
        i = j if j > i + 1 else i + 1

    return deduplicated


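# Output layout for the "sequential" CSV: one row per reading, with any duplicate
# readings appended to the right, e.g. WEIGHT, UNITS, TIME, WEIGHT_2, UNITS_2, TIME_2, ...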
def write_csv1(pairs, filename):
    """Write sequential CSV with duplicates pushed to the right"""
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)

        # Find max number of duplicates to determine column count
        max_dups = max(len(group) for group in pairs) if pairs else 1

        # Create headers: WEIGHT, UNITS, TIME repeated for each duplicate
        headers = []
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'WEIGHT{suffix}', f'UNITS{suffix}', f'TIME{suffix}'])
        writer.writerow(headers)

        # Write data rows
        for group in pairs:
            row = []
            for weight, units, timestamp in group:
                row.extend([weight, units, timestamp])
            # Pad with empty strings if this group has fewer duplicates than max
            while len(row) < max_dups * 3:
                row.append('')
            writer.writerow(row)


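# write_csv2 assumes the deduplicated groups alternate GROSS, TARE, GROSS, TARE, ...
# Each output row holds one gross/tare pair plus a computed NET = gross - tare per
# duplicate column, or 'UNIT MISMATCH' when the gross and tare units differ.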
def write_csv2(pairs, filename):
    """Write joined CSV with duplicates pushed to the right"""
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)

        # Find max number of duplicates to determine column count
        max_dups = max(len(group) for group in pairs) if pairs else 1

        # Create headers for gross and tare, repeated for duplicates
        headers = []
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'GROSS_WT{suffix}', f'GROSS_UNITS{suffix}', f'GROSS_T{suffix}'])
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'TARE_WT{suffix}', f'TARE_UNITS{suffix}', f'TARE_T{suffix}'])
        # Add NET columns at the end
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'NET_WT{suffix}', f'NET_UNITS{suffix}'])

        writer.writerow(headers)

        # Process pairs (gross/tare)
        for j in range(0, len(pairs), 2):
            row = []

            # Write all GROSS entries
            if j < len(pairs):
                gross_group = pairs[j]
                for weight, units, timestamp in gross_group:
                    row.extend([weight, units, timestamp])
                # Pad gross to max_dups
                while len(row) < max_dups * 3:
                    row.append('')

            # Write all TARE entries
            if j + 1 < len(pairs):
                tare_group = pairs[j + 1]
                for weight, units, timestamp in tare_group:
                    row.extend([weight, units, timestamp])
                # Pad tare to max_dups
                while len(row) < max_dups * 6:
                    row.append('')

                # Calculate NET for each duplicate pair
                gross_group = pairs[j]
                for k in range(max_dups):
                    if k < len(gross_group) and k < len(tare_group):
                        gross_weight = gross_group[k][0]
                        gross_units = gross_group[k][1]
                        tare_weight = tare_group[k][0]
                        tare_units = tare_group[k][1]

                        if gross_units == tare_units:
                            net = gross_weight - tare_weight
                            net_units = tare_units
                        else:
                            net = 'UNIT MISMATCH'
                            net_units = 'MISMATCH'
                        row.extend([net, net_units])
                    else:
                        row.extend(['', ''])
            else:
                # Odd number of items, pad tare and net
                while len(row) < max_dups * 6:
                    row.append('')
                for k in range(max_dups):
                    row.extend(['', ''])

            writer.writerow(row)


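# write_xlsx fills the 'SEQUENTIAL' sheet of an existing template workbook; the template
# must contain that sheet (and optionally a 'COMBINED' sheet, which is made the active
# sheet on save) and is never overwritten.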
def write_xlsx(pairs, output_filename, template_path='template.xlsx'):
    """Write sequential data to XLSX file using template with duplicates pushed to the right"""
    if not XLSX_AVAILABLE:
        raise ImportError("openpyxl is required for XLSX export. Install with: pip install openpyxl")

    if not os.path.exists(template_path):
        raise FileNotFoundError(f"Template file not found: {template_path}")

    # Load the template workbook (data_only=False preserves formulas)
    wb = load_workbook(template_path, data_only=False)

    # Check if SEQUENTIAL sheet exists
    if 'SEQUENTIAL' not in wb.sheetnames:
        raise ValueError("Template does not contain a 'SEQUENTIAL' sheet")

    # Get the SEQUENTIAL sheet
    ws = wb['SEQUENTIAL']

    # Clear existing data (starting from row 2, assuming row 1 has headers)
    if ws.max_row > 1:
        ws.delete_rows(2, ws.max_row - 1)

    # Find max number of duplicates
    max_dups = max(len(group) for group in pairs) if pairs else 1

    # Update headers if needed (row 1)
    col = 1
    for i in range(max_dups):
        suffix = f"_{i+1}" if i > 0 else ""
        ws.cell(row=1, column=col, value=f'WEIGHT{suffix}')
        ws.cell(row=1, column=col+1, value=f'UNITS{suffix}')
        ws.cell(row=1, column=col+2, value=f'TIME{suffix}')
        col += 3

    # Write data starting from row 2
    for row_idx, group in enumerate(pairs, start=2):
        col = 1
        for weight, units, timestamp in group:
            ws.cell(row=row_idx, column=col, value=weight)
            ws.cell(row=row_idx, column=col+1, value=units)
            ws.cell(row=row_idx, column=col+2, value=timestamp)
            col += 3

    # Set COMBINED as the active sheet (default sheet when opened)
    if 'COMBINED' in wb.sheetnames:
        wb.active = wb.sheetnames.index('COMBINED')

    # Save to new filename (never overwrite template)
    wb.save(output_filename)


def show_files_in_explorer(files):
    """Open file explorer and highlight the generated files"""
    if not files:
        return

    # Get the directory of the first file
    directory = os.path.dirname(os.path.abspath(files[0]))

    system = platform.system()

    if system == 'Windows':
        # On Windows, use explorer with /select to highlight the file
        # If multiple files, just select the first one
        subprocess.run(['explorer', '/select,', os.path.abspath(files[0])])
    elif system == 'Darwin':  # macOS
        # On Mac, use 'open' with -R to reveal in Finder
        subprocess.run(['open', '-R', os.path.abspath(files[0])])
    else:  # Linux and others
        # On Linux, just open the directory
        subprocess.run(['xdg-open', directory])


def run_update_script():
    try:
        # Run the bundled update.sh script
        result = subprocess.run(['sh', 'C:\\Program Files\\rslogger-merger\\update.sh'],
                                capture_output=True, text=True, cwd=os.getcwd())
        if result.returncode == 0:
            messagebox.showinfo("Success", f"update.sh executed successfully!\n\nOutput:\n{result.stdout}")
        else:
            messagebox.showerror("Error", f"update.sh failed with return code {result.returncode}\n\nError:\n{result.stderr}")
    except FileNotFoundError:
        messagebox.showerror("Error", "update.sh (or sh) was not found at C:\\Program Files\\rslogger-merger\\update.sh")
    except Exception as e:
        messagebox.showerror("Error", f"Failed to run update.sh: {str(e)}")


def convert_local_file():
    """Convert a local file selected by the user"""
    input_file = filedialog.askopenfilename(
        title="Select Log File to Convert",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )

    if not input_file:
        return  # User cancelled

    generate_csv = csv_var.get()
    generate_xlsx = xlsx_var.get()

    if not generate_csv and not generate_xlsx:
        messagebox.showerror("Error", "Please select at least one output format (CSV or XLSX).")
        return

    # Auto-generate output filenames
    base = os.path.splitext(input_file)[0]
    output_file1 = f"{base}.sequential.csv"
    output_file2 = f"{base}.joined.csv"
    output_xlsx = f"{base}.xlsx"

    generated_files = []
    error_messages = []

    try:
        with open(input_file, 'r') as f:
            log_text = f.read()
        pairs = parse_logs(log_text)

        # Remove duplicates
        pairs = remove_duplicates(pairs)

        # Generate CSV files if checked
        if generate_csv:
            try:
                write_csv1(pairs, output_file1)
                generated_files.append(output_file1)
                write_csv2(pairs, output_file2)
                generated_files.append(output_file2)
            except Exception as e:
                error_messages.append(f"CSV export failed: {str(e)}")

        # Generate XLSX file if checked
        if generate_xlsx:
            try:
                write_xlsx(pairs, output_xlsx)
                generated_files.append(output_xlsx)
            except ImportError as e:
                error_messages.append(f"XLSX export skipped: {str(e)}")
            except Exception as e:
                error_messages.append(f"XLSX export failed: {str(e)}")

        # Build success message
        if generated_files:
            files_list = "\n".join(generated_files)
            error_info = ""
            if error_messages:
                error_info = "\n\nWarnings:\n" + "\n".join(error_messages)

            # Yes/no dialog asking whether to show the generated files
            response = messagebox.askquestion(
                "Success",
                f"Files generated:\n{files_list}{error_info}\n\nShow the files?",
                icon='info'
            )

            if response == 'yes':
                show_files_in_explorer(generated_files)
        else:
            messagebox.showerror("Error", "No files were generated.\n\n" + "\n".join(error_messages))

    except Exception as e:
        messagebox.showerror("Error", f"An error occurred: {str(e)}")


def download_and_convert():
    """Download from RS Logger and convert"""
    # Ask for destination file
    output_file = filedialog.asksaveasfilename(
        title="Save Downloaded Log As",
        defaultextension=".txt",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )

    if not output_file:
        return  # User cancelled

    generate_csv = csv_var.get()
    generate_xlsx = xlsx_var.get()

    if not generate_csv and not generate_xlsx:
        messagebox.showerror("Error", "Please select at least one output format (CSV or XLSX).")
        return

    try:
        # Download from RS Logger
        messagebox.showinfo("Downloading", "Connecting to RS Logger at http://rslogger\nThis may take a moment...")

        downloader = RSLoggerDownloader("http://rslogger")
        log_text = downloader.download_via_pages(
            output_file=os.path.splitext(output_file)[0],
            decode_controls=True
        )

        if not log_text:
            messagebox.showerror("Error", "No data was downloaded from RS Logger")
            return

        # Now convert the downloaded data
        base = os.path.splitext(output_file)[0]
        output_file1 = f"{base}.sequential.csv"
        output_file2 = f"{base}.joined.csv"
        output_xlsx = f"{base}.xlsx"

        generated_files = [f"{base}.txt"]  # The downloaded txt file
        error_messages = []

        pairs = parse_logs(log_text)

        # Remove duplicates
        pairs = remove_duplicates(pairs)

        # Generate CSV files if checked
        if generate_csv:
            try:
                write_csv1(pairs, output_file1)
                generated_files.append(output_file1)
                write_csv2(pairs, output_file2)
                generated_files.append(output_file2)
            except Exception as e:
                error_messages.append(f"CSV export failed: {str(e)}")

        # Generate XLSX file if checked
        if generate_xlsx:
            try:
                write_xlsx(pairs, output_xlsx)
                generated_files.append(output_xlsx)
            except ImportError as e:
                error_messages.append(f"XLSX export skipped: {str(e)}")
            except Exception as e:
                error_messages.append(f"XLSX export failed: {str(e)}")

        # Build success message
        if generated_files:
            files_list = "\n".join(generated_files)
            error_info = ""
            if error_messages:
                error_info = "\n\nWarnings:\n" + "\n".join(error_messages)

            # Yes/no dialog asking whether to show the generated files
            response = messagebox.askquestion(
                "Success",
                f"Downloaded and converted!\n\nFiles generated:\n{files_list}{error_info}\n\nShow the files?",
                icon='info'
            )

            if response == 'yes':
                show_files_in_explorer(generated_files)
        else:
            messagebox.showerror("Error", "No files were generated.\n\n" + "\n".join(error_messages))

    except Exception as e:
        messagebox.showerror("Error", f"Download failed: {str(e)}")


# GUI Setup
root = tk.Tk()
root.title("RS Logger Converter")
root.geometry("500x220")

csv_var = tk.BooleanVar(value=False)   # CSV unchecked by default
xlsx_var = tk.BooleanVar(value=True)   # XLSX checked by default

# Title label
title_label = tk.Label(root, text="RS Logger Data Converter", font=("Arial", 16, "bold"))
title_label.pack(pady=10)

# Output format checkboxes
format_frame = tk.Frame(root)
format_frame.pack(pady=5, padx=10, fill='x')
tk.Label(format_frame, text="Output Formats:").pack(side='left')
tk.Checkbutton(format_frame, text="CSV", variable=csv_var).pack(side='left', padx=10)
tk.Checkbutton(format_frame, text="XLSX", variable=xlsx_var).pack(side='left', padx=10)

# Convert local file button
tk.Button(root, text="CONVERT LOCAL FILE", command=convert_local_file,
          font=("Arial", 12), relief="raised", bg="#2196F3", fg="white").pack(pady=5, padx=10, fill='x')

# Download and convert button
tk.Button(root, text="DOWNLOAD FROM RS LOGGER", command=download_and_convert,
          font=("Arial", 12), relief="raised", bg="#4CAF50", fg="white").pack(pady=5, padx=10, fill='x')

# Update script button
tk.Button(root, text="Update this Application", command=run_update_script,
          font=("Arial", 10), relief="raised").pack(pady=5, padx=10, fill='x')

root.mainloop()