"""RS Logger downloader and weight-log converter (Tkinter GUI).

The module has three parts:

* ``RSLoggerDownloader`` fetches the raw 4-byte log records from an RS Logger
  device over HTTP and renders them as text.
* ``parse_logs`` / ``remove_duplicates`` / ``write_csv1`` / ``write_csv2`` /
  ``write_xlsx`` turn that text into CSV and XLSX reports.
* The module-level Tk code at the bottom wires everything to three buttons.
"""

import re
import csv
import tkinter as tk
from tkinter import filedialog, messagebox
import subprocess
import os
from datetime import datetime
import platform
import requests
from typing import List, Optional, Tuple, Union

try:
    from openpyxl import load_workbook
    XLSX_AVAILABLE = True
except ImportError:
    XLSX_AVAILABLE = False


# RSLogger Downloader Classes
class RSLoggerDownloader:
    """HTTP client that downloads and decodes the log of an RS Logger device."""

    def __init__(self, base_url: str):
        """
        Initialize downloader for RS Logger device

        Args:
            base_url: Base URL like "http://rslogger" or "http://192.168.1.100"
        """
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()

    def get_config(self) -> dict:
        """Get logger configuration parameters.

        Fetches ``/logc.xml`` and splits the body on ``#``.

        Returns:
            Dict with keys ``date_from``, ``date_to``, ``timestamp``,
            ``file_mode``, ``data_format``, ``timestamp_char`` and
            ``time_format``.

        Raises:
            requests.HTTPError: If the device answers with an error status.
            ValueError: If the response has fewer than 7 fields or a numeric
                field is not an integer.
        """
        url = f"{self.base_url}/logc.xml"
        response = self.session.get(url, timeout=5)
        response.raise_for_status()

        parts = response.text.split('#')
        if len(parts) < 7:
            # Fail with a readable message instead of a bare IndexError.
            raise ValueError(
                f"Unexpected configuration response from {url}: "
                f"expected at least 7 '#'-separated fields, got {len(parts)}"
            )

        return {
            'date_from': parts[0],
            'date_to': parts[1],
            'timestamp': int(parts[2]),
            'file_mode': int(parts[3]),
            'data_format': int(parts[4]),
            'timestamp_char': chr(int(parts[5])),
            'time_format': int(parts[6]),
        }

    def _get_progress(self, data: bytearray) -> int:
        """Extract progress percentage from end of data.

        Returns the last byte that precedes any trailing CR LF pairs, or 0
        when nothing is left.
        """
        length = self._clear_endofline(data)
        if length > 0:
            return data[length - 1]
        return 0

    def _clear_endofline(self, data: bytearray) -> int:
        """Return the length of ``data`` with trailing CR LF pairs ignored."""
        length = len(data)
        while length >= 2 and data[length - 1] == 10 and data[length - 2] == 13:
            length -= 2
        return length

    def decode_control_characters(self, data: bytes) -> str:
        """
        Decode the <13>, <10>, etc. control character sequences back to
        actual characters.

        Only codes below 32 are decoded, because those are the only
        sequences ``_parse_chunk`` ever produces. Any other ``<digits>``
        text is left untouched, which also avoids a ``ValueError`` from
        ``chr()`` on numbers above 0x10FFFF.

        Args:
            data: Raw bytes with ``<N>`` sequences

        Returns:
            String with control characters properly decoded
        """
        # Non-ASCII bytes become U+FFFD rather than raising.
        text = data.decode('ascii', errors='replace')

        def replace_control(match):
            code = int(match.group(1))
            if code < 32:
                return chr(code)
            # Not something the encoder emitted: keep the literal text.
            return match.group(0)

        return re.sub(r'<(\d+)>', replace_control, text)

    def parse_log_data(self, chunks: List[bytearray], config: dict,
                       channel: int = 3) -> bytearray:
        """
        Parse raw log data chunks into readable format

        Args:
            chunks: List of raw data chunks
            config: Configuration dictionary from get_config()
            channel: Channel to parse (1=A, 2=B, 3=both)

        Returns:
            Parsed data as bytearray
        """
        result = bytearray()

        # State carried across chunks while parsing
        state = {
            'last_ch': 0,
            'last_char': 0,
            'day_stamp': True,
            'last_ta': 0,
            'last_tb': 0,
            'h': 0,
            'd': 0,
            'm': 0,
            'y': 0,
        }

        timestamp = config['timestamp']
        data_format = config['data_format']
        ts_char = config['timestamp_char']
        time_format = config['time_format']

        # Determine time interval (or trigger character code for >= 50000)
        t_interval = 0
        if timestamp >= 50000:
            t_interval = timestamp - 50000
        elif timestamp > 1000:
            t_interval = timestamp - 1000
        elif timestamp > 2:
            timestamp = 2

        for chunk in chunks:
            length = self._clear_endofline(chunk)
            if length <= 0:
                continue

            # Remove progress byte at end
            length -= 1
            if length <= 0:
                continue

            result.extend(self._parse_chunk(
                chunk, length, channel, timestamp, t_interval,
                data_format, ts_char, time_format, state
            ))

        return result

    def _parse_chunk(self, data: bytearray, length: int, channel: int,
                     timestamp: int, t_interval: int, data_format: int,
                     ts_char: str, time_format: int, state: dict) -> bytearray:
        """Parse a single chunk of data.

        The chunk is read as consecutive 4-byte records; a trailing partial
        record is ignored.

        * Bit 7 of byte 0 set: date/time record. Hour is the low 7 bits of
          byte 0, then day, month and year - 2000.
        * Otherwise: data record. Bit 6 of byte 0 selects channel B, the low
          6 bits are the minute, byte 1 is the payload, and bytes 2-3 hold
          milliseconds (low 10 bits) and seconds (next 6 bits).

        ``state`` is updated in place so that parsing can continue across
        chunks.
        """
        result = bytearray()
        index = 0

        while index < length:
            if (length - index) < 4:
                break

            byte0 = data[index]
            byte1 = data[index + 1]
            byte2 = data[index + 2]
            byte3 = data[index + 3]

            # Check if this is a timestamp marker (high bit set)
            if byte0 & 0x80:
                # Date/time record
                state['h'] = byte0 & 0x7F
                dtmp = byte1
                mtmp = byte2
                ytmp = byte3 + 2000

                if state['d'] != dtmp or state['m'] != mtmp or state['y'] != ytmp:
                    state['d'] = dtmp
                    state['m'] = mtmp
                    state['y'] = ytmp

                    if timestamp != 0:
                        if len(result) > 0:
                            result.extend(b'\r\n')
                        date_str = f"[{ytmp}-{mtmp:02d}-{dtmp:02d}]"
                        result.extend(date_str.encode('ascii'))

                    # New day: restart interval and channel tracking
                    state['last_ta'] = 0
                    state['last_tb'] = 0
                    state['last_ch'] = 0
                    state['day_stamp'] = True
            else:
                # Data record
                ch = 'B' if (byte0 & 0x40) else 'A'
                ch_mask = 2 if ch == 'B' else 1
                minute = byte0 & 0x3F
                data_byte = byte1
                tu16 = byte2 | (byte3 << 8)
                ms = tu16 & 0x3FF
                s = (byte3 >> 2) & 0x3F

                if (channel & ch_mask):
                    # Format time string
                    if time_format == 0:
                        h_str = f"{state['h']:2d}"
                    else:
                        if state['h'] == 0:
                            h_str = "A12"
                        elif state['h'] < 12:
                            h_str = f"A{state['h']:2d}"
                        elif state['h'] == 12:
                            h_str = "P12"
                        else:
                            h_str = f"P{state['h']-12:2d}"

                    time_str = f"{h_str}:{minute:02d}:{s:02d}.{ms:03d}"
                    if channel == 3:
                        time_str += ch
                    time_str += ts_char

                    # Decide if we need to add timestamp based on config
                    add_time = self._should_add_timestamp(
                        timestamp, t_interval, ch_mask, s, minute,
                        state['h'], state, channel
                    )

                    if add_time:
                        result.extend(b'\r\n')
                        result.extend(time_str.encode('ascii'))

                    # Add the data byte
                    if data_format == 1:
                        # Hex format
                        result.extend(ts_char.encode('ascii'))
                        result.extend(f"{data_byte:x}".encode('ascii'))
                    else:
                        # ASCII format; control bytes are escaped as <N>
                        if data_byte < 32:
                            result.extend(f"<{data_byte}>".encode('ascii'))
                        else:
                            result.append(data_byte)

                    state['day_stamp'] = False
                    state['last_ch'] = ch_mask
                    state['last_char'] = data_byte

            index += 4

        return result

    def _should_add_timestamp(self, timestamp: int, t_interval: int,
                              ch_mask: int, s: int, minute: int, h: int,
                              state: dict, channel: int) -> bool:
        """Determine if timestamp should be added based on configuration.

        Modes, as implemented here:

        * ``timestamp == 1``: before every byte.
        * ``50000 <= timestamp < 50256``: when the previous byte equals
          ``t_interval`` (the trigger character code).
        * ``timestamp == 2`` or ``t_interval > 0``: on a channel change
          and/or when more than ``t_interval`` seconds have passed since the
          last stamp on that channel.

        When it returns True in the interval mode it also records the stamp
        time in ``state['last_ta']`` / ``state['last_tb']``.
        """
        if timestamp == 1:
            return True

        if timestamp >= 50000:
            if timestamp < 50256:
                if t_interval == state['last_char']:
                    return True
        elif timestamp == 2 or t_interval > 0:
            t = s + 60 * minute + 3600 * h
            should_add = False

            if state['last_ch'] != ch_mask:
                should_add = True

            # In interval mode a channel change only counts when both
            # channels are rendered together.
            if t_interval > 0 and channel != 3:
                should_add = False

            if t_interval > 0:
                t_last = state['last_ta'] if ch_mask == 1 else state['last_tb']
                if (t - t_last) > t_interval:
                    should_add = True

            if should_add:
                if channel == 3:
                    state['last_ta'] = t
                    state['last_tb'] = t
                elif ch_mask == 1:
                    state['last_ta'] = t
                else:
                    state['last_tb'] = t
                return True

        return False

    def _fetch_page(self, page_code: int) -> bytearray:
        """Fetch ``/page.xml?Page=<page_code>`` and return its raw body."""
        url = f"{self.base_url}/page.xml?Page={page_code}"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        return bytearray(response.content)

    def download_via_pages(
        self, output_file: Optional[str] = None, decode_controls: bool = True
    ) -> Union[bytes, bytearray, str, Tuple[bytearray, bytearray], Tuple[str, str]]:
        """
        Download data by fetching pages (like the preview does).
        This bypasses the date range issue entirely.

        Args:
            output_file: Optional filename (without extension) to save to.
                ``.txt`` is appended, or ``_A.txt`` / ``_B.txt`` when the
                device is configured with ``file_mode == 2``.
            decode_controls: If True, decode <13>, <10> etc. to actual
                control characters

        Returns:
            Parsed log data as bytes, or as a string if decoded. When
            ``file_mode == 2`` a ``(channel_a, channel_b)`` tuple of the same
            kind is returned instead.
        """
        print("Getting configuration...")
        config = self.get_config()
        print(f"Config: {config}")

        all_data = bytearray()

        # Get first page
        print("\nFetching first page...")
        first_page = self._fetch_page(0)
        print(f"First page: {len(first_page)} bytes")

        if len(first_page) > 4:
            all_data.extend(first_page)

        # Get last page to see total size
        print("Fetching last page...")
        last_page = self._fetch_page(1)
        print(f"Last page: {len(last_page)} bytes")

        # Now keep getting next pages until we get back to the last page
        print("\nFetching all pages...")
        current_page = first_page
        page_count = 1
        max_pages = 1000  # Safety limit

        while page_count < max_pages:
            next_page = self._fetch_page(3)  # 3 = next page

            # Check if we've reached the end (data repeats)
            if next_page == current_page or next_page == last_page:
                print(f"Reached end after {page_count} pages")
                break

            if len(next_page) > 4:
                all_data.extend(next_page)

            page_count += 1
            if page_count % 10 == 0:
                print(f"Fetched {page_count} pages, {len(all_data)} bytes total...")

            current_page = next_page

        print(f"\nTotal raw data collected: {len(all_data)} bytes from {page_count} pages")

        # Parse the data
        print("Parsing data...")
        file_mode = config['file_mode']

        # Treat all_data as a single chunk
        chunks = [all_data]

        if file_mode == 2:
            # Separate files for channel A and B
            data_a = self.parse_log_data(chunks, config, channel=1)
            data_b = self.parse_log_data(chunks, config, channel=2)

            # Decode control characters if requested
            if decode_controls:
                data_a_decoded = self.decode_control_characters(data_a)
                data_b_decoded = self.decode_control_characters(data_b)

                if output_file:
                    # newline='' keeps the embedded "\r\n" as-is; without it
                    # Windows would write "\r\r\n".
                    with open(f"{output_file}_A.txt", 'w', encoding='utf-8', newline='') as f:
                        f.write(data_a_decoded)
                    with open(f"{output_file}_B.txt", 'w', encoding='utf-8', newline='') as f:
                        f.write(data_b_decoded)
                    print(f"\nSaved decoded data to {output_file}_A.txt and {output_file}_B.txt")

                return data_a_decoded, data_b_decoded

            if output_file:
                with open(f"{output_file}_A.txt", 'wb') as f:
                    f.write(data_a)
                with open(f"{output_file}_B.txt", 'wb') as f:
                    f.write(data_b)
                print(f"\nSaved raw data to {output_file}_A.txt and {output_file}_B.txt")

            return data_a, data_b

        # Combined file
        data = self.parse_log_data(chunks, config, channel=3)

        # Decode control characters if requested
        if decode_controls:
            data_decoded = self.decode_control_characters(data)

            if output_file:
                # See the note on newline='' above.
                with open(f"{output_file}.txt", 'w', encoding='utf-8', newline='') as f:
                    f.write(data_decoded)
                print(f"\nSaved {len(data_decoded)} characters to {output_file}.txt")

            return data_decoded

        if output_file:
            with open(f"{output_file}.txt", 'wb') as f:
                f.write(data)
            print(f"\nSaved {len(data)} bytes to {output_file}.txt")

        return data


