318 lines
9.5 KiB
Python
318 lines
9.5 KiB
Python
import struct
|
|
import logging
|
|
import argparse
|
|
import time
|
|
import binascii
|
|
import re
|
|
from contextlib import contextmanager
|
|
|
|
import serial
|
|
|
|
# Packet IDs (first byte of every packet body).
PACKET_TRACE = 0x80   # NUL-terminated trace text, see RDAClient.handle_packet
PACKET_EVENT = 0xff   # event notifications; also the ID used for commands/replies

# Command codes carried in the first payload byte of a command packet.
# Each write code equals the matching read code with bit 7 set.
CMD_RD_DWORD = 0x02
CMD_RD_REG = 0x04
CMD_WR_DWORD = 0x82
CMD_WR_REG = 0x84

# Memory addresses accessed with the *_DWORD commands.
REG_CHIP_ID = 0x01a24000
REG_RESET_CAUSE = 0x01a000a0

# Register index accessed with the *_REG commands.
CTRL_CFG_REG = 3
|
|
|
|
|
|
def lod_load(fd):
    """Parse a .lod image from the text-mode file object *fd*.

    Three kinds of lines are recognised:

    * ``#<non-word chars><key>=<value>`` -- metadata, collected into ``meta``
      (other lines starting with ``#`` are ignored);
    * ``@<8 hex digits>`` -- starts a new sector at that address;
    * any other non-blank line -- hex data appended to the current sector
      after reversing the byte order of the line.

    Returns a dict ``{'meta': {key: value}, 'sectors': {address: bytearray}}``.
    A data line that appears before any ``@`` line raises ``KeyError``.
    """
    meta = {}
    sectors = {}
    active = None

    for raw in fd.readlines():
        if raw.startswith('#'):
            found = re.match(r'#\W+(\w+)=(.+)', raw)
            if found:
                key, value = found.groups()
                meta[key] = value
            continue

        if raw.startswith('@'):
            active = int(raw[1:9], 16)
            sectors[active] = bytearray()
            continue

        payload = raw.strip()
        if payload:
            # Data is stored in little-endian, flip it around
            sectors[active].extend(binascii.unhexlify(payload)[::-1])

    return {'meta': meta, 'sectors': sectors}
|
|
|
|
|
|
class TimeoutException(serial.SerialTimeoutException):
    """Raised by RDAClient when a deadline expires or a read comes up short."""
