# RS Logger device data downloader (page-by-page HTTP retrieval and log parsing).
import requests
|
|
from typing import List, Optional, Union, Tuple
|
|
import re
|
|
|
|
class RSLoggerDownloader:

    def __init__(self, base_url: str):
        """Create a downloader bound to a single RS Logger device.

        Args:
            base_url: Base URL like "http://rslogger" or "http://192.168.1.100"
        """
        # One session for all requests so the HTTP connection can be reused.
        self.session = requests.Session()
        # Normalise away any trailing slash so URL joins stay clean.
        self.base_url = base_url.rstrip('/')
|
def get_config(self) -> dict:
|
|
"""Get logger configuration parameters"""
|
|
url = f"{self.base_url}/logc.xml"
|
|
response = self.session.get(url, timeout=5)
|
|
response.raise_for_status()
|
|
|
|
parts = response.text.split('#')
|
|
config = {
|
|
'date_from': parts[0],
|
|
'date_to': parts[1],
|
|
'timestamp': int(parts[2]),
|
|
'file_mode': int(parts[3]),
|
|
'data_format': int(parts[4]),
|
|
'timestamp_char': chr(int(parts[5])),
|
|
'time_format': int(parts[6])
|
|
}
|
|
|
|
return config
|
|
|
|
def _get_progress(self, data: bytearray) -> int:
|
|
"""Extract progress percentage from end of data"""
|
|
length = len(data)
|
|
while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
|
|
length -= 2
|
|
|
|
if length > 0:
|
|
return data[length - 1]
|
|
return 0
|
|
|
|
def _clear_endofline(self, data: bytearray) -> int:
|
|
"""Remove trailing carriage returns and get data length"""
|
|
length = len(data)
|
|
while length >= 2 and data[length-1] == 10 and data[length-2] == 13:
|
|
length -= 2
|
|
return length
|
|
|
|
def decode_control_characters(self, data: bytes) -> str:
|
|
"""
|
|
Decode the <13>, <10>, etc. control character sequences back to actual characters.
|
|
|
|
Args:
|
|
data: Raw bytes with <NN> sequences
|
|
|
|
Returns:
|
|
String with control characters properly decoded
|
|
"""
|
|
# Convert bytes to string
|
|
text = data.decode('ascii', errors='replace')
|
|
|
|
# Replace <NN> patterns with actual characters
|
|
def replace_control(match):
|
|
code = int(match.group(1))
|
|
return chr(code)
|
|
|
|
# Match <digits> pattern and replace with the actual character
|
|
decoded = re.sub(r'<(\d+)>', replace_control, text)
|
|
|
|
return decoded
|
|
|
|
    def parse_log_data(self, chunks: List[bytearray], config: dict, channel: int = 3) -> bytearray:
        """
        Parse raw log data chunks into readable format.

        Args:
            chunks: List of raw data chunks
            config: Configuration dictionary from get_config()
            channel: Channel to parse (1=A, 2=B, 3=both)

        Returns:
            Parsed data as bytearray
        """
        result = bytearray()

        # Parser state shared across chunks; mutated in place by _parse_chunk
        # and _should_add_timestamp.
        state = {
            'last_ch': 0,       # channel mask (1=A, 2=B) of the previous data record
            'last_char': 0,     # previous data byte (used by char-triggered timestamps)
            'day_stamp': True,  # True until the first data record after a date record
            'last_ta': 0,       # seconds-of-day of last timestamp emitted for channel A
            'last_tb': 0,       # seconds-of-day of last timestamp emitted for channel B
            'h': 0,             # current hour, from the latest date/time record
            'd': 0,             # current day
            'm': 0,             # current month
            'y': 0              # current year
        }

        timestamp = config['timestamp']
        data_format = config['data_format']
        ts_char = config['timestamp_char']
        time_format = config['time_format']

        # Decode the device's timestamp-mode encoding into an interval/trigger:
        #   >= 50000 -> char-triggered; trigger byte value = timestamp - 50000
        #   >  1000  -> periodic; interval in seconds = timestamp - 1000
        #   >  2     -> clamped down to mode 2 (stamp on channel change)
        # (grounded in how _should_add_timestamp consumes these values)
        t_interval = 0
        if timestamp >= 50000:
            t_interval = timestamp - 50000
        elif timestamp > 1000:
            t_interval = timestamp - 1000
        elif timestamp > 2:
            timestamp = 2

        for chunk_idx, chunk in enumerate(chunks):
            # Effective length with the device's trailing CR/LF padding ignored.
            length = self._clear_endofline(chunk)
            if length <= 0:
                continue

            # Remove progress byte at end
            length -= 1

            if length <= 0:
                continue

            parsed = self._parse_chunk(
                chunk, length, channel, timestamp, t_interval,
                data_format, ts_char, time_format, state
            )
            result.extend(parsed)

        return result