# Log parsing and conversion functions
def parse_logs(log_text):
    """Extract weight readings from log text.

    A reading is a ``GROSS <number> <units>`` line whose timestamp
    (``HH:MM AM|PM MM/DD/YY``) sits two lines further down. A first line
    containing ``=~`` is skipped.

    Args:
        log_text: The whole log as one string.

    Returns:
        List of ``(weight, units, excel_timestamp, datetime)`` tuples in
        file order.
    """
    lines = log_text.splitlines()
    if lines and '=~' in lines[0]:
        lines = lines[1:]

    pairs = []
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        if not line:
            i += 1
            continue

        # Updated regex to capture any units (lb, kg, g, etc.)
        match = re.match(r'^GROSS\s+(\d+)\s*([a-zA-Z]+)$', line)
        if match:
            weight = int(match.group(1))
            units = match.group(2)
            i += 2  # Skip to timestamp line
            if i < len(lines):
                ts_line = lines[i].strip()
                if re.match(r'^\d{2}:\d{2} (AM|PM) \d{2}/\d{2}/\d{2}$', ts_line):
                    # Convert timestamp to Excel-friendly 12-hour format
                    try:
                        dt = datetime.strptime(ts_line, '%I:%M %p %m/%d/%y')
                        excel_ts = dt.strftime('%m/%d/%Y %I:%M %p')
                        pairs.append((weight, units, excel_ts, dt))
                        print((weight, units, excel_ts))
                    except ValueError:
                        i += 1
                        continue
                    i += 1  # Move to next weight entry
                else:
                    i += 1
        else:
            i += 1

    return pairs