|
|
|
|
|
|
class RDAClient(serial.Serial):
    """RDA8851 and friends ("Coolsand") programming interface library

    Packets on the wire look like this (see read_packet / write_packet)::

        0xAD | length (big-endian uint16) | id | data ... | checksum

    ``length`` counts ``id`` plus ``data``; ``checksum`` is the XOR of those
    same bytes.  Bytes 0x5C, 0x11 and 0x13 are escaped as a backslash
    followed by the byte XOR 0xFF (see read_unescape / write_escape).
    """

    # Sequence number echoed back by the device in read replies (1..255).
    _cnt = 1

    def __init__(self, path, baudrate=921600, *args, **kwargs):
        """Open serial port *path*; extra arguments go to ``serial.Serial``."""
        # Stack of absolute deadlines pushed by do_timeout().  Kept per
        # instance so that two clients never share (or leak) deadlines.
        self.timeouts = []
        super(RDAClient, self).__init__(path, baudrate, *args, **kwargs)
        self.trace_logger = logging.getLogger('rda.trace')

    def run(self):
        """Read and handle packets forever.

        Returns on KeyboardInterrupt.  Serial errors (including
        TimeoutException) propagate; any other error is logged and the loop
        carries on after a short pause.
        """
        while True:
            try:
                self.handle_packet(*self.read_packet())
            except KeyboardInterrupt:
                return
            except serial.SerialException:
                raise
            except Exception:
                logging.exception('Oops!')
                time.sleep(0.01)

    def wait_for_packet(self, pid, handle=False):
        """Waits for single packet from device, handles every other packet when
        needed

        Returns the data of the first packet whose ID equals *pid*.  Packets
        with other IDs are passed to handle_packet() when *handle* is true
        and dropped otherwise.
        """
        while True:
            packet_id, data = self.read_packet()
            if packet_id == pid:
                return data

            if handle:
                try:
                    self.handle_packet(packet_id, data)
                except Exception:
                    # A broken handler must not abort the wait
                    logging.exception('Oops!')

    def wait_for_event(self, event, handle=False):
        """Waits for specific event packet

        Blocks until a PACKET_EVENT packet carrying the 32-bit value *event*
        arrives, then returns True.
        """
        event_size = struct.calcsize('<xI')
        while True:
            data = self.wait_for_packet(PACKET_EVENT, handle)
            if len(data) != event_size:
                # Same packet ID is used for read replies of other sizes;
                # those cannot be the event we are waiting for.
                continue
            if struct.unpack('<xI', data)[0] == event:
                return True

    def handle_packet(self, packet_type, data):
        """Generic debugging packet handler

        Logs the packet according to its type; raises ``struct.error`` if an
        event/131 packet does not have the expected size.
        """
        if packet_type == PACKET_TRACE:
            # Text starts at offset 2 and runs up to the first NUL, or to the
            # end of the packet when no terminator is present.
            end = data.find(b'\x00', 2)
            if end < 0:
                end = len(data)
            traceline = data[2:end].decode('utf-8', 'replace')
            self.trace_logger.info(traceline)
        elif packet_type == PACKET_EVENT:
            logging.debug('Event: %08x', *struct.unpack('<xI', data))
        elif packet_type == 131:
            logging.debug('131: %d', *struct.unpack('<I', data))
        elif packet_type == 162:
            logging.info('162: %r %r', packet_type, data)
        else:
            logging.debug('Unhandled packet %d (%r)', packet_type, data)

    def read_packet(self):
        """Reads single packet from device, returns tuple with packet ID/type
        and its data

        Raises Exception on a bad preamble, a length below 2 or a checksum
        mismatch, and TimeoutException when the read times out.
        """
        preamble = self.read(1)
        if preamble != b'\xad':
            raise Exception('Invalid preamble (got %r)' % preamble)

        packet_len, = struct.unpack('>H', self.read_unescape(2))

        if packet_len < 2:
            raise Exception('Empty packet?')

        data = self.read_unescape(packet_len)

        expected_checksum = self.read_unescape(1)[0]
        checksum = self.calculate_checksum(data)
        if checksum != expected_checksum:
            raise Exception('Invalid checksum (%d expected, got %d)' % (expected_checksum, checksum))

        logging.debug(' <- %s', binascii.hexlify(data).decode())

        return (data[0], data[1:])

    def write_packet(self, packet_id, data):
        """Sends single packet to device

        *packet_id* is an int in 0..255, *data* a bytes-like payload.
        """
        body = bytearray([packet_id]) + data

        logging.debug(' -> %s', binascii.hexlify(body).decode())

        frame = (bytearray([0xad])
                 + struct.pack('>H', len(body))
                 + body
                 + bytearray([self.calculate_checksum(body)]))
        self.write_escape(frame)

    @contextmanager
    def do_timeout(self, t):
        """Context manager that adds timeouts to blocking operations

        Pushes a deadline *t* seconds from now; the deadline is removed again
        when the block exits, whether normally or through an exception.
        """
        self.timeouts.append(time.time() + t)
        try:
            yield
        finally:
            self.timeouts.pop()

    def check_timeout(self):
        """Returns seconds left to earliest timeout

        Returns None when no timeout is active; raises TimeoutException when
        the earliest deadline has already passed.
        """
        if not self.timeouts:
            return None

        remaining = min(self.timeouts) - time.time()
        if remaining < 0:
            raise TimeoutException()

        return remaining

    def _sleep(self, t):
        """Sleep for t seconds or raise TimeoutException if not enough time left"""
        remaining = self.check_timeout()
        # None means "no deadline active": always allowed to sleep
        if remaining is not None and remaining < t:
            raise TimeoutException()

        time.sleep(t)

    def read(self, size=1):
        """Reads bytes with timeout exception support

        Returns exactly *size* bytes or raises TimeoutException.
        """
        # Serial timeout follows the earliest active deadline (None = block)
        self.timeout = self.check_timeout()
        buf = super(RDAClient, self).read(size)

        if len(buf) != size:
            raise TimeoutException()

        return buf

    def read_unescape(self, l):
        """Reads and unescapes data

        Returns a bytearray of *l* unescaped bytes.
        """
        s = bytearray(self.read(l))
        while True:
            # Every backslash starts a two-byte sequence that decodes to one
            # byte, so that many extra raw bytes are still owed.
            missing = s.count(b'\\') + l - len(s)
            if missing == 0:
                break
            s.extend(self.read(missing))

        s = s.replace(b'\\\xee', b'\x11')
        s = s.replace(b'\\\xec', b'\x13')
        # Backslash itself goes last so its output is not re-interpreted
        s = s.replace(b'\\\xa3', b'\\')
        return s

    def write_escape(self, data):
        """Escapes and writes data to device"""
        # Backslash must be escaped first, otherwise the backslashes added
        # by the next two replacements would be escaped again.
        escaped = data.replace(b'\\', b'\\\xa3')
        escaped = escaped.replace(b'\x11', b'\\\xee')
        escaped = escaped.replace(b'\x13', b'\\\xec')
        self.write(escaped)

    def calculate_checksum(self, data):
        """Calculates XOR-based checksum

        *data* must iterate as ints (bytearray, or bytes on Python 3).
        """
        checksum = 0
        for b in data:
            checksum ^= b

        return checksum

    def read_value(self, cmd, addr, raw=False):
        """Reads single value (either "reg" or dword from memory address)

        With *raw* true the reply payload is returned as-is; otherwise it is
        decoded as uint32 (CMD_RD_DWORD) or uint8 (CMD_RD_REG).  Raises
        ValueError for any other *cmd* when *raw* is false, since the reply
        could not be decoded.
        """
        if not raw and cmd not in (CMD_RD_DWORD, CMD_RD_REG):
            raise ValueError('Unsupported read command %r' % (cmd,))

        self._cnt = (self._cnt % 255) + 1
        self.write_packet(PACKET_EVENT, struct.pack('<BIB', cmd, addr, self._cnt))

        while True:
            data = self.wait_for_packet(PACKET_EVENT)
            if data[0] != self._cnt:
                # Not the reply to this request
                continue

            if raw:
                return data[1:]
            if cmd == CMD_RD_DWORD:
                return struct.unpack('<I', data[1:])[0]
            return struct.unpack('<B', data[1:])[0]

    def read_reg(self, addr, raw=False):
        """Helper function to read single reg"""
        return self.read_value(CMD_RD_REG, addr, raw)

    def read_dword(self, addr, raw=False):
        """Helper function to read single memory dword"""
        return self.read_value(CMD_RD_DWORD, addr, raw)

    def read_block(self, addr, read_len):
        """Reads memory block

        Issues one dword read per 4 bytes starting at *addr* and returns the
        concatenated raw replies as a bytearray (whole dwords, so the result
        is longer than *read_len* when that is not a multiple of 4).
        """
        data = bytearray()

        for offset in range(0, read_len, 4):
            data += self.read_dword(addr + offset, raw=True)

        return data

    def write_value(self, cmd, addr, value):
        """Writes single value (either "reg" or single memory dword)

        Raises ValueError when *cmd* is neither CMD_WR_REG nor CMD_WR_DWORD.
        """
        if cmd == CMD_WR_REG:
            payload = struct.pack('<BIB', cmd, addr, value)
        elif cmd == CMD_WR_DWORD:
            payload = struct.pack('<BII', cmd, addr, value)
        else:
            raise ValueError('Unsupported write command %r' % (cmd,))

        self.write_packet(PACKET_EVENT, payload)

    def write_reg(self, addr, value):
        """Helper function to write reg value"""
        return self.write_value(CMD_WR_REG, addr, value)

    def write_dword(self, addr, value):
        """Helper function to write single dword to memory"""
        return self.write_value(CMD_WR_DWORD, addr, value)

    # High level interface
    def chipid(self):
        """Returns the dword stored at REG_CHIP_ID"""
        return self.read_dword(REG_CHIP_ID)

    def host_mode_enable(self):
        """Runs the register sequence that switches the device to host mode

        Blocks until event 0xff000001 is received.
        NOTE(review): the meaning of the individual register values is not
        documented here -- confirm against the chip documentation.
        """
        self.write_dword(REG_RESET_CAUSE, 0x6a0000)
        self.write_reg(0, 1)

        self._sleep(1)

        for _ in range(3):
            self.write_reg(CTRL_CFG_REG, 128)

        self._sleep(1)
        # Drop whatever arrived while waiting
        self.reset_input_buffer()

        self.write_reg(1, 2)
        self.write_reg(5, 253)

        self.wait_for_event(0xff000001)

        self.write_dword(REG_RESET_CAUSE, 0x600000)
