Initial commit

master
informatic 2018-04-22 07:21:00 +02:00
commit 5ab4669ded
2 changed files with 291 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.py[co]

290
rdatool.py Normal file
View File

@ -0,0 +1,290 @@
import struct
import logging
import argparse
import time
import binascii
from contextlib import contextmanager
import serial
PACKET_TRACE = 128
PACKET_EVENT = 255
CMD_RD_DWORD = 2
CMD_RD_REG = 4
CMD_WR_DWORD = 130
CMD_WR_REG = 132
REG_CHIP_ID = 0x1a24000
REG_RESET_CAUSE = 0x1a000a0
CTRL_CFG_REG = 3
class TimeoutException(serial.SerialTimeoutException):
pass
class RDAClient(serial.Serial):
"""RDA8851 and friends ("Coolsand") programming interface library"""
def __init__(self, path, baudrate=921600, *args, **kwargs):
super(RDAClient, self).__init__(path, baudrate, *args, **kwargs)
self.trace_logger = logging.getLogger('rda.trace')
def run(self):
while True:
try:
self.handle_packet(*self.read_packet())
except KeyboardInterrupt:
return
except serial.SerialException:
raise
except:
logging.exception('Oops!')
time.sleep(0.01)
def wait_for_packet(self, pid, handle=False):
"""Waits for single packet from device, handles every other packet when
needed"""
while True:
packet_id, data = self.read_packet()
if packet_id == pid:
return data
try:
if handle:
self.handle_packet(packet_id, data)
except:
logging.exception('Oops!')
def wait_for_event(self, event, handle=False):
"""Waits for specific event packet"""
while True:
data = self.wait_for_packet(PACKET_EVENT, handle)
if struct.unpack('<xI', data)[0] == event:
return True
def handle_packet(self, packet_type, data):
"""Generic debugging packet handler"""
if packet_type == PACKET_TRACE:
traceline = data[2:data.find('\x00', 2)]
self.trace_logger.info(traceline)
elif packet_type == PACKET_EVENT:
logging.debug('Event: %08x', *struct.unpack('<xI', data))
elif packet_type == 131:
logging.debug('131: %d', *struct.unpack('<I', data))
elif packet_type == 162:
logging.info('162: %r %r', packet_type, data)
else:
logging.debug('Unhandled packet %d (%r)', packet_type, data)
def read_packet(self):
"""Reads single packet from device, returns tuple with packet ID/type
and its data"""
preamble = self.read(1)
if preamble != '\xad':
raise Exception('Invalid preamble (got %r)' % preamble)
packet_len, = struct.unpack('>H', self.read_unescape(2))
if packet_len == 0 or packet_len == 1:
raise Exception('Empty packet?')
data = self.read_unescape(packet_len)
expected_checksum = ord(self.read_unescape(1))
checksum = self.calculate_checksum(data)
if checksum != expected_checksum:
raise Exception('Invalid checksum (%d expected, got %d)' % (expected_checksum, checksum))
logging.debug(' <- %s', binascii.hexlify(data))
return (ord(data[0]), data[1:])
def write_packet(self, packet_id, data):
"""Sends single packet to device"""
data = chr(packet_id) + data
data_len = len(data)
logging.debug(' -> %s', binascii.hexlify(data))
payload = '\xad' + struct.pack('>H%dsB' % data_len, data_len, data, self.calculate_checksum(data))
self.write_escape(payload)
timeouts = []
@contextmanager
def do_timeout(self, t):
"""Context manager that adds timeouts to blocking operations"""
self.timeouts.append(time.time() + t)
yield
self.timeouts.pop()
def check_timeout(self):
"""Returns seconds left to earliest timeout"""
if not self.timeouts:
return None
t = min(self.timeouts) - time.time()
if t < 0:
raise TimeoutException()
return t
def _sleep(self, t):
"""Sleep for t seconds or raise TimeoutException if not enough time left"""
if self.check_timeout() < t:
raise TimeoutException()
time.sleep(t)
def read(self, size):
"""Reads bytes with timeout exception support"""
self.timeout = self.check_timeout()
buf = super(RDAClient, self).read(size)
if len(buf) != size:
raise TimeoutException()
return buf
def read_unescape(self, l):
"""Reads and unescapes data"""
s = bytearray(self.read(l))
while True:
read_esc_count = s.count('\\') + l - len(s)
if read_esc_count == 0:
break
s.append(self.read(read_esc_count))
s = s.replace('\\\xee', '\x11')
s = s.replace('\\\xec', '\x13')
s = s.replace('\\\xa3', '\\')
return s
def write_escape(self, data):
"""Escapes and writes data to device"""
data = data.replace('\\', '\\\xa3') \
.replace('\x11', '\\\xee') \
.replace('\x13', '\\\xec')
self.write(data)
def calculate_checksum(self, data):
"""Calculates XOR-based checksum"""
checksum = 0
for b in data:
checksum = checksum ^ ord(b)
return checksum
_cnt = 1
def read_value(self, cmd, addr, raw=False):
"""Reads single value (either "reg" or dword from memory address)"""
self._cnt = (self._cnt % 255) + 1
self.write_packet(255, struct.pack('<BIB', cmd, addr, self._cnt))
while True:
data = self.wait_for_packet(PACKET_EVENT)
if ord(data[0]) == self._cnt:
if raw:
return data[1:]
elif cmd == CMD_RD_DWORD:
return struct.unpack('<I', data[1:])[0]
elif cmd == CMD_RD_REG:
return struct.unpack('<B', data[1:])[0]
def read_reg(self, addr, raw=False):
"""Helper function to read single reg"""
return self.read_value(CMD_RD_REG, addr, raw)
def read_dword(self, addr, raw=False):
"""Helper function to read single memory dword"""
return self.read_value(CMD_RD_DWORD, addr, raw)
def read_block(self, addr, read_len):
"""Reads memory block"""
data = ""
for offset in range(0, read_len, 4):
data += self.read_dword(addr + offset, raw=True)
return data
def write_value(self, cmd, addr, value):
"""Writes single value (either "reg" or single memory dword)"""
if cmd == CMD_WR_REG:
payload = struct.pack('<BIB', cmd, addr, value)
elif cmd == CMD_WR_DWORD:
payload = struct.pack('<BII', cmd, addr, value)
self.write_packet(255, payload)
def write_reg(self, addr, value):
"""Helper function to write reg value"""
return self.write_value(CMD_WR_REG, addr, value)
def write_dword(self, addr, value):
"""Helper function to write single dword to memory"""
return self.write_value(CMD_WR_DWORD, addr, value)
# High level interface
def chipid(self):
return self.read_dword(REG_CHIP_ID)
def host_mode_enable(self):
self.write_dword(REG_RESET_CAUSE, 0x6a0000)
self.write_reg(0, 1)
self._sleep(1)
for i in range(3):
self.write_reg(CTRL_CFG_REG, 128)
self._sleep(1)
self.reset_input_buffer()
self.write_reg(1, 2)
self.write_reg(5, 253)
self.wait_for_event(0xff000001)
self.write_dword(REG_RESET_CAUSE, 0x600000)
def cli_trace(client, args):
client.run()
def cli_at(client, args):
at_cmd = 'AT+POWER=2,200\r'
client.write_packet(162, '\x04' + at_cmd + '\xc0')
client.run()
def cli_chipid(client, args):
logging.info('Chip ID: %08x', client.chipid())
parser = argparse.ArgumentParser()
parser.add_argument('--verbose', '-v', action='count', default=0)
parser.add_argument('--port', '-p', default='/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0')
subparsers = parser.add_subparsers()
trace_parser = subparsers.add_parser('trace', help='show trace')
trace_parser.set_defaults(func=cli_trace)
trace_parser = subparsers.add_parser('at', help='execute AT command')
trace_parser.set_defaults(func=cli_at)
trace_parser = subparsers.add_parser('chipid', help='show chip ID')
trace_parser.set_defaults(func=cli_chipid)
if __name__ == "__main__":
args = parser.parse_args()
logging.basicConfig(level=logging.INFO - 10*args.verbose)
client = RDAClient(args.port)
args.func(client, args)
#c = RDAClient('/dev/serial/by-id/usb-Silicon_Labs_CP2102_USB_to_UART_Bridge_Controller_0001-if00-port0') #'/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0')
#c = RDAClient('/dev/serial/by-id/usb-1a86_USB2.0-Serial-if00-port0')
#c.run()