def remove_duplicates(pairs):
    """Remove duplicate entries based on weight (within 20) and timestamp (within 1 minute)

    Consecutive entries are compared against the first entry of the current
    group, not against their immediate predecessor.

    Returns list of tuples where each tuple contains all the duplicate entries"""
    if not pairs:
        return []

    # Group entries that are duplicates
    deduplicated = []
    i = 0
    while i < len(pairs):
        weight, units, excel_ts, dt = pairs[i]
        duplicate_group = [(weight, units, excel_ts)]

        # Look ahead for duplicates
        j = i + 1
        while j < len(pairs):
            next_weight, next_units, next_excel_ts, next_dt = pairs[j]

            # Check if it's a duplicate:
            # - Weight within 20
            # - Timestamp within 1 minute
            weight_diff = abs(weight - next_weight)
            time_diff = abs((dt - next_dt).total_seconds())
            if weight_diff > 20 or time_diff > 60:
                break

            # It's a duplicate, add to group
            duplicate_group.append((next_weight, next_units, next_excel_ts))
            j += 1

        # Add the group as a tuple
        deduplicated.append(tuple(duplicate_group))
        # j is always >= i + 1, so this always makes progress
        i = j

    return deduplicated


def write_csv1(pairs, filename):
    """Write sequential CSV with duplicates pushed to the right.

    Args:
        pairs: Groups as returned by ``remove_duplicates``.
        filename: Destination path.

    An empty ``pairs`` produces a file holding only the header row, the same
    as ``write_csv2`` does.
    """
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)

        # Find max number of duplicates to determine column count
        max_dups = max(len(group) for group in pairs) if pairs else 1

        # Create headers: WEIGHT, UNITS, TIME repeated for each duplicate
        headers = []
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'WEIGHT{suffix}', f'UNITS{suffix}', f'TIME{suffix}'])
        writer.writerow(headers)

        # Write data rows
        for group in pairs:
            row = []
            for weight, units, timestamp in group:
                row.extend([weight, units, timestamp])
            # Pad with empty strings if this group has fewer duplicates than max
            row.extend([''] * (max_dups * 3 - len(row)))
            writer.writerow(row)