|
    def _parse_chunk(self, data: bytearray, length: int, channel: int,
                     timestamp: int, t_interval: int, data_format: int,
                     ts_char: str, time_format: int, state: dict) -> bytearray:
        """Parse a single chunk of data.

        Records are fixed 4-byte units.  Two record types exist, selected by
        the high bit of the first byte:

        * high bit set   -> date/time record: hour, day, month, year-2000
        * high bit clear -> data record: channel bit + minute, data byte, and
          a packed 16-bit field carrying milliseconds and seconds

        Args:
            data: Raw chunk bytes (trailing padding/progress byte excluded
                via *length*)
            length: Number of meaningful bytes in *data*
            channel: Channel filter (1=A, 2=B, 3=both)
            timestamp: Timestamp mode from the device config
            t_interval: Interval/trigger value derived from *timestamp*
            data_format: 1 = hex output, anything else = ASCII output
            ts_char: Separator appended after each timestamp
            time_format: 0 = 24-hour clock, otherwise 12-hour AM/PM
            state: Mutable parser state shared across chunks (mutated here)

        Returns:
            Formatted output bytes for this chunk
        """
        result = bytearray()
        index = 0

        while index < length:
            # Records are 4 bytes; any shorter tail is ignored.
            if (length - index) < 4:
                break

            byte0 = data[index]
            byte1 = data[index + 1]
            byte2 = data[index + 2]
            byte3 = data[index + 3]

            # Check if this is a timestamp marker (high bit set)
            if byte0 & 0x80:
                # Date/time record: low 7 bits of byte0 carry the hour.
                state['h'] = byte0 & 0x7F
                dtmp = byte1
                mtmp = byte2
                ytmp = byte3 + 2000  # year stored as an offset from 2000

                # Only emit a date header when the date actually changed.
                if state['d'] != dtmp or state['m'] != mtmp or state['y'] != ytmp:
                    state['d'] = dtmp
                    state['m'] = mtmp
                    state['y'] = ytmp

                    # timestamp mode 0 suppresses all time/date output.
                    if timestamp != 0:
                        if len(result) > 0:
                            result.extend(b'\r\n')
                        date_str = f"[{ytmp}-{mtmp:02d}-{dtmp:02d}]"
                        result.extend(date_str.encode('ascii'))

                    # New day: reset per-channel timestamp tracking.
                    state['last_ta'] = 0
                    state['last_tb'] = 0
                    state['last_ch'] = 0
                    state['day_stamp'] = True
            else:
                # Data record.  Bit 6 of byte0 selects channel B; the low
                # 6 bits are the minute.  byte2/byte3 pack milliseconds
                # (low 10 bits) and seconds (top 6 bits).
                ch = 'B' if (byte0 & 0x40) else 'A'
                ch_mask = 2 if ch == 'B' else 1

                minute = byte0 & 0x3F
                data_byte = byte1
                tu16 = byte2 | (byte3 << 8)
                ms = tu16 & 0x3FF
                s = (byte3 >> 2) & 0x3F  # seconds = top 6 bits of tu16

                # Emit only records matching the requested channel filter.
                if (channel & ch_mask):
                    # Format time string
                    if time_format == 0:
                        h_str = f"{state['h']:2d}"
                    else:
                        # 12-hour clock with A/P prefix.
                        if state['h'] == 0:
                            h_str = "A12"
                        elif state['h'] < 12:
                            h_str = f"A{state['h']:2d}"
                        elif state['h'] == 12:
                            h_str = "P12"
                        else:
                            h_str = f"P{state['h']-12:2d}"

                    time_str = f"{h_str}:{minute:02d}:{s:02d}.{ms:03d}"
                    if channel == 3:
                        # Combined output: tag each timestamp with its channel.
                        time_str += ch
                    time_str += ts_char

                    # Decide if we need to add timestamp based on config
                    add_time = self._should_add_timestamp(
                        timestamp, t_interval, ch_mask, s, minute, state['h'],
                        state, channel
                    )

                    if add_time:
                        result.extend(b'\r\n')
                        result.extend(time_str.encode('ascii'))

                    # Add the data byte
                    if data_format == 1:
                        # Hex format
                        result.extend(ts_char.encode('ascii'))
                        result.extend(f"{data_byte:x}".encode('ascii'))
                    else:
                        # ASCII format: control bytes become visible "<NN>"
                        # escapes (decode_control_characters reverses this).
                        if data_byte < 32:
                            result.extend(f"<{data_byte}>".encode('ascii'))
                        else:
                            result.append(data_byte)

                    state['day_stamp'] = False

                # Track the last seen record for channel-change / char-trigger
                # timestamp decisions.
                state['last_ch'] = ch_mask
                state['last_char'] = data_byte

            index += 4

        return result
