import struct import logging import argparse import time import binascii import re from contextlib import contextmanager import serial PACKET_TRACE = 128 PACKET_EVENT = 255 CMD_RD_DWORD = 2 CMD_RD_REG = 4 CMD_WR_DWORD = 130 CMD_WR_REG = 132 REG_CHIP_ID = 0x1a24000 REG_RESET_CAUSE = 0x1a000a0 CTRL_CFG_REG = 3 def lod_load(fd): data = {'meta': {}, 'sectors': {}} cur_sector = None for line in fd.readlines(): if line.startswith('#'): m = re.match(r'#\W+(\w+)=(.+)', line) if m: data['meta'].update([m.groups()]) elif line.startswith('@'): cur_sector = int(line[1:9], 16) data['sectors'][cur_sector] = bytearray() elif line.strip(): # Data is stored in little-endian, flip it around buf = binascii.unhexlify(line.strip())[::-1] data['sectors'][cur_sector].extend(buf) return data class TimeoutException(serial.SerialTimeoutException): pass class RDAClient(serial.Serial): """RDA8851 and friends ("Coolsand") programming interface library""" def __init__(self, path, baudrate=921600, *args, **kwargs): super(RDAClient, self).__init__(path, baudrate, *args, **kwargs) self.trace_logger = logging.getLogger('rda.trace') def run(self): while True: try: self.handle_packet(*self.read_packet()) except KeyboardInterrupt: return except serial.SerialException: raise except: logging.exception('Oops!') time.sleep(0.01) def wait_for_packet(self, pid, handle=False): """Waits for single packet from device, handles every other packet when needed""" while True: packet_id, data = self.read_packet() if packet_id == pid: return data try: if handle: self.handle_packet(packet_id, data) except: logging.exception('Oops!') def wait_for_event(self, event, handle=False): """Waits for specific event packet""" while True: data = self.wait_for_packet(PACKET_EVENT, handle) if struct.unpack('H', self.read_unescape(2)) if packet_len == 0 or packet_len == 1: raise Exception('Empty packet?') data = self.read_unescape(packet_len) expected_checksum = self.read_unescape(1)[0] checksum = self.calculate_checksum(data) if checksum != expected_checksum: raise Exception('Invalid checksum (%d expected, got %d)' % (expected_checksum, checksum)) logging.debug(' <- %s', binascii.hexlify(data).decode()) return (data[0], data[1:]) def write_packet(self, packet_id, data): """Sends single packet to device""" data = bytearray([packet_id]) + data data_len = len(data) logging.debug(' -> %s', binascii.hexlify(data).decode()) payload = bytearray([0xad]) + struct.pack('>H', data_len) + data + bytearray([self.calculate_checksum(data)]) self.write_escape(payload) timeouts = [] @contextmanager def do_timeout(self, t): """Context manager that adds timeouts to blocking operations""" self.timeouts.append(time.time() + t) yield self.timeouts.pop() def check_timeout(self): """Returns seconds left to earliest timeout""" if not self.timeouts: return None t = min(self.timeouts) - time.time() if t < 0: raise TimeoutException() return t def _sleep(self, t): """Sleep for t seconds or raise TimeoutException if not enough time left""" if self.check_timeout() < t: raise TimeoutException() time.sleep(t) def read(self, size): """Reads bytes with timeout exception support""" self.timeout = self.check_timeout() buf = super(RDAClient, self).read(size) if len(buf) != size: raise TimeoutException() return buf def read_unescape(self, l): """Reads and unescapes data""" s = bytearray(self.read(l)) while True: read_esc_count = s.count(b'\\') + l - len(s) if read_esc_count == 0: break s.append(self.read(read_esc_count)) s = s.replace(b'\\\xee', b'\x11') s = s.replace(b'\\\xec', b'\x13') s = s.replace(b'\\\xa3', b'\\') return s def write_escape(self, data): """Escapes and writes data to device""" data = data.replace(b'\\', b'\\\xa3') \ .replace(b'\x11', b'\\\xee') \ .replace(b'\x13', b'\\\xec') self.write(data) def calculate_checksum(self, data): """Calculates XOR-based checksum""" checksum = 0 for b in data: checksum = checksum ^ b return checksum _cnt = 1 def read_value(self, cmd, addr, raw=False): """Reads single value (either "reg" or dword from memory address)""" self._cnt = (self._cnt % 255) + 1 self.write_packet(255, struct.pack('