def write_csv2(pairs, filename):
    """Write joined CSV with duplicates pushed to the right.

    Groups are consumed two at a time: the first is treated as GROSS, the
    second as TARE, and NET is computed per duplicate position. A trailing
    unpaired group gets empty TARE and NET cells.

    Args:
        pairs: Groups as returned by ``remove_duplicates``.
        filename: Destination path.
    """
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)

        # Find max number of duplicates to determine column count
        max_dups = max(len(group) for group in pairs) if pairs else 1

        # Create headers for gross and tare, repeated for duplicates
        headers = []
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'GROSS_WT{suffix}', f'GROSS_UNITS{suffix}', f'GROSS_T{suffix}'])
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'TARE_WT{suffix}', f'TARE_UNITS{suffix}', f'TARE_T{suffix}'])
        # Add NET columns at the end
        for i in range(max_dups):
            suffix = f"_{i+1}" if i > 0 else ""
            headers.extend([f'NET_WT{suffix}', f'NET_UNITS{suffix}'])
        writer.writerow(headers)

        # Process pairs (gross/tare)
        for j in range(0, len(pairs), 2):
            row = []

            # Write all GROSS entries
            gross_group = pairs[j]
            for weight, units, timestamp in gross_group:
                row.extend([weight, units, timestamp])
            # Pad gross to max_dups
            row.extend([''] * (max_dups * 3 - len(row)))

            if j + 1 < len(pairs):
                # Write all TARE entries
                tare_group = pairs[j + 1]
                for weight, units, timestamp in tare_group:
                    row.extend([weight, units, timestamp])
                # Pad tare to max_dups
                row.extend([''] * (max_dups * 6 - len(row)))

                # Calculate NET for each duplicate pair
                for k in range(max_dups):
                    if k < len(gross_group) and k < len(tare_group):
                        gross_weight, gross_units = gross_group[k][0], gross_group[k][1]
                        tare_weight, tare_units = tare_group[k][0], tare_group[k][1]
                        if gross_units == tare_units:
                            net = gross_weight - tare_weight
                            net_units = tare_units
                        else:
                            net = 'UNIT MISMATCH'
                            net_units = 'MISMATCH'
                        row.extend([net, net_units])
                    else:
                        row.extend(['', ''])
            else:
                # Odd number of items, pad tare and net
                row.extend([''] * (max_dups * 6 - len(row)))
                row.extend([''] * (max_dups * 2))

            writer.writerow(row)


