From 5ab4669dedf67efd44b9d9465fa8d43fc1aa8ae2 Mon Sep 17 00:00:00 2001 From: Piotr Dobrowolski Date: Sun, 22 Apr 2018 07:21:00 +0200 Subject: [PATCH] Initial commit --- .gitignore | 1 + rdatool.py | 290 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 291 insertions(+) create mode 100644 .gitignore create mode 100644 rdatool.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..539da74 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.py[co] diff --git a/rdatool.py b/rdatool.py new file mode 100644 index 0000000..64f67aa --- /dev/null +++ b/rdatool.py @@ -0,0 +1,290 @@ +import struct +import logging +import argparse +import time +import binascii +from contextlib import contextmanager + +import serial + +PACKET_TRACE = 128 +PACKET_EVENT = 255 + +CMD_RD_DWORD = 2 +CMD_RD_REG = 4 +CMD_WR_DWORD = 130 +CMD_WR_REG = 132 + +REG_CHIP_ID = 0x1a24000 +REG_RESET_CAUSE = 0x1a000a0 + +CTRL_CFG_REG = 3 + +class TimeoutException(serial.SerialTimeoutException): + pass + + +class RDAClient(serial.Serial): + """RDA8851 and friends ("Coolsand") programming interface library""" + + def __init__(self, path, baudrate=921600, *args, **kwargs): + super(RDAClient, self).__init__(path, baudrate, *args, **kwargs) + self.trace_logger = logging.getLogger('rda.trace') + + def run(self): + while True: + try: + self.handle_packet(*self.read_packet()) + except KeyboardInterrupt: + return + except serial.SerialException: + raise + except: + logging.exception('Oops!') + time.sleep(0.01) + + def wait_for_packet(self, pid, handle=False): + """Waits for single packet from device, handles every other packet when + needed""" + while True: + packet_id, data = self.read_packet() + if packet_id == pid: + return data + + try: + if handle: + self.handle_packet(packet_id, data) + except: + logging.exception('Oops!') + + def wait_for_event(self, event, handle=False): + """Waits for specific event packet""" + while True: + data = self.wait_for_packet(PACKET_EVENT, handle) + if struct.unpack('H', self.read_unescape(2)) + + if packet_len == 0 or packet_len == 1: + raise Exception('Empty packet?') + + data = self.read_unescape(packet_len) + + expected_checksum = ord(self.read_unescape(1)) + checksum = self.calculate_checksum(data) + if checksum != expected_checksum: + raise Exception('Invalid checksum (%d expected, got %d)' % (expected_checksum, checksum)) + + logging.debug(' <- %s', binascii.hexlify(data)) + + return (ord(data[0]), data[1:]) + + def write_packet(self, packet_id, data): + """Sends single packet to device""" + data = chr(packet_id) + data + data_len = len(data) + + logging.debug(' -> %s', binascii.hexlify(data)) + + payload = '\xad' + struct.pack('>H%dsB' % data_len, data_len, data, self.calculate_checksum(data)) + self.write_escape(payload) + + timeouts = [] + + @contextmanager + def do_timeout(self, t): + """Context manager that adds timeouts to blocking operations""" + self.timeouts.append(time.time() + t) + yield + self.timeouts.pop() + + def check_timeout(self): + """Returns seconds left to earliest timeout""" + if not self.timeouts: + return None + + t = min(self.timeouts) - time.time() + + if t < 0: + raise TimeoutException() + + return t + + def _sleep(self, t): + """Sleep for t seconds or raise TimeoutException if not enough time left""" + if self.check_timeout() < t: + raise TimeoutException() + + time.sleep(t) + + def read(self, size): + """Reads bytes with timeout exception support""" + self.timeout = self.check_timeout() + buf = super(RDAClient, self).read(size) + + if len(buf) != size: + raise TimeoutException() + + return buf + + def read_unescape(self, l): + """Reads and unescapes data""" + s = bytearray(self.read(l)) + while True: + read_esc_count = s.count('\\') + l - len(s) + if read_esc_count == 0: + break + s.append(self.read(read_esc_count)) + + s = s.replace('\\\xee', '\x11') + s = s.replace('\\\xec', '\x13') + s = s.replace('\\\xa3', '\\') + return s + + def write_escape(self, data): + """Escapes and writes data to device""" + data = data.replace('\\', '\\\xa3') \ + .replace('\x11', '\\\xee') \ + .replace('\x13', '\\\xec') + self.write(data) + + def calculate_checksum(self, data): + """Calculates XOR-based checksum""" + checksum = 0 + for b in data: + checksum = checksum ^ ord(b) + + return checksum + + _cnt = 1 + def read_value(self, cmd, addr, raw=False): + """Reads single value (either "reg" or dword from memory address)""" + self._cnt = (self._cnt % 255) + 1 + self.write_packet(255, struct.pack('