|
|
|
|
|
|
def cli_trace(client, args):
    """'trace' sub-command: hand control to the client's packet loop.

    *args* (the parsed command line) is accepted for a uniform sub-command
    signature and is not used.
    """
    client.run()
|
|
|
|
def cli_at(client, args):
    """'at' sub-command: send a fixed AT command, then run the packet loop.

    The command is wrapped as ``0x04 <command> 0xC0`` and sent in a packet
    with ID 162.  The payload is built from bytes literals because
    ``write_packet`` concatenates it with a bytearray, which rejects text
    strings on Python 3.  *args* is not used.
    """
    at_cmd = b'AT+POWER=2,200\r'
    client.write_packet(162, b'\x04' + at_cmd + b'\xc0')

    client.run()
|
|
|
|
def cli_chipid(client, args):
    """'chipid' sub-command: log the chip ID as eight hex digits.

    *args* is not used.
    """
    chip = client.chipid()
    logging.info('Chip ID: %08x', chip)
|
|
|
|
# Serial port used when --port is not given.
DEFAULT_SERIAL = (
    '/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_'
    'Bridge_Controller_0001-if00-port0'
)

# Command-line interface: global options first, then one sub-parser per
# command.  Each sub-parser stores its handler in ``func``.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--verbose', '-v',
    action='count',
    default=0,
    help='show debug information',
)
parser.add_argument(
    '--port', '-p',
    default=DEFAULT_SERIAL,
    help='serial port',
)
subparsers = parser.add_subparsers()

trace_parser = subparsers.add_parser('trace', help='show trace')
trace_parser.set_defaults(func=cli_trace)

at_parser = subparsers.add_parser('at', help='execute AT command')
at_parser.set_defaults(func=cli_at)

chipid_parser = subparsers.add_parser('chipid', help='show chip ID')
chipid_parser.set_defaults(func=cli_chipid)
|
|
|
|
if __name__ == "__main__":
    args = parser.parse_args()

    # On Python 3 argparse does not insist on a sub-command, in which case
    # no handler has been stored in ``func``.
    if not hasattr(args, 'func'):
        parser.error('no command given')

    # Each -v lowers the threshold by one level (INFO -> DEBUG -> ...)
    logging.basicConfig(level=logging.INFO - 10*args.verbose)
    client = RDAClient(args.port)
    try:
        args.func(client, args)
    finally:
        # Release the serial port even when the command fails
        client.close()
|
|
|
|
#c = RDAClient('/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0') #'/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0')
|
|
#c = RDAClient('/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0')
|
|
#c.run()
|