def write_xlsx(pairs, output_filename, template_path='template.xlsx'):
    """Write sequential data to XLSX file using template with duplicates pushed to the right.

    Args:
        pairs: Groups as returned by ``remove_duplicates``.
        output_filename: Path of the workbook to create.
        template_path: Workbook to start from; it must contain a
            ``SEQUENTIAL`` sheet and is never modified on disk.

    Raises:
        ImportError: If openpyxl is not installed.
        FileNotFoundError: If ``template_path`` does not exist.
        ValueError: If the template has no ``SEQUENTIAL`` sheet.
    """
    if not XLSX_AVAILABLE:
        raise ImportError("openpyxl is required for XLSX export. Install with: pip install openpyxl")

    if not os.path.exists(template_path):
        raise FileNotFoundError(f"Template file not found: {template_path}")

    # Load the template workbook (data_only=False preserves formulas)
    wb = load_workbook(template_path, data_only=False)

    # Check if SEQUENTIAL sheet exists
    if 'SEQUENTIAL' not in wb.sheetnames:
        raise ValueError("Template does not contain a 'SEQUENTIAL' sheet")

    # Get the SEQUENTIAL sheet
    ws = wb['SEQUENTIAL']

    # Clear existing data (starting from row 2, assuming row 1 has headers)
    if ws.max_row > 1:
        ws.delete_rows(2, ws.max_row - 1)

    # Find max number of duplicates
    max_dups = max(len(group) for group in pairs) if pairs else 1

    # Update headers if needed (row 1)
    col = 1
    for i in range(max_dups):
        suffix = f"_{i+1}" if i > 0 else ""
        ws.cell(row=1, column=col, value=f'WEIGHT{suffix}')
        ws.cell(row=1, column=col+1, value=f'UNITS{suffix}')
        ws.cell(row=1, column=col+2, value=f'TIME{suffix}')
        col += 3

    # Write data starting from row 2
    for row_idx, group in enumerate(pairs, start=2):
        col = 1
        for weight, units, timestamp in group:
            ws.cell(row=row_idx, column=col, value=weight)
            ws.cell(row=row_idx, column=col+1, value=units)
            ws.cell(row=row_idx, column=col+2, value=timestamp)
            col += 3

    # Set COMBINED as the active sheet (default sheet when opened)
    if 'COMBINED' in wb.sheetnames:
        wb.active = wb.sheetnames.index('COMBINED')

    # Save to new filename (never overwrite template)
    wb.save(output_filename)