|
def _should_add_timestamp(self, timestamp: int, t_interval: int,
|
|
ch_mask: int, s: int, minute: int, h: int,
|
|
state: dict, channel: int) -> bool:
|
|
"""Determine if timestamp should be added based on configuration"""
|
|
if timestamp == 1:
|
|
return True
|
|
|
|
if timestamp >= 50000:
|
|
if timestamp < 50256:
|
|
if t_interval == state['last_char']:
|
|
return True
|
|
elif timestamp == 2 or t_interval > 0:
|
|
t = s + 60 * minute + 3600 * h
|
|
should_add = False
|
|
|
|
if state['last_ch'] != ch_mask:
|
|
should_add = True
|
|
if t_interval > 0 and channel != 3:
|
|
should_add = False
|
|
|
|
if t_interval > 0:
|
|
t_last = state['last_ta'] if ch_mask == 1 else state['last_tb']
|
|
if (t - t_last) > t_interval:
|
|
should_add = True
|
|
|
|
if should_add:
|
|
if channel == 3:
|
|
state['last_ta'] = t
|
|
state['last_tb'] = t
|
|
elif ch_mask == 1:
|
|
state['last_ta'] = t
|
|
else:
|
|
state['last_tb'] = t
|
|
return True
|
|
|
|
return False
|
|
|
|
    def download_via_pages(self, output_file: Optional[str] = None,
                           decode_controls: bool = True
                           ) -> Union[str, bytearray,
                                      Tuple[str, str],
                                      Tuple[bytearray, bytearray]]:
        """
        Download data by fetching pages (like the preview does).
        This bypasses the date range issue entirely.

        Paging protocol (as used below): ``Page=0`` fetches the first page,
        ``Page=1`` the last page, and ``Page=3`` advances to the next page;
        fetching stops when a page repeats the current or last page.

        Args:
            output_file: Optional filename stem to save to ("_A"/"_B" and
                ".txt" suffixes are appended automatically)
            decode_controls: If True, decode <13>, <10> etc. to actual
                control characters

        Returns:
            A (channel_A, channel_B) pair when the device is configured for
            separate files (file_mode == 2), otherwise a single combined
            value; str when decode_controls is True, bytearray otherwise.
        """
        print("Getting configuration...")
        config = self.get_config()
        print(f"Config: {config}")

        all_data = bytearray()

        # Get first page
        print("\nFetching first page...")
        url = f"{self.base_url}/page.xml?Page=0"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()

        first_page = bytearray(response.content)
        print(f"First page: {len(first_page)} bytes")

        # Pages of <= 4 bytes carry no records (see _parse_chunk's 4-byte
        # record size), so they are not accumulated.
        if len(first_page) > 4:
            all_data.extend(first_page)

        # Get last page to see total size
        print("Fetching last page...")
        url = f"{self.base_url}/page.xml?Page=1"
        response = self.session.get(url, timeout=10)
        response.raise_for_status()

        last_page = bytearray(response.content)
        print(f"Last page: {len(last_page)} bytes")

        # Now keep getting next pages until we get back to the last page
        print("\nFetching all pages...")
        current_page = first_page
        page_count = 1
        max_pages = 1000  # Safety limit

        while page_count < max_pages:
            url = f"{self.base_url}/page.xml?Page=3"  # 3 = next page
            response = self.session.get(url, timeout=10)
            response.raise_for_status()

            next_page = bytearray(response.content)

            # Check if we've reached the end (data repeats)
            # NOTE(review): when next_page == last_page, its bytes are never
            # appended to all_data -- confirm the device re-serves the final
            # page's data earlier, otherwise the last page is dropped.
            if next_page == current_page or next_page == last_page:
                print(f"Reached end after {page_count} pages")
                break

            if len(next_page) > 4:
                all_data.extend(next_page)
                page_count += 1
                if page_count % 10 == 0:
                    print(f"Fetched {page_count} pages, {len(all_data)} bytes total...")

            current_page = next_page

        print(f"\nTotal raw data collected: {len(all_data)} bytes from {page_count} pages")

        # Parse the data
        print("Parsing data...")
        file_mode = config['file_mode']

        # Treat all_data as a single chunk
        chunks = [all_data]

        if file_mode == 2:
            # Separate files for channel A and B
            data_a = self.parse_log_data(chunks, config, channel=1)
            data_b = self.parse_log_data(chunks, config, channel=2)

            # Decode control characters if requested
            if decode_controls:
                data_a_decoded = self.decode_control_characters(data_a)
                data_b_decoded = self.decode_control_characters(data_b)

                if output_file:
                    with open(f"{output_file}_A.txt", 'w', encoding='utf-8') as f:
                        f.write(data_a_decoded)
                    with open(f"{output_file}_B.txt", 'w', encoding='utf-8') as f:
                        f.write(data_b_decoded)
                    print(f"\nSaved decoded data to {output_file}_A.txt and {output_file}_B.txt")

                return data_a_decoded, data_b_decoded
            else:
                if output_file:
                    with open(f"{output_file}_A.txt", 'wb') as f:
                        f.write(data_a)
                    with open(f"{output_file}_B.txt", 'wb') as f:
                        f.write(data_b)
                    print(f"\nSaved raw data to {output_file}_A.txt and {output_file}_B.txt")

                return data_a, data_b
        else:
            # Combined file
            data = self.parse_log_data(chunks, config, channel=3)

            # Decode control characters if requested
            if decode_controls:
                data_decoded = self.decode_control_characters(data)

                if output_file:
                    with open(f"{output_file}.txt", 'w', encoding='utf-8') as f:
                        f.write(data_decoded)
                    print(f"\nSaved {len(data_decoded)} characters to {output_file}.txt")

                return data_decoded
            else:
                if output_file:
                    with open(f"{output_file}.txt", 'wb') as f:
                        f.write(data)
                    print(f"\nSaved {len(data)} bytes to {output_file}.txt")

                return data
|
|
# Usage example
if __name__ == "__main__":
    logger = RSLoggerDownloader("http://rslogger")

    print("Downloading via page-by-page method...")
    print("="*60 + "\n")

    # Pull every page from the device, decode the <NN> escapes, and save
    # the result under the "rslogger_data" filename stem.
    result = logger.download_via_pages(
        output_file="rslogger_data",
        decode_controls=True,  # decode <13>, <10> etc. into real characters
    )

    if not result or len(result) == 0:
        print("\nNo data was downloaded.")
    else:
        print("\n" + "="*60)
        print("Download complete!")
        print("="*60)
        print(f"\nFirst 1000 characters:")
        print(result[:1000])