Files
rslogger-merger/downloader.py
Thaddeus Hughes 150cd51cd5 download!
2025-10-15 14:35:27 -05:00

415 lines
15 KiB
Python

import requests
from typing import List, Optional, Union, Tuple
import re
class RSLoggerDownloader:
    """HTTP client for pulling and decoding logs from an RS Logger device."""

    def __init__(self, base_url: str):
        """Bind this downloader to one device.

        Args:
            base_url: Base URL like "http://rslogger" or "http://192.168.1.100"
        """
        # Normalize so later f-strings can safely append "/path".
        trimmed = base_url.rstrip('/')
        self.base_url = trimmed
        self.session = requests.Session()
def get_config(self) -> dict:
"""Get logger configuration parameters"""
url = f"{self.base_url}/logc.xml"
response = self.session.get(url, timeout=5)
response.raise_for_status()
parts = response.text.split('#')
config = {
'date_from': parts[0],
'date_to': parts[1],
'timestamp': int(parts[2]),
'file_mode': int(parts[3]),
'data_format': int(parts[4]),
'timestamp_char': chr(int(parts[5])),
'time_format': int(parts[6])
}
return config
def _get_progress(self, data: bytearray) -> int:
"""Extract progress percentage from end of data"""
length = len(data)
while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
length -= 2
if length > 0:
return data[length - 1]
return 0
def _clear_endofline(self, data: bytearray) -> int:
"""Remove trailing carriage returns and get data length"""
length = len(data)
while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
length -= 2
return length
def decode_control_characters(self, data: bytes) -> str:
"""
Decode the <13>, <10>, etc. control character sequences back to actual characters.
Args:
data: Raw bytes with <NN> sequences
Returns:
String with control characters properly decoded
"""
# Convert bytes to string
text = data.decode('ascii', errors='replace')
# Replace <NN> patterns with actual characters
def replace_control(match):
code = int(match.group(1))
return chr(code)
# Match <digits> pattern and replace with the actual character
decoded = re.sub(r'<(\d+)>', replace_control, text)
return decoded
    def parse_log_data(self, chunks: List[bytearray], config: dict, channel: int = 3) -> bytearray:
        """Parse raw binary log chunks into readable ASCII text.

        Args:
            chunks: List of raw data chunks as fetched from the device.
            config: Configuration dictionary from get_config().
            channel: Channel to parse (1=A, 2=B, 3=both).

        Returns:
            Parsed data as a bytearray of ASCII text; control bytes are
            emitted as '<NN>' escapes (see decode_control_characters).
        """
        result = bytearray()
        # Parser state threaded through every chunk: last channel/byte seen,
        # current date (y/m/d) and hour, plus per-channel seconds-of-day
        # values used by _should_add_timestamp for interval stamping.
        state = {
            'last_ch': 0,
            'last_char': 0,
            'day_stamp': True,
            'last_ta': 0,
            'last_tb': 0,
            'h': 0,
            'd': 0,
            'm': 0,
            'y': 0
        }
        timestamp = config['timestamp']
        data_format = config['data_format']
        ts_char = config['timestamp_char']
        time_format = config['time_format']
        # Determine the time interval encoded in the timestamp setting.
        # NOTE(review): values >= 50000 and > 1000 appear to carry an
        # interval/trigger offset, and anything else above 2 is clamped to
        # mode 2 -- this mirrors device firmware encoding; confirm against
        # the original firmware/JS source.
        t_interval = 0
        if timestamp >= 50000:
            t_interval = timestamp - 50000
        elif timestamp > 1000:
            t_interval = timestamp - 1000
        elif timestamp > 2:
            timestamp = 2
        for chunk_idx, chunk in enumerate(chunks):
            # Drop trailing CR/LF padding; skip chunks that were padding only.
            length = self._clear_endofline(chunk)
            if length <= 0:
                continue
            # Remove progress byte at end (it is status, not log data).
            length -= 1
            if length <= 0:
                continue
            parsed = self._parse_chunk(
                chunk, length, channel, timestamp, t_interval,
                data_format, ts_char, time_format, state
            )
            result.extend(parsed)
        return result
    def _parse_chunk(self, data: bytearray, length: int, channel: int,
                     timestamp: int, t_interval: int, data_format: int,
                     ts_char: str, time_format: int, state: dict) -> bytearray:
        """Parse a single chunk of fixed 4-byte records into ASCII output.

        Mutates *state* (date, hour, last channel/byte, interval trackers)
        so that subsequent chunks continue where this one left off.
        """
        result = bytearray()
        index = 0
        while index < length:
            # Records are exactly 4 bytes; ignore a trailing partial record.
            if (length - index) < 4:
                break
            byte0 = data[index]
            byte1 = data[index + 1]
            byte2 = data[index + 2]
            byte3 = data[index + 3]
            # Check if this is a timestamp marker (high bit set)
            if byte0 & 0x80:
                # Date/time record: hour in the low 7 bits of byte0, then
                # day, month, and year-since-2000.
                state['h'] = byte0 & 0x7F
                dtmp = byte1
                mtmp = byte2
                ytmp = byte3 + 2000
                if state['d'] != dtmp or state['m'] != mtmp or state['y'] != ytmp:
                    # Date changed: emit a "[YYYY-MM-DD]" header and reset
                    # the per-day stamping state.
                    state['d'] = dtmp
                    state['m'] = mtmp
                    state['y'] = ytmp
                    if timestamp != 0:
                        if len(result) > 0:
                            result.extend(b'\r\n')
                        date_str = f"[{ytmp}-{mtmp:02d}-{dtmp:02d}]"
                        result.extend(date_str.encode('ascii'))
                    state['last_ta'] = 0
                    state['last_tb'] = 0
                    state['last_ch'] = 0
                    state['day_stamp'] = True
            else:
                # Data record: bit 6 of byte0 selects channel B, low 6 bits
                # are the minute; byte1 is the logged byte; byte2/byte3 pack
                # milliseconds (low 10 bits) and seconds (next 6 bits).
                ch = 'B' if (byte0 & 0x40) else 'A'
                ch_mask = 2 if ch == 'B' else 1
                minute = byte0 & 0x3F
                data_byte = byte1
                tu16 = byte2 | (byte3 << 8)
                ms = tu16 & 0x3FF
                s = (byte3 >> 2) & 0x3F
                if (channel & ch_mask):
                    # Format time string: 24-hour, or 12-hour with A/P prefix.
                    if time_format == 0:
                        h_str = f"{state['h']:2d}"
                    else:
                        if state['h'] == 0:
                            h_str = "A12"
                        elif state['h'] < 12:
                            h_str = f"A{state['h']:2d}"
                        elif state['h'] == 12:
                            h_str = "P12"
                        else:
                            h_str = f"P{state['h']-12:2d}"
                    time_str = f"{h_str}:{minute:02d}:{s:02d}.{ms:03d}"
                    if channel == 3:
                        # Combined stream: tag each timestamp with A/B.
                        time_str += ch
                    time_str += ts_char
                    # Decide if we need to add timestamp based on config
                    add_time = self._should_add_timestamp(
                        timestamp, t_interval, ch_mask, s, minute, state['h'],
                        state, channel
                    )
                    if add_time:
                        result.extend(b'\r\n')
                        result.extend(time_str.encode('ascii'))
                    # Add the data byte
                    if data_format == 1:
                        # Hex format. NOTE(review): f"{x:x}" is not
                        # zero-padded -- confirm whether the device UI
                        # prints %02x here.
                        result.extend(ts_char.encode('ascii'))
                        result.extend(f"{data_byte:x}".encode('ascii'))
                    else:
                        # ASCII format; control bytes become '<NN>' escapes.
                        if data_byte < 32:
                            result.extend(f"<{data_byte}>".encode('ascii'))
                        else:
                            result.append(data_byte)
                    state['day_stamp'] = False
                    state['last_ch'] = ch_mask
                    state['last_char'] = data_byte
            index += 4
        return result
    def _should_add_timestamp(self, timestamp: int, t_interval: int,
                              ch_mask: int, s: int, minute: int, h: int,
                              state: dict, channel: int) -> bool:
        """Determine if a timestamp should be emitted before the next byte.

        NOTE(review): this predicate deliberately mutates state['last_ta']
        / state['last_tb'] when it decides to stamp -- the caller relies on
        that side effect for interval tracking.
        """
        # Mode 1: stamp every single record.
        if timestamp == 1:
            return True
        if timestamp >= 50000:
            if timestamp < 50256:
                # Trigger-byte mode: stamp when the previously logged byte
                # equals the configured trigger value (held in t_interval).
                if t_interval == state['last_char']:
                    return True
        elif timestamp == 2 or t_interval > 0:
            # Interval / channel-change mode: work in seconds-of-day.
            t = s + 60 * minute + 3600 * h
            should_add = False
            if state['last_ch'] != ch_mask:
                should_add = True
            # In single-channel interval mode a channel change alone does
            # not force a stamp.
            if t_interval > 0 and channel != 3:
                should_add = False
            if t_interval > 0:
                # Compare against the last stamp time for this channel.
                t_last = state['last_ta'] if ch_mask == 1 else state['last_tb']
                if (t - t_last) > t_interval:
                    should_add = True
            if should_add:
                # Record the stamp time (both channels in combined mode).
                if channel == 3:
                    state['last_ta'] = t
                    state['last_tb'] = t
                elif ch_mask == 1:
                    state['last_ta'] = t
                else:
                    state['last_tb'] = t
                return True
        return False
    def download_via_pages(self, output_file: Optional[str] = None, decode_controls: bool = True) -> Union[bytearray, str, Tuple[bytearray, bytearray], Tuple[str, str]]:
        """
        Download data by fetching pages (like the preview does).
        This bypasses the date range issue entirely.

        Args:
            output_file: Optional filename stem to save to (".txt" is
                appended; "_A"/"_B" suffixes in split-file mode).
            decode_controls: If True, decode <13>, <10> etc. to actual
                control characters.

        Returns:
            Parsed log data. In split-file mode (file_mode == 2) a tuple of
            (channel A, channel B); otherwise a single value. str when
            decode_controls is True, raw bytearray otherwise.
        """
        print("Getting configuration...")
        config = self.get_config()
        print(f"Config: {config}")
        all_data = bytearray()
        # Get first page
        print("\nFetching first page...")
        url = f"{self.base_url}/page.xml?Page=0"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        first_page = bytearray(response.content)
        print(f"First page: {len(first_page)} bytes")
        # NOTE(review): pages of <= 4 bytes are treated as empty/framing
        # and skipped -- confirm the framing size against the device docs.
        if len(first_page) > 4:
            all_data.extend(first_page)
        # Get last page to see total size
        print("Fetching last page...")
        url = f"{self.base_url}/page.xml?Page=1"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()
        last_page = bytearray(response.content)
        print(f"Last page: {len(last_page)} bytes")
        # Now keep getting next pages until we get back to the last page
        print("\nFetching all pages...")
        current_page = first_page
        page_count = 1
        max_pages = 1000  # Safety limit
        while page_count < max_pages:
            url = f"{self.base_url}/page.xml?Page=3"  # 3 = next page
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            next_page = bytearray(response.content)
            # Check if we've reached the end (data repeats)
            if next_page == current_page or next_page == last_page:
                print(f"Reached end after {page_count} pages")
                break
            if len(next_page) > 4:
                all_data.extend(next_page)
            page_count += 1
            if page_count % 10 == 0:
                print(f"Fetched {page_count} pages, {len(all_data)} bytes total...")
            current_page = next_page
        print(f"\nTotal raw data collected: {len(all_data)} bytes from {page_count} pages")
        # Parse the data
        print("Parsing data...")
        file_mode = config['file_mode']
        # Treat all_data as a single chunk
        chunks = [all_data]
        if file_mode == 2:
            # Separate files for channel A and B
            data_a = self.parse_log_data(chunks, config, channel=1)
            data_b = self.parse_log_data(chunks, config, channel=2)
            # Decode control characters if requested
            if decode_controls:
                data_a_decoded = self.decode_control_characters(data_a)
                data_b_decoded = self.decode_control_characters(data_b)
                if output_file:
                    with open(f"{output_file}_A.txt", 'w', encoding='utf-8') as f:
                        f.write(data_a_decoded)
                    with open(f"{output_file}_B.txt", 'w', encoding='utf-8') as f:
                        f.write(data_b_decoded)
                    print(f"\nSaved decoded data to {output_file}_A.txt and {output_file}_B.txt")
                return data_a_decoded, data_b_decoded
            else:
                if output_file:
                    with open(f"{output_file}_A.txt", 'wb') as f:
                        f.write(data_a)
                    with open(f"{output_file}_B.txt", 'wb') as f:
                        f.write(data_b)
                    print(f"\nSaved raw data to {output_file}_A.txt and {output_file}_B.txt")
                return data_a, data_b
        else:
            # Combined file
            data = self.parse_log_data(chunks, config, channel=3)
            # Decode control characters if requested
            if decode_controls:
                data_decoded = self.decode_control_characters(data)
                if output_file:
                    with open(f"{output_file}.txt", 'w', encoding='utf-8') as f:
                        f.write(data_decoded)
                    print(f"\nSaved {len(data_decoded)} characters to {output_file}.txt")
                return data_decoded
            else:
                if output_file:
                    with open(f"{output_file}.txt", 'wb') as f:
                        f.write(data)
                    print(f"\nSaved {len(data)} bytes to {output_file}.txt")
                return data
# Usage example
if __name__ == "__main__":
    dl = RSLoggerDownloader("http://rslogger")
    print("Downloading via page-by-page method...")
    print("=" * 60 + "\n")
    # decode_controls turns the literal <13>/<10> escapes back into
    # real control characters before saving.
    result = dl.download_via_pages(
        output_file="rslogger_data",
        decode_controls=True,
    )
    if result and len(result) > 0:
        banner = "=" * 60
        print("\n" + banner)
        print("Download complete!")
        print(banner)
        print(f"\nFirst 1000 characters:")
        # NOTE(review): in split-file mode result is a tuple, so this
        # slice returns the tuple itself rather than 1000 characters.
        print(result[:1000])
    else:
        print("\nNo data was downloaded.")