def show_files_in_explorer(files):
    """Open file explorer and highlight the generated files.

    Only the first entry of ``files`` is revealed; an empty list does
    nothing.
    """
    if not files:
        return

    first_file = os.path.abspath(files[0])
    system = platform.system()

    if system == 'Windows':
        # On Windows, use explorer with /select to highlight the file
        # If multiple files, just select the first one
        subprocess.run(['explorer', '/select,', first_file])
    elif system == 'Darwin':  # macOS
        # On Mac, use 'open' with -R to reveal in Finder
        subprocess.run(['open', '-R', first_file])
    else:  # Linux and others
        # On Linux, just open the directory
        subprocess.run(['xdg-open', os.path.dirname(first_file)])


def run_update_script():
    """Run the installed ``update.sh`` through ``sh`` and report the outcome."""
    try:
        # Run update.sh script
        result = subprocess.run(['sh', 'C:\\Program Files\\rslogger-merger\\update.sh'],
                                capture_output=True, text=True, cwd=os.getcwd())
        if result.returncode == 0:
            messagebox.showinfo("Success", f"update.sh executed successfully!\n\nOutput:\n{result.stdout}")
        else:
            messagebox.showerror("Error", f"update.sh failed with return code {result.returncode}\n\nError:\n{result.stderr}")
    except FileNotFoundError:
        # NOTE(review): subprocess raises this when the 'sh' executable
        # itself cannot be found; a missing script shows up as a non-zero
        # return code instead. The message text is kept as-is.
        messagebox.showerror("Error", "update.sh not found in current directory")
    except Exception as e:
        messagebox.showerror("Error", f"Failed to run update.sh: {str(e)}")


