"""Downloader for RS Logger devices that expose log data over HTTP.

The device serves its configuration at ``/logc.xml`` and raw log pages at
``/page.xml``.  Records are 4-byte binary tuples that this module decodes
into timestamped text.
"""

import re
from typing import List, Optional, Tuple, Union

import requests


class RSLoggerDownloader:
    """Fetches and decodes log data from an RS Logger device."""

    def __init__(self, base_url: str):
        """
        Initialize downloader for RS Logger device.

        Args:
            base_url: Base URL like "http://rslogger" or "http://192.168.1.100"
        """
        self.base_url = base_url.rstrip('/')
        # One session so HTTP keep-alive is reused across the many page fetches.
        self.session = requests.Session()

    def get_config(self) -> dict:
        """Get logger configuration parameters from /logc.xml.

        The device answers with '#'-separated fields; positions are fixed.

        Returns:
            dict with keys: date_from, date_to, timestamp, file_mode,
            data_format, timestamp_char, time_format.

        Raises:
            requests.HTTPError: if the device returns an error status.
        """
        url = f"{self.base_url}/logc.xml"
        response = self.session.get(url, timeout=5)
        response.raise_for_status()
        parts = response.text.split('#')
        config = {
            'date_from': parts[0],
            'date_to': parts[1],
            'timestamp': int(parts[2]),
            'file_mode': int(parts[3]),
            'data_format': int(parts[4]),
            # Field 5 is the ASCII code of the separator printed after times.
            'timestamp_char': chr(int(parts[5])),
            'time_format': int(parts[6]),
        }
        return config

    def _get_progress(self, data: bytearray) -> int:
        """Extract the progress percentage byte from the end of a page.

        Pages end with a progress byte followed by one or more CRLF pairs;
        strip the CRLFs and return the byte before them (0 if none).
        """
        length = len(data)
        while length >= 2 and data[length - 1] == 10 and data[length - 2] == 13:
            length -= 2
        if length > 0:
            return data[length - 1]
        return 0

    def _clear_endofline(self, data: bytearray) -> int:
        """Return the data length with trailing CRLF pairs excluded.

        Does not mutate ``data``; callers treat the result as the effective
        length of the payload.
        """
        length = len(data)
        while length >= 2 and data[length - 1] == 10 and data[length - 2] == 13:
            length -= 2
        return length

    def decode_control_characters(self, data: bytes) -> str:
        """
        Decode the <13>, <10>, etc. control character sequences back to
        actual characters.

        Args:
            data: Raw bytes with ``<NN>`` escape sequences.

        Returns:
            String with control characters properly decoded.
        """
        # Convert bytes to string; undecodable bytes become U+FFFD.
        text = data.decode('ascii', errors='replace')

        def replace_control(match):
            # match.group(1) is the decimal character code inside <...>.
            code = int(match.group(1))
            return chr(code)

        # Replace every <digits> token with the character it encodes.
        decoded = re.sub(r'<(\d+)>', replace_control, text)
        return decoded

    def parse_log_data(self, chunks: List[bytearray], config: dict,
                       channel: int = 3) -> bytearray:
        """
        Parse raw log data chunks into readable format.

        Each chunk is expected to be one device page: payload records,
        then a progress byte, then trailing CRLF pair(s).

        Args:
            chunks: List of raw data chunks (one per page).
            config: Configuration dictionary from get_config().
            channel: Channel bitmask to parse (1=A, 2=B, 3=both).

        Returns:
            Parsed data as bytearray.
        """
        result = bytearray()

        # Parser state carried across chunks: current date/hour, last channel
        # seen, last data byte, and per-channel last timestamp seconds.
        state = {
            'last_ch': 0,
            'last_char': 0,
            'day_stamp': True,
            'last_ta': 0,
            'last_tb': 0,
            'h': 0,
            'd': 0,
            'm': 0,
            'y': 0,
        }

        timestamp = config['timestamp']
        data_format = config['data_format']
        ts_char = config['timestamp_char']
        time_format = config['time_format']

        # Determine time interval encoded in the timestamp setting:
        # >=50000 and >1000 are "interval" modes with an offset-coded period;
        # values above 2 otherwise collapse to mode 2.
        # NOTE(review): the exact device semantics of these ranges are not
        # visible here — preserved as-is from the original.
        t_interval = 0
        if timestamp >= 50000:
            t_interval = timestamp - 50000
        elif timestamp > 1000:
            t_interval = timestamp - 1000
        elif timestamp > 2:
            timestamp = 2

        for chunk in chunks:
            length = self._clear_endofline(chunk)
            if length <= 0:
                continue
            # Remove the progress byte that precedes the trailing CRLFs.
            length -= 1
            if length <= 0:
                continue
            parsed = self._parse_chunk(
                chunk, length, channel, timestamp, t_interval,
                data_format, ts_char, time_format, state
            )
            result.extend(parsed)

        return result

    def _parse_chunk(self, data: bytearray, length: int, channel: int,
                     timestamp: int, t_interval: int, data_format: int,
                     ts_char: str, time_format: int, state: dict) -> bytearray:
        """Parse a single chunk of 4-byte records into text.

        Records with the high bit of byte 0 set are date/time markers;
        all others are data samples.  ``state`` is mutated in place so
        parsing continues seamlessly across chunks.
        """
        result = bytearray()
        index = 0

        while index < length:
            if (length - index) < 4:
                break  # incomplete trailing record — ignore

            byte0 = data[index]
            byte1 = data[index + 1]
            byte2 = data[index + 2]
            byte3 = data[index + 3]

            if byte0 & 0x80:
                # Date/time record: hour in low 7 bits, then day/month/year.
                state['h'] = byte0 & 0x7F
                dtmp = byte1
                mtmp = byte2
                ytmp = byte3 + 2000
                if state['d'] != dtmp or state['m'] != mtmp or state['y'] != ytmp:
                    state['d'] = dtmp
                    state['m'] = mtmp
                    state['y'] = ytmp
                    if timestamp != 0:
                        if len(result) > 0:
                            result.extend(b'\r\n')
                        date_str = f"[{ytmp}-{mtmp:02d}-{dtmp:02d}]"
                        result.extend(date_str.encode('ascii'))
                    # New day: reset per-channel interval tracking.
                    state['last_ta'] = 0
                    state['last_tb'] = 0
                    state['last_ch'] = 0
                    state['day_stamp'] = True
            else:
                # Data record: bit 6 selects channel B, low 6 bits are minutes;
                # byte1 is the payload byte; bytes 2-3 pack ms (10 bits) and
                # seconds (6 bits).
                ch = 'B' if (byte0 & 0x40) else 'A'
                ch_mask = 2 if ch == 'B' else 1
                minute = byte0 & 0x3F
                data_byte = byte1
                tu16 = byte2 | (byte3 << 8)
                ms = tu16 & 0x3FF
                s = (byte3 >> 2) & 0x3F

                if (channel & ch_mask):
                    # Format hour either as 24h or as A/P-prefixed 12h.
                    if time_format == 0:
                        h_str = f"{state['h']:2d}"
                    else:
                        if state['h'] == 0:
                            h_str = "A12"
                        elif state['h'] < 12:
                            h_str = f"A{state['h']:2d}"
                        elif state['h'] == 12:
                            h_str = "P12"
                        else:
                            h_str = f"P{state['h'] - 12:2d}"

                    time_str = f"{h_str}:{minute:02d}:{s:02d}.{ms:03d}"
                    if channel == 3:
                        time_str += ch  # disambiguate channel in combined output
                    time_str += ts_char

                    add_time = self._should_add_timestamp(
                        timestamp, t_interval, ch_mask, s, minute, state['h'],
                        state, channel
                    )
                    if add_time:
                        result.extend(b'\r\n')
                        result.extend(time_str.encode('ascii'))

                    if data_format == 1:
                        # Hex format: separator then lowercase hex byte.
                        result.extend(ts_char.encode('ascii'))
                        result.extend(f"{data_byte:x}".encode('ascii'))
                    else:
                        # ASCII format: control bytes become <NN> escapes.
                        if data_byte < 32:
                            result.extend(f"<{data_byte}>".encode('ascii'))
                        else:
                            result.append(data_byte)

                    state['day_stamp'] = False
                    state['last_ch'] = ch_mask
                    state['last_char'] = data_byte

            index += 4

        return result

    def _should_add_timestamp(self, timestamp: int, t_interval: int,
                              ch_mask: int, s: int, minute: int, h: int,
                              state: dict, channel: int) -> bool:
        """Determine if a timestamp should be emitted before a data byte.

        Mode 1 stamps every byte; modes >= 50000 (below 50256) stamp when
        the interval code matches the previous data byte; mode 2 / interval
        modes stamp on channel change or when the configured interval has
        elapsed.  Updates per-channel last-stamp times in ``state``.
        """
        if timestamp == 1:
            return True

        if timestamp >= 50000:
            if timestamp < 50256:
                # Trigger-byte mode: stamp when the previous data byte
                # equals the configured trigger value.
                if t_interval == state['last_char']:
                    return True
        elif timestamp == 2 or t_interval > 0:
            # Absolute seconds since midnight for interval comparison.
            t = s + 60 * minute + 3600 * h
            should_add = False
            if state['last_ch'] != ch_mask:
                should_add = True
            if t_interval > 0 and channel != 3:
                # Single-channel interval mode: channel change alone
                # never triggers a stamp.
                should_add = False
            if t_interval > 0:
                t_last = state['last_ta'] if ch_mask == 1 else state['last_tb']
                if (t - t_last) > t_interval:
                    should_add = True
            if should_add:
                if channel == 3:
                    # Combined output: keep both channels in lockstep.
                    state['last_ta'] = t
                    state['last_tb'] = t
                elif ch_mask == 1:
                    state['last_ta'] = t
                else:
                    state['last_tb'] = t
                return True

        return False

    def download_via_pages(self, output_file: Optional[str] = None,
                           decode_controls: bool = True
                           ) -> Union[bytes, str, Tuple]:
        """
        Download data by fetching pages (like the preview does).
        This bypasses the date range issue entirely.

        Args:
            output_file: Optional filename stem to save to.
            decode_controls: If True, decode <13>, <10> etc. to actual
                control characters.

        Returns:
            Parsed log data as bytes or string (if decoded); a
            (channel_A, channel_B) tuple when the device is in
            separate-files mode (file_mode == 2).
        """
        print("Getting configuration...")
        config = self.get_config()
        print(f"Config: {config}")

        # FIX: keep every page as its own chunk.  parse_log_data strips a
        # trailing CRLF run plus one progress byte PER CHUNK, so concatenating
        # pages into one blob would leave those markers embedded mid-stream
        # and desynchronize the 4-byte record parser.
        chunks: List[bytearray] = []
        total_bytes = 0

        # Get first page
        print("\nFetching first page...")
        url = f"{self.base_url}/page.xml?Page=0"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        first_page = bytearray(response.content)
        print(f"First page: {len(first_page)} bytes")
        if len(first_page) > 4:
            chunks.append(first_page)
            total_bytes += len(first_page)

        # Get last page so we can detect when paging wraps around.
        print("Fetching last page...")
        url = f"{self.base_url}/page.xml?Page=1"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        last_page = bytearray(response.content)
        print(f"Last page: {len(last_page)} bytes")

        # Keep requesting "next page" until the data repeats.
        print("\nFetching all pages...")
        current_page = first_page
        page_count = 1
        max_pages = 1000  # safety limit against a device that never repeats

        while page_count < max_pages:
            url = f"{self.base_url}/page.xml?Page=3"  # 3 = next page
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            next_page = bytearray(response.content)

            # Check if we've reached the end (data repeats)
            if next_page == current_page or next_page == last_page:
                print(f"Reached end after {page_count} pages")
                break

            if len(next_page) > 4:
                chunks.append(next_page)
                total_bytes += len(next_page)

            page_count += 1
            if page_count % 10 == 0:
                print(f"Fetched {page_count} pages, {total_bytes} bytes total...")
            current_page = next_page

        print(f"\nTotal raw data collected: {total_bytes} bytes "
              f"from {page_count} pages")

        # Parse the data
        print("Parsing data...")
        file_mode = config['file_mode']

        if file_mode == 2:
            # Separate files for channel A and B
            data_a = self.parse_log_data(chunks, config, channel=1)
            data_b = self.parse_log_data(chunks, config, channel=2)

            if decode_controls:
                data_a_decoded = self.decode_control_characters(data_a)
                data_b_decoded = self.decode_control_characters(data_b)
                if output_file:
                    with open(f"{output_file}_A.txt", 'w', encoding='utf-8') as f:
                        f.write(data_a_decoded)
                    with open(f"{output_file}_B.txt", 'w', encoding='utf-8') as f:
                        f.write(data_b_decoded)
                    print(f"\nSaved decoded data to {output_file}_A.txt "
                          f"and {output_file}_B.txt")
                return data_a_decoded, data_b_decoded

            if output_file:
                with open(f"{output_file}_A.txt", 'wb') as f:
                    f.write(data_a)
                with open(f"{output_file}_B.txt", 'wb') as f:
                    f.write(data_b)
                print(f"\nSaved raw data to {output_file}_A.txt "
                      f"and {output_file}_B.txt")
            return data_a, data_b

        # Combined file
        data = self.parse_log_data(chunks, config, channel=3)

        if decode_controls:
            data_decoded = self.decode_control_characters(data)
            if output_file:
                with open(f"{output_file}.txt", 'w', encoding='utf-8') as f:
                    f.write(data_decoded)
                print(f"\nSaved {len(data_decoded)} characters to {output_file}.txt")
            return data_decoded

        if output_file:
            with open(f"{output_file}.txt", 'wb') as f:
                f.write(data)
            print(f"\nSaved {len(data)} bytes to {output_file}.txt")
        return data


# Usage example
if __name__ == "__main__":
    downloader = RSLoggerDownloader("http://rslogger")

    print("Downloading via page-by-page method...")
    print("=" * 60 + "\n")

    data = downloader.download_via_pages(
        output_file="rslogger_data",
        decode_controls=True  # This will decode <13>, <10> etc.
    )

    # FIX: in separate-files mode (file_mode == 2) the result is a
    # (channel_A, channel_B) tuple; slicing the tuple would print a
    # truncated tuple instead of text.  Normalize to a tuple of parts.
    parts = data if isinstance(data, tuple) else (data,)

    if any(len(p) > 0 for p in parts):
        print("\n" + "=" * 60)
        print("Download complete!")
        print("=" * 60)
        for part in parts:
            print("\nFirst 1000 characters:")
            print(part[:1000])
    else:
        print("\nNo data was downloaded.")