rdatool/rdatool.py

318 lines
9.5 KiB
Python

import struct
import logging
import argparse
import time
import binascii
import re
from contextlib import contextmanager
import serial
PACKET_TRACE = 128
PACKET_EVENT = 255
CMD_RD_DWORD = 2
CMD_RD_REG = 4
CMD_WR_DWORD = 130
CMD_WR_REG = 132
REG_CHIP_ID = 0x1a24000
REG_RESET_CAUSE = 0x1a000a0
CTRL_CFG_REG = 3
def lod_load(fd):
data = {'meta': {}, 'sectors': {}}
cur_sector = None
for line in fd.readlines():
if line.startswith('#'):
m = re.match(r'#\W+(\w+)=(.+)', line)
if m:
data['meta'].update([m.groups()])
elif line.startswith('@'):
cur_sector = int(line[1:9], 16)
data['sectors'][cur_sector] = bytearray()
elif line.strip():
# Data is stored in little-endian, flip it around
buf = binascii.unhexlify(line.strip())[::-1]
data['sectors'][cur_sector].extend(buf)
return data
class TimeoutException(serial.SerialTimeoutException):
pass
class RDAClient(serial.Serial):
"""RDA8851 and friends ("Coolsand") programming interface library"""
def __init__(self, path, baudrate=921600, *args, **kwargs):
super(RDAClient, self).__init__(path, baudrate, *args, **kwargs)
self.trace_logger = logging.getLogger('rda.trace')
def run(self):
while True:
try:
self.handle_packet(*self.read_packet())
except KeyboardInterrupt:
return
except serial.SerialException:
raise
except:
logging.exception('Oops!')
time.sleep(0.01)
def wait_for_packet(self, pid, handle=False):
"""Waits for single packet from device, handles every other packet when
needed"""
while True:
packet_id, data = self.read_packet()
if packet_id == pid:
return data
try:
if handle:
self.handle_packet(packet_id, data)
except:
logging.exception('Oops!')
def wait_for_event(self, event, handle=False):
"""Waits for specific event packet"""
while True:
data = self.wait_for_packet(PACKET_EVENT, handle)
if struct.unpack('<xI', data)[0] == event:
return True
def handle_packet(self, packet_type, data):
"""Generic debugging packet handler"""
if packet_type == PACKET_TRACE:
traceline = data[2:data.find(b'\x00', 2)].decode()
self.trace_logger.info(traceline)
elif packet_type == PACKET_EVENT:
logging.debug('Event: %08x', *struct.unpack('<xI', data))
elif packet_type == 131:
logging.debug('131: %d', *struct.unpack('<I', data))
elif packet_type == 162:
logging.info('162: %r %r', packet_type, data)
else:
logging.debug('Unhandled packet %d (%r)', packet_type, data)
def read_packet(self):
"""Reads single packet from device, returns tuple with packet ID/type
and its data"""
preamble = self.read(1)
if preamble != b'\xad':
raise Exception('Invalid preamble (got %r)' % preamble)
packet_len, = struct.unpack('>H', self.read_unescape(2))
if packet_len == 0 or packet_len == 1:
raise Exception('Empty packet?')
data = self.read_unescape(packet_len)
expected_checksum = self.read_unescape(1)[0]
checksum = self.calculate_checksum(data)
if checksum != expected_checksum:
raise Exception('Invalid checksum (%d expected, got %d)' % (expected_checksum, checksum))
logging.debug(' <- %s', binascii.hexlify(data).decode())
return (data[0], data[1:])
def write_packet(self, packet_id, data):
"""Sends single packet to device"""
data = bytearray([packet_id]) + data
data_len = len(data)
logging.debug(' -> %s', binascii.hexlify(data).decode())
payload = bytearray([0xad]) + struct.pack('>H', data_len) + data + bytearray([self.calculate_checksum(data)])
self.write_escape(payload)
timeouts = []
@contextmanager
def do_timeout(self, t):
"""Context manager that adds timeouts to blocking operations"""
self.timeouts.append(time.time() + t)
yield
self.timeouts.pop()
def check_timeout(self):
"""Returns seconds left to earliest timeout"""
if not self.timeouts:
return None
t = min(self.timeouts) - time.time()
if t < 0:
raise TimeoutException()
return t
def _sleep(self, t):
"""Sleep for t seconds or raise TimeoutException if not enough time left"""
if self.check_timeout() < t:
raise TimeoutException()
time.sleep(t)
def read(self, size):
"""Reads bytes with timeout exception support"""
self.timeout = self.check_timeout()
buf = super(RDAClient, self).read(size)
if len(buf) != size:
raise TimeoutException()
return buf
def read_unescape(self, l):
"""Reads and unescapes data"""
s = bytearray(self.read(l))
while True:
read_esc_count = s.count(b'\\') + l - len(s)
if read_esc_count == 0:
break
s.append(self.read(read_esc_count))
s = s.replace(b'\\\xee', b'\x11')
s = s.replace(b'\\\xec', b'\x13')
s = s.replace(b'\\\xa3', b'\\')
return s
def write_escape(self, data):
"""Escapes and writes data to device"""
data = data.replace(b'\\', b'\\\xa3') \
.replace(b'\x11', b'\\\xee') \
.replace(b'\x13', b'\\\xec')
self.write(data)
def calculate_checksum(self, data):
"""Calculates XOR-based checksum"""
checksum = 0
for b in data:
checksum = checksum ^ b
return checksum
_cnt = 1
def read_value(self, cmd, addr, raw=False):
"""Reads single value (either "reg" or dword from memory address)"""
self._cnt = (self._cnt % 255) + 1
self.write_packet(255, struct.pack('<BIB', cmd, addr, self._cnt))
while True:
data = self.wait_for_packet(PACKET_EVENT)
if data[0] == self._cnt:
if raw:
return data[1:]
elif cmd == CMD_RD_DWORD:
return struct.unpack('<I', data[1:])[0]
elif cmd == CMD_RD_REG:
return struct.unpack('<B', data[1:])[0]
def read_reg(self, addr, raw=False):
"""Helper function to read single reg"""
return self.read_value(CMD_RD_REG, addr, raw)
def read_dword(self, addr, raw=False):
"""Helper function to read single memory dword"""
return self.read_value(CMD_RD_DWORD, addr, raw)
def read_block(self, addr, read_len):
"""Reads memory block"""
data = ""
for offset in range(0, read_len, 4):
data += self.read_dword(addr + offset, raw=True)
return data
def write_value(self, cmd, addr, value):
"""Writes single value (either "reg" or single memory dword)"""
if cmd == CMD_WR_REG:
payload = struct.pack('<BIB', cmd, addr, value)
elif cmd == CMD_WR_DWORD:
payload = struct.pack('<BII', cmd, addr, value)
self.write_packet(255, payload)
def write_reg(self, addr, value):
"""Helper function to write reg value"""
return self.write_value(CMD_WR_REG, addr, value)
def write_dword(self, addr, value):
"""Helper function to write single dword to memory"""
return self.write_value(CMD_WR_DWORD, addr, value)
# High level interface
def chipid(self):
return self.read_dword(REG_CHIP_ID)
def host_mode_enable(self):
self.write_dword(REG_RESET_CAUSE, 0x6a0000)
self.write_reg(0, 1)
self._sleep(1)
for i in range(3):
self.write_reg(CTRL_CFG_REG, 128)
self._sleep(1)
self.reset_input_buffer()
self.write_reg(1, 2)
self.write_reg(5, 253)
self.wait_for_event(0xff000001)
self.write_dword(REG_RESET_CAUSE, 0x600000)
def cli_trace(client, args):
client.run()
def cli_at(client, args):
at_cmd = 'AT+POWER=2,200\r'
client.write_packet(162, '\x04' + at_cmd + '\xc0')
client.run()
def cli_chipid(client, args):
logging.info('Chip ID: %08x', client.chipid())
DEFAULT_SERIAL = '/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_' \
+ 'Bridge_Controller_0001-if00-port0'
parser = argparse.ArgumentParser()
parser.add_argument('--verbose', '-v', action='count', default=0,
help='show debug information')
parser.add_argument('--port', '-p', default=DEFAULT_SERIAL,
help='serial port')
subparsers = parser.add_subparsers()
trace_parser = subparsers.add_parser('trace', help='show trace')
trace_parser.set_defaults(func=cli_trace)
at_parser = subparsers.add_parser('at', help='execute AT command')
at_parser.set_defaults(func=cli_at)
chipid_parser = subparsers.add_parser('chipid', help='show chip ID')
chipid_parser.set_defaults(func=cli_chipid)
if __name__ == "__main__":
args = parser.parse_args()
logging.basicConfig(level=logging.INFO - 10*args.verbose)
client = RDAClient(args.port)
args.func(client, args)
#c = RDAClient('/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0') #'/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0')
#c = RDAClient('/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0')
#c.run()