def _export_pairs(pairs, base, generate_csv, generate_xlsx):
    """Write the requested report files next to ``base``.

    Returns ``(generated_files, error_messages)``; an export failure is
    recorded as a message instead of being raised.
    """
    generated_files = []
    error_messages = []

    # Generate CSV files if checked
    if generate_csv:
        sequential_csv = f"{base}.sequential.csv"
        joined_csv = f"{base}.joined.csv"
        try:
            write_csv1(pairs, sequential_csv)
            generated_files.append(sequential_csv)
            write_csv2(pairs, joined_csv)
            generated_files.append(joined_csv)
        except Exception as e:
            error_messages.append(f"CSV export failed: {str(e)}")

    # Generate XLSX file if checked
    if generate_xlsx:
        output_xlsx = f"{base}.xlsx"
        try:
            write_xlsx(pairs, output_xlsx)
            generated_files.append(output_xlsx)
        except ImportError as e:
            error_messages.append(f"XLSX export skipped: {str(e)}")
        except Exception as e:
            error_messages.append(f"XLSX export failed: {str(e)}")

    return generated_files, error_messages


def _report_results(generated_files, error_messages, heading=""):
    """Show the result dialog and optionally reveal the files."""
    if generated_files:
        files_list = "\n".join(generated_files)
        error_info = ""
        if error_messages:
            error_info = "\n\nWarnings:\n" + "\n".join(error_messages)

        # Custom dialog with "Show the files" button
        response = messagebox.askquestion(
            "Success",
            f"{heading}Files generated:\n{files_list}{error_info}\n\nShow the files?",
            icon='info')
        if response == 'yes':
            show_files_in_explorer(generated_files)
    else:
        messagebox.showerror("Error", "No files were generated.\n\n" + "\n".join(error_messages))


def convert_local_file():
    """Convert a local file selected by the user"""
    input_file = filedialog.askopenfilename(
        title="Select Log File to Convert",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )
    if not input_file:
        return  # User cancelled

    generate_csv = csv_var.get()
    generate_xlsx = xlsx_var.get()
    if not generate_csv and not generate_xlsx:
        messagebox.showerror("Error", "Please select at least one output format (CSV or XLSX).")
        return

    # Auto-generate output filenames
    base = os.path.splitext(input_file)[0]

    try:
        # The downloader saves its text as UTF-8, so read it back the same
        # way; undecodable bytes are replaced because only ASCII patterns
        # matter to the parser.
        with open(input_file, 'r', encoding='utf-8', errors='replace') as f:
            log_text = f.read()

        pairs = parse_logs(log_text)
        # Remove duplicates
        pairs = remove_duplicates(pairs)

        generated_files, error_messages = _export_pairs(pairs, base, generate_csv, generate_xlsx)
        _report_results(generated_files, error_messages)
    except Exception as e:
        messagebox.showerror("Error", f"An error occurred: {str(e)}")


def download_and_convert():
    """Download from RS Logger and convert"""
    # Ask for destination file
    output_file = filedialog.asksaveasfilename(
        title="Save Downloaded Log As",
        defaultextension=".txt",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )
    if not output_file:
        return  # User cancelled

    generate_csv = csv_var.get()
    generate_xlsx = xlsx_var.get()
    if not generate_csv and not generate_xlsx:
        messagebox.showerror("Error", "Please select at least one output format (CSV or XLSX).")
        return

    try:
        # Download from RS Logger
        messagebox.showinfo("Downloading", "Connecting to RS Logger at http://rslogger\nThis may take a moment...")

        base = os.path.splitext(output_file)[0]
        downloader = RSLoggerDownloader("http://rslogger")
        downloaded = downloader.download_via_pages(
            output_file=base,
            decode_controls=True
        )

        # With file_mode == 2 the downloader returns one text per channel
        # and writes _A.txt / _B.txt instead of a single .txt file.
        if isinstance(downloaded, tuple):
            texts = list(downloaded)
            downloaded_files = [f"{base}_A.txt", f"{base}_B.txt"]
        else:
            texts = [downloaded]
            downloaded_files = [f"{base}.txt"]

        if not any(texts):
            messagebox.showerror("Error", "No data was downloaded from RS Logger")
            return

        # Now convert the downloaded data
        pairs = []
        for text in texts:
            pairs.extend(parse_logs(text))
        if len(texts) > 1:
            # Merge the two channels chronologically (stable, so channel A
            # stays ahead of channel B within the same minute).
            pairs.sort(key=lambda entry: entry[3])

        # Remove duplicates
        pairs = remove_duplicates(pairs)

        exported_files, error_messages = _export_pairs(pairs, base, generate_csv, generate_xlsx)
        _report_results(
            downloaded_files + exported_files,
            error_messages,
            heading="Downloaded and converted!\n\n",
        )
    except Exception as e:
        messagebox.showerror("Error", f"Download failed: {str(e)}")


# GUI Setup
root = tk.Tk()
root.title("RS Logger Converter")
root.geometry("500x220")

csv_var = tk.BooleanVar(value=False)  # CSV unchecked by default
xlsx_var = tk.BooleanVar(value=True)  # XLSX checked by default

# Title label
title_label = tk.Label(root, text="RS Logger Data Converter", font=("Arial", 16, "bold"))
title_label.pack(pady=10)

# Output format checkboxes
format_frame = tk.Frame(root)
format_frame.pack(pady=5, padx=10, fill='x')
tk.Label(format_frame, text="Output Formats:").pack(side='left')
tk.Checkbutton(format_frame, text="CSV", variable=csv_var).pack(side='left', padx=10)
tk.Checkbutton(format_frame, text="XLSX", variable=xlsx_var).pack(side='left', padx=10)

# Convert local file button
tk.Button(root, text="CONVERT LOCAL FILE", command=convert_local_file,
          font=("Arial", 12), relief="raised", bg="#2196F3", fg="white").pack(pady=5, padx=10, fill='x')

# Download and convert button
tk.Button(root, text="DOWNLOAD FROM RS LOGGER", command=download_and_convert,
          font=("Arial", 12), relief="raised", bg="#4CAF50", fg="white").pack(pady=5, padx=10, fill='x')

# Update script button
tk.Button(root, text="Update this Application", command=run_update_script,
          font=("Arial", 10), relief="raised").pack(pady=5, padx=10, fill='x')

root.mainloop()