diff --git a/fwupd.py b/fwupd.py index e100924..87ab7f5 100644 --- a/fwupd.py +++ b/fwupd.py @@ -14,137 +14,161 @@ from py9b.command.update import * PING_RETRIES = 20 + def checksum(s, data): - for c in data: - s += ord(c) - return (s & 0xFFFFFFFF) + for c in data: + s += ord(c) + return s & 0xFFFFFFFF def UpdateFirmware(link, tran, dev, fwfile): - fwfile.seek(0, os.SEEK_END) - fw_size = fwfile.tell() - fwfile.seek(0) - fw_page_size = 0x80 + fwfile.seek(0, os.SEEK_END) + fw_size = fwfile.tell() + fwfile.seek(0) + fw_page_size = 0x80 - print('Pinging...', end='') - for retry in range(PING_RETRIES): - print('.', end='') - try: - if dev==BT.BLE: - tran.execute(ReadRegs(dev, 0, '13s')) - else: - tran.execute(ReadRegs(dev, 0x10, '14s')) - except LinkTimeoutException: - continue - break - else: - print('Timed out !') - return False - print('OK') + print("Pinging...", end="") + for retry in range(PING_RETRIES): + print(".", end="") + try: + if dev == BT.BLE: + tran.execute(ReadRegs(dev, 0, "13s")) + else: + tran.execute(ReadRegs(dev, 0x10, "14s")) + except LinkTimeoutException: + continue + break + else: + print("Timed out !") + return False + print("OK") - if args.interface!='blefleet': - print('Locking...') - tran.execute(WriteRegs(BT.ESC, 0x70, '= 0x81: - print('Connected, fetching keys...') + print("Connected, fetching keys...") keys = link.fetch_keys() tran.keys = keys - print('keys:', keys) + print("keys:", keys) # Recover longer keystream req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0x50, data=bytearray([0x20])) tran.send(req) resp = tran.recv() tran.keys += resp.data[9:] - print('Got %d bytes of keystream' % (len(tran.keys),)) + print("Got %d bytes of keystream" % (len(tran.keys),)) data = tran.execute(ReadRegs(BT.ESC, 0x68, "', hexlify(data).upper()) - size = len(data) - ofs = 0 - while size: - chunk_sz = min(size, _write_chunk_size) - self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz])) - ofs += chunk_sz - size -= chunk_sz - - -__all__ = ['BLELink'] diff --git a/py9b/link/3/blefleet.py b/py9b/link/3/blefleet.py deleted file mode 100644 index 4d14fd4..0000000 --- a/py9b/link/3/blefleet.py +++ /dev/null @@ -1,109 +0,0 @@ -"""BLE link using BlueGiga adapter via PyGatt/BGAPI""" - - -import pygatt -from .base import BaseLink, LinkTimeoutException, LinkOpenException -from binascii import hexlify - -SCAN_TIMEOUT = 3 - - -try: - import queue -except ImportError: - import queue as queue - -class Fifo(): - def __init__(self): - self.q = queue.Queue() - - def write(self, data): # put bytes - for b in data: - self.q.put(b) - - def read(self, size=1, timeout=None): # but read string - res = '' - for i in range(size): - res += chr(self.q.get(True, timeout)) - return res - - -#_cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb' -_rx_char_uuid = eval(input('RX UUID?')) -_tx_char_uuid = eval(input('TX UUID?')) -_write_chunk_size = 20 # as in android dumps - -class BLELink(BaseLink): - def __init__(self, *args, **kwargs): - super(BLELink, self).__init__(*args, **kwargs) - self._adapter = None - self._dev = None - self._wr_handle = None - self._rx_fifo = Fifo() - - - def __enter__(self): - self._adapter = pygatt.BGAPIBackend() - self._adapter.start() - return self - - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - - def _make_rx_cb(self): # this is a closure :) - def rx_cb(handle, value): - self._rx_fifo.write(value) - return rx_cb - - - def scan(self): - res = [] - devices = self._adapter.scan(timeout=SCAN_TIMEOUT) - for dev in devices: - if dev['name'].startswith(('MISc', 'NBSc')): - res.append((dev['name'], dev['address'])) - return res - - - def open(self, port): - try: - self._dev = self._adapter.connect(port, address_type=pygatt.BLEAddressType.random) - self._dev.subscribe(_tx_char_uuid, callback=self._make_rx_cb()) - self._wr_handle = self._dev.get_handle(_rx_char_uuid) - except pygatt.exceptions.NotConnectedError: - raise LinkOpenException - - - def close(self): - if self._dev: - self._dev.disconnect() - self._dev = None - if self._adapter: - self._adapter.stop() - - - def read(self, size): - try: - data = self._rx_fifo.read(size, timeout=self.timeout) - except queue.Empty: - raise LinkTimeoutException - if self.dump: - print('<', hexlify(data).upper()) - return data - - - def write(self, data): - if self.dump: - print('>', hexlify(data).upper()) - size = len(data) - ofs = 0 - while size: - chunk_sz = min(size, _write_chunk_size) - self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz])) - ofs += chunk_sz - size -= chunk_sz - - -__all__ = ['BLELink'] diff --git a/py9b/link/3/droidble.py b/py9b/link/3/droidble.py deleted file mode 100644 index a3399b7..0000000 --- a/py9b/link/3/droidble.py +++ /dev/null @@ -1,243 +0,0 @@ -"""BLE link using ABLE""" - - -try: - from able import GATT_SUCCESS, Advertisement, BluetoothDispatcher -except ImportError: - exit('error importing able') -try: - from .base import BaseLink, LinkTimeoutException, LinkOpenException -except ImportError: - exit('error importing .base') -from binascii import hexlify -from kivy.logger import Logger -from kivy.properties import StringProperty - -try: - import queue -except ImportError: - import queue as queue - -SCAN_TIMEOUT = 3 - -_write_chunk_size = 20 - -identity = bytearray([ -0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, # Ninebot Bluetooth ID 4E422100000000DE -0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDF # Xiaomi Bluetooth ID 4E422100000000DF -]) - -service_ids = { -'retail': '6e400001-b5a3-f393-e0a9-e50e24dcca9e' #service UUID -} - -receive_ids = { -'retail': '6e400002-b5a3-f393-e0a9-e50e24dcca9e' #receive characteristic UUID -} - -transmit_ids = { -'retail': '6e400003-b5a3-f393-e0a9-e50e24dcca9e' #transmit characteristic UUID -} - -scoot_found = False - - -class Fifo(): - def __init__(self): - self.q = queue.Queue() - - def write(self, data): # put bytes - for b in data: - self.q.put(b) - - def read(self, size=1, timeout=None): # but read string - res = '' - for i in range(size): - res += chr(self.q.get(True, timeout)) - return res - - -class ScootBT(BluetoothDispatcher): - def __init__(self): - super(ScootBT, self).__init__() - self.rx_fifo = Fifo() - self.ble_device = None - self.state = StringProperty() - self.dump = True - self.tx_characteristic = None - self.rx_characteristic = None - self.timeout = SCAN_TIMEOUT - self.set_queue_timeout(self.timeout) - - - def __enter__(self): - return self - - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - - def discover(self): - self.start_scan() - self.state = 'scan' - print((self.state)) - - - def on_device(self, device, rssi, advertisement): - global scoot_found - if self.state != 'scan': - return - Logger.debug("on_device event {}".format(list(advertisement))) - self.addr = device.getAddress() - if self.addr and address.startswith(self.addr): - print((self.addr)) - self.ble_device = device - self.scoot_found = True - self.stop_scan() - else: - for ad in advertisement: - print(ad) - if ad.ad_type == Advertisement.ad_types.manufacturer_specific_data: - if ad.data.startswith(self.identity): - scoot_found = True - else: - break - elif ad.ad_type == Advertisement.ad_types.complete_local_name: - name = str(ad.data) - if scoot_found: - self.state = 'found' - print((self.state)) - self.ble_device = device - Logger.debug("Scooter detected: {}".format(name)) - self.stop_scan() - - - def on_scan_completed(self): - if self.ble_device: - self.connect_gatt(self.ble_device) - self.state = 'connected' - print((self.state)) - else: - self.start_scan() - - - def on_connection_state_change(self, status, state): - if status == GATT_SUCCESS and state: - self.discover_services() - self.state = 'discover' - print((self.state)) - else: - self.close_gatt() - self.rx_characteristic = None - self.tx_characteristic = None - self.services = None - - - def on_services(self, status, services): - self.services = services - for uuid in list(receive_ids.values()): - self.rx_characteristic = self.services.search(uuid) - print(('RX: '+uuid)) - for uuid in list(transmit_ids.values()): - self.tx_characteristic = self.services.search(uuid) - print(('TX: '+uuid)) - self.enable_notifications(self.tx_characteristic) - - - def on_characteristic_changed(self, characteristic): - if characteristic == self.tx_characteristic: - data = characteristic.getValue() - self.rx_fifo.write(data) - - - def open(self, port): - self.addr = port - if self.ble_device == None: - self.discover() - if self.state!='connected': - self.connect_gatt(self.ble_device) - else: - return - - - def close(self): - if self.ble_device != None: - self.close_gatt() - self.services = None - print('close') - - - def read(self, size): - print('read') - if self.ble_device: - try: - data = self.rx_fifo.read(size, timeout=self.timeout) - except queue.Empty: - raise LinkTimeoutException - if self.dump: - print('<', hexlify(data).upper()) - return data - else: - print('BLE not connected') - self.discover() - - - def write(self, data): - print('write') - if self.ble_device: - if self.dump: - print('>', hexlify(data).upper()) - size = len(data) - ofs = 0 - while size: - chunk_sz = min(size, _write_chunk_size) - self.write_characteristic(self.rx_characteristic, bytearray(data[ofs:ofs+chunk_sz])) - ofs += chunk_sz - size -= chunk_sz - else: - print('BLE not connected') - self.discover() - - - def scan(self): - self.discover() - - -class BLELink(BaseLink): - def __init__(self, *args, **kwargs): - super(BLELink, self).__init__(*args, **kwargs) - self._adapter = None - - - def __enter__(self): - self._adapter = ScootBT() - self._adapter.discover() - return self - - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - - def scan(self): - devices = self._adapter.scan() - - - def open(self, port): - self._adapter.open(port) - - - def close(self): - self._adapter.close() - - - def read(self, size): - self._adapter.read(size) - - - def write(self, data): - self._adapter.write(data) - - -__all__ = ['BLELink'] diff --git a/py9b/link/3/serial.py b/py9b/link/3/serial.py deleted file mode 100644 index 84447a7..0000000 --- a/py9b/link/3/serial.py +++ /dev/null @@ -1,61 +0,0 @@ -"""Direct serial link""" - - -import serial -import serial.tools.list_ports as lp -from binascii import hexlify -from .base import BaseLink, LinkTimeoutException, LinkOpenException - - -class SerialLink(BaseLink): - def __init__(self, *args, **kwargs): - super(SerialLink, self).__init__(*args, **kwargs) - self.com = None - - - def __enter__(self): - return self - - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - - def scan(self): - ports = lp.comports() - res = [("%s %04X:%04X" % (port.device, port.vid, port.pid), port.device) for port in ports] - return res - - - def open(self, port): - try: - self.com = serial.Serial(port, 115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=self.timeout) - except serial.SerialException: - raise LinkOpenException - - - def close(self): - if self.com: - self.com.close() - self.com = None - - - def read(self, size): - try: - data = self.com.read(size) - except serial.SerialTimeoutException: - raise LinkTimeoutException - if len(data)", hexlify(data).upper()) - self.com.write(data) - - -__all__ = ["SerialLink"] diff --git a/py9b/link/3/tcp.py b/py9b/link/3/tcp.py deleted file mode 100644 index 8e0957d..0000000 --- a/py9b/link/3/tcp.py +++ /dev/null @@ -1,80 +0,0 @@ -"""TCP-BLE bridge link""" - -import socket -from binascii import hexlify -from .base import BaseLink, LinkTimeoutException, LinkOpenException - -HOST, PORT = "127.0.0.1", 6000 - -_write_chunk_size = 20 # 20 as in android dumps - -def recvall(sock, size): - data = "" - while len(data)", hexlify(data).upper()) - size = len(data) - ofs = 0 - while size: - chunk_sz = min(size, _write_chunk_size) - self.sock.sendall(data[ofs:ofs+chunk_sz]) - ofs += chunk_sz - size -= chunk_sz - - -__all__ = ["TCPLink"] diff --git a/py9b/link/base.py b/py9b/link/base.py index 2addfae..316b194 100644 --- a/py9b/link/base.py +++ b/py9b/link/base.py @@ -1,34 +1,32 @@ class LinkTimeoutException(Exception): - pass + pass + class LinkOpenException(Exception): - pass + pass + class BaseLink(object): - DEF_TIMEOUT = 1 + DEF_TIMEOUT = 1 - def __init__(self, timeout=DEF_TIMEOUT, dump=False): - self.dump = dump - self.timeout = timeout - + def __init__(self, timeout=DEF_TIMEOUT, dump=False): + self.dump = dump + self.timeout = timeout - def scan(self): - raise NotImplementedError() + def scan(self): + raise NotImplementedError() + def open(self, port): + raise NotImplementedError() - def open(self, port): - raise NotImplementedError() + def close(self): + pass - - def close(self): - pass + def read(self, size): + raise NotImplementedError() + def write(self, data): + raise NotImplementedError() - def read(self, size): - raise NotImplementedError() - - def write(self, data): - raise NotImplementedError() - __all__ = ["LinkTimeoutException", "LinkOpenException", "BaseLink"] diff --git a/py9b/link/ble.py b/py9b/link/ble.py index 58b9fac..8c250c2 100644 --- a/py9b/link/ble.py +++ b/py9b/link/ble.py @@ -13,26 +13,28 @@ try: except ImportError: import Queue as queue -class Fifo(): + +class Fifo: def __init__(self): self.q = queue.Queue() - def write(self, data): # put bytes + def write(self, data): # put bytes for b in data: self.q.put(b) - def read(self, size=1, timeout=None): # but read string - res = '' + def read(self, size=1, timeout=None): # but read string + res = "" for i in xrange(size): res += chr(self.q.get(True, timeout)) return res -#_cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb' -_rx_char_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' -_tx_char_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' +# _cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb' +_rx_char_uuid = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" +_tx_char_uuid = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" + +_write_chunk_size = 20 # as in android dumps -_write_chunk_size = 20 # as in android dumps class BLELink(BaseLink): def __init__(self, *args, **kwargs): @@ -42,42 +44,41 @@ class BLELink(BaseLink): self._wr_handle = None self._rx_fifo = Fifo() - def __enter__(self): self._adapter = pygatt.GATTToolBackend() self._adapter.start() return self - def __exit__(self, exc_type, exc_value, traceback): self.close() - - def _make_rx_cb(self): # this is a closure :) + def _make_rx_cb(self): # this is a closure :) def rx_cb(handle, value): self._rx_fifo.write(value) - return rx_cb + return rx_cb def scan(self): res = [] self._adapter.reset() devices = self._adapter.scan(timeout=SCAN_TIMEOUT) for dev in devices: - if dev['name'] and dev['name'].startswith((u'MISc', u'NBSc', u'JP2', u'Seg')): - res.append((dev['name'], dev['address'])) + if dev["name"] and dev["name"].startswith( + (u"MISc", u"NBSc", u"JP2", u"Seg") + ): + res.append((dev["name"], dev["address"])) return res - def open(self, port): try: - self._dev = self._adapter.connect(port, address_type=pygatt.BLEAddressType.random) + self._dev = self._adapter.connect( + port, address_type=pygatt.BLEAddressType.random + ) self._dev.subscribe(_tx_char_uuid, callback=self._make_rx_cb()) self._wr_handle = self._dev.get_handle(_rx_char_uuid) except pygatt.exceptions.NotConnectedError: raise LinkOpenException - def close(self): if self._dev: self._dev.disconnect() @@ -85,27 +86,27 @@ class BLELink(BaseLink): if self._adapter: self._adapter.stop() - def read(self, size): try: data = self._rx_fifo.read(size, timeout=self.timeout) except queue.Empty: raise LinkTimeoutException if self.dump: - print('<', hexlify(data).upper()) + print("<", hexlify(data).upper()) return data - def write(self, data): if self.dump: - print('>', hexlify(data).upper()) + print(">", hexlify(data).upper()) size = len(data) ofs = 0 while size: chunk_sz = min(size, _write_chunk_size) - self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz])) + self._dev.char_write_handle( + self._wr_handle, bytearray(data[ofs : ofs + chunk_sz]) + ) ofs += chunk_sz size -= chunk_sz -__all__ = ['BLELink'] +__all__ = ["BLELink"] diff --git a/py9b/link/bleak.py b/py9b/link/bleak.py index 984dbfe..2d6cfda 100644 --- a/py9b/link/bleak.py +++ b/py9b/link/bleak.py @@ -4,24 +4,25 @@ from bleak import discover, BleakClient from py9b.link.base import BaseLink from threading import Thread -_rx_char_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' -_tx_char_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' -_keys_char_uuid = '00000014-0000-1000-8000-00805f9b34fb' +_rx_char_uuid = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" +_tx_char_uuid = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" +_keys_char_uuid = "00000014-0000-1000-8000-00805f9b34fb" try: import queue except ImportError: import Queue as queue -class Fifo(): + +class Fifo: def __init__(self): self.q = queue.Queue() - def write(self, data): # put bytes + def write(self, data): # put bytes for b in data: self.q.put(b) - def read(self, size=1, timeout=None): # but read string + def read(self, size=1, timeout=None): # but read string res = bytearray() for i in range(size): res.append(self.q.get(True, timeout)) @@ -29,12 +30,13 @@ class Fifo(): def run_worker(loop): - print('Starting event loop', loop) + print("Starting event loop", loop) asyncio.set_event_loop(loop) loop.run_forever() + class BleakLink(BaseLink): - def __init__(self, device='hci0', loop=None, *args, **kwargs): + def __init__(self, device="hci0", loop=None, *args, **kwargs): self.device = device self.timeout = 5 self.loop = loop or asyncio.get_event_loop() @@ -59,11 +61,19 @@ class BleakLink(BaseLink): self.close() def close(self): - asyncio.run_coroutine_threadsafe(self._client.disconnect(), self.loop).result(10) + asyncio.run_coroutine_threadsafe(self._client.disconnect(), self.loop).result( + 10 + ) def scan(self, timeout=1): - future = asyncio.run_coroutine_threadsafe(discover(timeout=timeout, device=self.device), self.loop) - return [(dev.name, dev.address) for dev in future.result(timeout*2) if dev.name.startswith(('MISc', 'NBSc'))] + future = asyncio.run_coroutine_threadsafe( + discover(timeout=timeout, device=self.device), self.loop + ) + return [ + (dev.name, dev.address) + for dev in future.result(timeout * 2) + if dev.name.startswith(("MISc", "NBSc")) + ] def open(self, port): fut = asyncio.run_coroutine_threadsafe(self._connect(port), self.loop) @@ -72,17 +82,20 @@ class BleakLink(BaseLink): async def _connect(self, port): self._client = BleakClient(port[1], device=self.device) await self._client.connect() - print('connected') + print("connected") await self._client.start_notify(_tx_char_uuid, self._data_received) - print('services:', list(await self._client.get_services())) + print("services:", list(await self._client.get_services())) def _data_received(self, sender, data): - print('<<', ' '.join(map(lambda b: '%02x' % b, data))) + print("<<", " ".join(map(lambda b: "%02x" % b, data))) self._rx_fifo.write(data) def write(self, data): - print('>>', ' '.join(map(lambda b: '%02x' % b, data))) - fut = asyncio.run_coroutine_threadsafe(self._client.write_gatt_char(_rx_char_uuid, bytearray(data), False), self.loop) + print(">>", " ".join(map(lambda b: "%02x" % b, data))) + fut = asyncio.run_coroutine_threadsafe( + self._client.write_gatt_char(_rx_char_uuid, bytearray(data), False), + self.loop, + ) return fut.result(3) def read(self, size): @@ -93,4 +106,6 @@ class BleakLink(BaseLink): return data def fetch_keys(self): - return asyncio.run_coroutine_threadsafe(self._client.read_gatt_char(_keys_char_uuid), self.loop).result(5) + return asyncio.run_coroutine_threadsafe( + self._client.read_gatt_char(_keys_char_uuid), self.loop + ).result(5) diff --git a/py9b/link/blefleet.py b/py9b/link/blefleet.py index 7a0a582..50706dd 100644 --- a/py9b/link/blefleet.py +++ b/py9b/link/blefleet.py @@ -9,101 +9,100 @@ SCAN_TIMEOUT = 3 try: - import queue + import queue except ImportError: - import Queue as queue - -class Fifo(): - def __init__(self): - self.q = queue.Queue() - - def write(self, data): # put bytes - for b in data: - self.q.put(b) - - def read(self, size=1, timeout=None): # but read string - res = '' - for i in xrange(size): - res += chr(self.q.get(True, timeout)) - return res + import Queue as queue -#_cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb' -_rx_char_uuid = input('RX UUID?') -_tx_char_uuid = input('TX UUID?') -_write_chunk_size = 20 # as in android dumps +class Fifo: + def __init__(self): + self.q = queue.Queue() + + def write(self, data): # put bytes + for b in data: + self.q.put(b) + + def read(self, size=1, timeout=None): # but read string + res = "" + for i in xrange(size): + res += chr(self.q.get(True, timeout)) + return res + + +# _cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb' +_rx_char_uuid = input("RX UUID?") +_tx_char_uuid = input("TX UUID?") +_write_chunk_size = 20 # as in android dumps + class BLELink(BaseLink): - def __init__(self, *args, **kwargs): - super(BLELink, self).__init__(*args, **kwargs) - self._adapter = None - self._dev = None - self._wr_handle = None - self._rx_fifo = Fifo() + def __init__(self, *args, **kwargs): + super(BLELink, self).__init__(*args, **kwargs) + self._adapter = None + self._dev = None + self._wr_handle = None + self._rx_fifo = Fifo() + + def __enter__(self): + self._adapter = pygatt.BGAPIBackend() + self._adapter.start() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def _make_rx_cb(self): # this is a closure :) + def rx_cb(handle, value): + self._rx_fifo.write(value) + + return rx_cb + + def scan(self): + res = [] + devices = self._adapter.scan(timeout=SCAN_TIMEOUT) + for dev in devices: + if dev["name"].startswith((u"MISc", u"NBSc")): + res.append((dev["name"], dev["address"])) + return res + + def open(self, port): + try: + self._dev = self._adapter.connect( + port, address_type=pygatt.BLEAddressType.random + ) + self._dev.subscribe(_tx_char_uuid, callback=self._make_rx_cb()) + self._wr_handle = self._dev.get_handle(_rx_char_uuid) + except pygatt.exceptions.NotConnectedError: + raise LinkOpenException + + def close(self): + if self._dev: + self._dev.disconnect() + self._dev = None + if self._adapter: + self._adapter.stop() + + def read(self, size): + try: + data = self._rx_fifo.read(size, timeout=self.timeout) + except queue.Empty: + raise LinkTimeoutException + if self.dump: + print("<", hexlify(data).upper()) + return data + + def write(self, data): + if self.dump: + print(">", hexlify(data).upper()) + size = len(data) + ofs = 0 + while size: + chunk_sz = min(size, _write_chunk_size) + self._dev.char_write_handle( + self._wr_handle, bytearray(data[ofs : ofs + chunk_sz]) + ) + ofs += chunk_sz + size -= chunk_sz - def __enter__(self): - self._adapter = pygatt.BGAPIBackend() - self._adapter.start() - return self - - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - - def _make_rx_cb(self): # this is a closure :) - def rx_cb(handle, value): - self._rx_fifo.write(value) - return rx_cb - - - def scan(self): - res = [] - devices = self._adapter.scan(timeout=SCAN_TIMEOUT) - for dev in devices: - if dev['name'].startswith((u'MISc', u'NBSc')): - res.append((dev['name'], dev['address'])) - return res - - - def open(self, port): - try: - self._dev = self._adapter.connect(port, address_type=pygatt.BLEAddressType.random) - self._dev.subscribe(_tx_char_uuid, callback=self._make_rx_cb()) - self._wr_handle = self._dev.get_handle(_rx_char_uuid) - except pygatt.exceptions.NotConnectedError: - raise LinkOpenException - - - def close(self): - if self._dev: - self._dev.disconnect() - self._dev = None - if self._adapter: - self._adapter.stop() - - - def read(self, size): - try: - data = self._rx_fifo.read(size, timeout=self.timeout) - except queue.Empty: - raise LinkTimeoutException - if self.dump: - print '<', hexlify(data).upper() - return data - - - def write(self, data): - if self.dump: - print '>', hexlify(data).upper() - size = len(data) - ofs = 0 - while size: - chunk_sz = min(size, _write_chunk_size) - self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz])) - ofs += chunk_sz - size -= chunk_sz - - -__all__ = ['BLELink'] +__all__ = ["BLELink"] diff --git a/py9b/link/droidble.py b/py9b/link/droidble.py index 6cfecb4..f40e6ff 100644 --- a/py9b/link/droidble.py +++ b/py9b/link/droidble.py @@ -1,14 +1,15 @@ """BLE link using ABLE""" from __future__ import absolute_import + try: from able import GATT_SUCCESS, Advertisement, BluetoothDispatcher except ImportError: - exit('error importing able') + exit("error importing able") try: from .base import BaseLink, LinkTimeoutException, LinkOpenException except ImportError: - exit('error importing .base') + exit("error importing .base") from binascii import hexlify from kivy.logger import Logger from kivy.properties import StringProperty @@ -22,36 +23,50 @@ SCAN_TIMEOUT = 3 _write_chunk_size = 20 -identity = bytearray([ -0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, # Ninebot Bluetooth ID 4E422100000000DE -0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDF # Xiaomi Bluetooth ID 4E422100000000DF -]) +identity = bytearray( + [ + 0x4E, + 0x42, + 0x21, + 0x00, + 0x00, + 0x00, + 0x00, + 0xDE, # Ninebot Bluetooth ID 4E422100000000DE + 0x4E, + 0x42, + 0x21, + 0x00, + 0x00, + 0x00, + 0x00, + 0xDF, # Xiaomi Bluetooth ID 4E422100000000DF + ] +) -service_ids = { -'retail': '6e400001-b5a3-f393-e0a9-e50e24dcca9e' #service UUID -} +service_ids = {"retail": "6e400001-b5a3-f393-e0a9-e50e24dcca9e"} # service UUID receive_ids = { -'retail': '6e400002-b5a3-f393-e0a9-e50e24dcca9e' #receive characteristic UUID + "retail": "6e400002-b5a3-f393-e0a9-e50e24dcca9e" # receive characteristic UUID } transmit_ids = { -'retail': '6e400003-b5a3-f393-e0a9-e50e24dcca9e' #transmit characteristic UUID + "retail": "6e400003-b5a3-f393-e0a9-e50e24dcca9e" # transmit characteristic UUID } scoot_found = False -class Fifo(): +class Fifo: def __init__(self): self.q = queue.Queue() - def write(self, data): # put bytes + def write(self, data): # put bytes for b in data: self.q.put(b) - def read(self, size=1, timeout=None): # but read string - res = '' + def read(self, size=1, timeout=None): # but read string + res = "" for i in xrange(size): res += chr(self.q.get(True, timeout)) return res @@ -69,24 +84,20 @@ class ScootBT(BluetoothDispatcher): self.timeout = SCAN_TIMEOUT self.set_queue_timeout(self.timeout) - def __enter__(self): return self - def __exit__(self, exc_type, exc_value, traceback): self.close() - def discover(self): self.start_scan() - self.state = 'scan' + self.state = "scan" print(self.state) - def on_device(self, device, rssi, advertisement): global scoot_found - if self.state != 'scan': + if self.state != "scan": return Logger.debug("on_device event {}".format(list(advertisement))) self.addr = device.getAddress() @@ -106,26 +117,24 @@ class ScootBT(BluetoothDispatcher): elif ad.ad_type == Advertisement.ad_types.complete_local_name: name = str(ad.data) if scoot_found: - self.state = 'found' + self.state = "found" print(self.state) self.ble_device = device Logger.debug("Scooter detected: {}".format(name)) self.stop_scan() - def on_scan_completed(self): if self.ble_device: self.connect_gatt(self.ble_device) - self.state = 'connected' + self.state = "connected" print(self.state) else: self.start_scan() - def on_connection_state_change(self, status, state): if status == GATT_SUCCESS and state: self.discover_services() - self.state = 'discover' + self.state = "discover" print(self.state) else: self.close_gatt() @@ -133,73 +142,68 @@ class ScootBT(BluetoothDispatcher): self.tx_characteristic = None self.services = None - def on_services(self, status, services): self.services = services for uuid in receive_ids.values(): self.rx_characteristic = self.services.search(uuid) - print('RX: '+uuid) + print("RX: " + uuid) for uuid in transmit_ids.values(): self.tx_characteristic = self.services.search(uuid) - print('TX: '+uuid) + print("TX: " + uuid) self.enable_notifications(self.tx_characteristic) - def on_characteristic_changed(self, characteristic): if characteristic == self.tx_characteristic: data = characteristic.getValue() self.rx_fifo.write(data) - def open(self, port): self.addr = port if self.ble_device == None: self.discover() - if self.state!='connected': + if self.state != "connected": self.connect_gatt(self.ble_device) else: return - def close(self): if self.ble_device != None: self.close_gatt() self.services = None - print('close') - + print("close") def read(self, size): - print('read') + print("read") if self.ble_device: try: data = self.rx_fifo.read(size, timeout=self.timeout) except queue.Empty: raise LinkTimeoutException if self.dump: - print '<', hexlify(data).upper() + print("<", hexlify(data).upper()) return data else: - print('BLE not connected') + print("BLE not connected") self.discover() - def write(self, data): - print('write') + print("write") if self.ble_device: if self.dump: - print '>', hexlify(data).upper() + print(">", hexlify(data).upper()) size = len(data) ofs = 0 while size: chunk_sz = min(size, _write_chunk_size) - self.write_characteristic(self.rx_characteristic, bytearray(data[ofs:ofs+chunk_sz])) + self.write_characteristic( + self.rx_characteristic, bytearray(data[ofs : ofs + chunk_sz]) + ) ofs += chunk_sz size -= chunk_sz else: - print('BLE not connected') + print("BLE not connected") self.discover() - def scan(self): self.discover() @@ -209,35 +213,28 @@ class BLELink(BaseLink): super(BLELink, self).__init__(*args, **kwargs) self._adapter = None - def __enter__(self): self._adapter = ScootBT() self._adapter.discover() return self - def __exit__(self, exc_type, exc_value, traceback): self.close() - def scan(self): devices = self._adapter.scan() - def open(self, port): self._adapter.open(port) - def close(self): self._adapter.close() - def read(self, size): self._adapter.read(size) - def write(self, data): self._adapter.write(data) -__all__ = ['BLELink'] +__all__ = ["BLELink"] diff --git a/py9b/link/serial.py b/py9b/link/serial.py index 7b6997a..79e1f38 100644 --- a/py9b/link/serial.py +++ b/py9b/link/serial.py @@ -5,57 +5,59 @@ import serial import serial.tools.list_ports as lp from binascii import hexlify from .base import BaseLink, LinkTimeoutException, LinkOpenException - - -class SerialLink(BaseLink): - def __init__(self, *args, **kwargs): - super(SerialLink, self).__init__(*args, **kwargs) - self.com = None - - - def __enter__(self): - return self - - - def __exit__(self, exc_type, exc_value, traceback): - self.close() - - - def scan(self): - ports = lp.comports() - res = [("%s %04X:%04X" % (port.device, port.vid, port.pid), port.device) for port in ports] - return res - - - def open(self, port): - try: - self.com = serial.Serial(port, 115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=self.timeout) - except serial.SerialException: - raise LinkOpenException - - - def close(self): - if self.com: - self.com.close() - self.com = None - - - def read(self, size): - try: - data = self.com.read(size) - except serial.SerialTimeoutException: - raise LinkTimeoutException - if len(data)", hexlify(data).upper()) - self.com.write(data) - - -__all__ = ["SerialLink"] + + +class SerialLink(BaseLink): + def __init__(self, *args, **kwargs): + super(SerialLink, self).__init__(*args, **kwargs) + self.com = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def scan(self): + ports = lp.comports() + res = [ + ("%s %04X:%04X" % (port.device, port.vid, port.pid), port.device) + for port in ports + ] + return res + + def open(self, port): + try: + self.com = serial.Serial( + port, + 115200, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + timeout=self.timeout, + ) + except serial.SerialException: + raise LinkOpenException + + def close(self): + if self.com: + self.com.close() + self.com = None + + def read(self, size): + try: + data = self.com.read(size) + except serial.SerialTimeoutException: + raise LinkTimeoutException + if len(data) < size: + raise LinkTimeoutException + if self.dump: + print("<", hexlify(data).upper()) + return data + + def write(self, data): + if self.dump: + print(">", hexlify(data).upper()) + self.com.write(data) + + +__all__ = ["SerialLink"] diff --git a/py9b/link/tcp.py b/py9b/link/tcp.py index ab87f33..cce5ca9 100644 --- a/py9b/link/tcp.py +++ b/py9b/link/tcp.py @@ -6,75 +6,69 @@ from .base import BaseLink, LinkTimeoutException, LinkOpenException HOST, PORT = "127.0.0.1", 6000 -_write_chunk_size = 20 # 20 as in android dumps +_write_chunk_size = 20 # 20 as in android dumps + def recvall(sock, size): - data = "" - while len(data)", hexlify(data).upper()) - size = len(data) - ofs = 0 - while size: - chunk_sz = min(size, _write_chunk_size) - self.sock.sendall(data[ofs:ofs+chunk_sz]) - ofs += chunk_sz - size -= chunk_sz + def write(self, data): + if self.dump: + print(">", hexlify(data).upper()) + size = len(data) + ofs = 0 + while size: + chunk_sz = min(size, _write_chunk_size) + self.sock.sendall(data[ofs : ofs + chunk_sz]) + ofs += chunk_sz + size -= chunk_sz __all__ = ["TCPLink"] diff --git a/py9b/transport/3/ninebot.py b/py9b/transport/3/ninebot.py deleted file mode 100644 index bcefc0e..0000000 --- a/py9b/transport/3/ninebot.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Ninebot packet transport""" -from struct import pack, unpack -from .base import checksum, BaseTransport as BT -from .packet import BasePacket - - -class NinebotTransport(BT): - def __init__(self, link, device=BT.HOST): - super(NinebotTransport, self).__init__(link) - self.device = device - - - def _wait_pre(self): - while True: - while True: - c = self.link.read(1) - if c=="\x5A": - break - while True: - c = self.link.read(1) - if c=="\xA5": - return True - if c!="\x5A": - break # start waiting 5A again, else - this is 5A, so wait for A5 - - - def recv(self): - self._wait_pre() - pkt = self.link.read(1) - l = ord(pkt)+6 - for i in range(l): - pkt += self.link.read(1) - ck_calc = checksum(pkt[0:-2]) - ck_pkt = unpack("%s: %02X @%02X %s" % (BT.GetDeviceName(self.src), BT.GetDeviceName(self.dst), self.cmd, self.arg, hexlify(self.data).upper()) + return "%s->%s: %02X @%02X %s" % ( + BT.GetDeviceName(self.src), + BT.GetDeviceName(self.dst), + self.cmd, + self.arg, + hexlify(self.data).upper(), + ) __all__ = ["BasePacket"] diff --git a/py9b/transport/xiaomi.py b/py9b/transport/xiaomi.py index eb963ca..88affaf 100644 --- a/py9b/transport/xiaomi.py +++ b/py9b/transport/xiaomi.py @@ -14,93 +14,108 @@ class XiaomiTransport(BT): MASTER2BMS = 0x22 BMS2MASTER = 0x25 - MOTOR = 0x01 - DEVFF = 0xFF + MOTOR = 0x01 + DEVFF = 0xFF - _SaDa2Addr = { BT.HOST : { BT.MOTOR : MOTOR, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS }, - BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.MOTOR : MOTOR }, - BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.MOTOR : MOTOR }, - BT.MOTOR : {BT.HOST : MOTOR, BT.ESC : MOTOR, BT.BMS : MOTOR } } + _SaDa2Addr = { + BT.HOST: { + BT.MOTOR: MOTOR, + BT.ESC: MASTER2ESC, + BT.BLE: MASTER2BLE, + BT.BMS: MASTER2BMS, + }, + BT.ESC: { + BT.HOST: ESC2MASTER, + BT.BLE: MASTER2BLE, + BT.BMS: MASTER2BMS, + BT.MOTOR: MOTOR, + }, + BT.BMS: {BT.HOST: BMS2MASTER, BT.ESC: BMS2MASTER, BT.MOTOR: MOTOR}, + BT.MOTOR: {BT.HOST: MOTOR, BT.ESC: MOTOR, BT.BMS: MOTOR}, + } # TBC - _BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC), - ESC2MASTER : (BT.ESC, BT.HOST), - MASTER2BMS : (BT.HOST, BT.BMS), - BMS2MASTER : (BT.BMS, BT.HOST), - MASTER2BLE : (BT.HOST, BT.BLE), - BLE2MASTER : (BT.BLE, BT.HOST), - MOTOR : (BT.MOTOR, BT.HOST) } - - _BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC), - ESC2MASTER : (BT.ESC, BT.BMS), - MASTER2BMS : (BT.ESC, BT.BMS), - BMS2MASTER : (BT.BMS, BT.ESC), - MASTER2BLE : (BT.BMS, BT.BLE), - BLE2MASTER : (BT.BLE, BT.BMS), - MOTOR : (BT.MOTOR, BT.BMS) } + _BleAddr2SaDa = { + MASTER2ESC: (BT.HOST, BT.ESC), + ESC2MASTER: (BT.ESC, BT.HOST), + MASTER2BMS: (BT.HOST, BT.BMS), + BMS2MASTER: (BT.BMS, BT.HOST), + MASTER2BLE: (BT.HOST, BT.BLE), + BLE2MASTER: (BT.BLE, BT.HOST), + MOTOR: (BT.MOTOR, BT.HOST), + } + _BmsAddr2SaDa = { + MASTER2ESC: (BT.BMS, BT.ESC), + ESC2MASTER: (BT.ESC, BT.BMS), + MASTER2BMS: (BT.ESC, BT.BMS), + BMS2MASTER: (BT.BMS, BT.ESC), + MASTER2BLE: (BT.BMS, BT.BLE), + BLE2MASTER: (BT.BLE, BT.BMS), + MOTOR: (BT.MOTOR, BT.BMS), + } def __init__(self, link, device=BT.HOST): super(XiaomiTransport, self).__init__(link) self.device = device self.keys = None - def _make_addr(self, src, dst): return XiaomiTransport._SaDa2Addr[src][dst] - def _split_addr(self, addr): - if self.device==BT.BMS: + if self.device == BT.BMS: return XiaomiTransport._BmsAddr2SaDa[addr] else: return XiaomiTransport._BleAddr2SaDa[addr] - def _wait_pre(self): while True: while True: c = self.link.read(1) - if c == b'\x55': + if c == b"\x55": break while True: c = self.link.read(1) - if c == b'\xaa' or c == b'\xab': + if c == b"\xaa" or c == b"\xab": return c - if c != b'\x55': - break # start waiting 55 again, else - this is 55, so wait for AA - + if c != b"\x55": + break # start waiting 55 again, else - this is 55, so wait for AA def recv(self): ver = self._wait_pre() pkt = self.link.read(1) - l = ord(pkt) + 3 + (4 if ver == b'\xab' else 0) + l = ord(pkt) + 3 + (4 if ver == b"\xab" else 0) for i in range(l): pkt.extend(self.link.read(1)) ck_calc = checksum(pkt[0:-2]) ck_pkt = unpack(">1, "16s"))[0] - except LinkTimeoutException: - continue - break - else: - print "No response !" - break - hfo.write(data) + hfo = open("BmsRegs.bin", "wb") + for i in xrange(0x0, 0x100, READ_CHUNK_SIZE): + print(".") + for retry in xrange(5): + try: + data = tran.execute(ReadRegs(BT.BMS, i >> 1, "16s"))[0] + except LinkTimeoutException: + continue + break + else: + print("No response !") + break + hfo.write(data) - hfo.close() - link.close() + hfo.close() + link.close() diff --git a/read_bms_ll.py b/read_bms_ll.py index fd90349..eb8bf80 100644 --- a/read_bms_ll.py +++ b/read_bms_ll.py @@ -10,37 +10,37 @@ from py9b.transport.xiaomi import XiaomiTransport READ_CHUNK_SIZE = 0x10 link = SerialLink(dump=True) -#link = TCPLink() -#link = BLELink() +# link = TCPLink() +# link = BLELink() with link: - print "Scanning..." - ports = link.scan() - print ports + print("Scanning...") + ports = link.scan() + print(ports) - tran = XiaomiTransport(link) + tran = XiaomiTransport(link) - #link.open(("127.0.0.1", 6000)) - link.open(ports[0][1]) - print "Connected" + # link.open(("127.0.0.1", 6000)) + link.open(ports[0][1]) + print("Connected") - req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE)) + req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE)) - hfo = open("BmsRegs.bin", "wb") - for i in xrange(0x0, 0x100, READ_CHUNK_SIZE): - print ".", - req.arg = i>>1 - for retry in xrange(5): - tran.send(req) - try: - rsp = tran.recv() - except LinkTimeoutException: - continue - break - else: - print "No response !" - break - hfo.write(rsp.data) + hfo = open("BmsRegs.bin", "wb") + for i in xrange(0x0, 0x100, READ_CHUNK_SIZE): + print(".") + req.arg = i >> 1 + for retry in xrange(5): + tran.send(req) + try: + rsp = tran.recv() + except LinkTimeoutException: + continue + break + else: + print("No response !") + break + hfo.write(rsp.data) - hfo.close() - link.close() + hfo.close() + link.close() diff --git a/read_bms_mem.py b/read_bms_mem.py index 3429635..3b7c82e 100644 --- a/read_bms_mem.py +++ b/read_bms_mem.py @@ -13,33 +13,33 @@ SIZE = 0x800 READ_CHUNK_SIZE = 0x10 link = SerialLink(dump=True) -#link = TCPLink() -#link = BLELink() +# link = TCPLink() +# link = BLELink() with link: - print "Scanning..." - ports = link.scan() - print ports + print("Scanning...") + ports = link.scan() + print(ports) - tran = XiaomiTransport(link) + tran = XiaomiTransport(link) - #link.open(("127.0.0.1", 6000)) - link.open(ports[0][1]) - print "Connected" + # link.open(("127.0.0.1", 6000)) + link.open(ports[0][1]) + print("Connected") - hfo = open("BmsEep.bin", "wb") - for i in xrange(ADDR, ADDR+SIZE, READ_CHUNK_SIZE): - print ".", - for retry in xrange(5): - try: - data = tran.execute(ReadMem(BT.BMS, i, "16s"))[0] - except LinkTimeoutException: - continue - break - else: - print "No response !" - break - hfo.write(data) + hfo = open("BmsEep.bin", "wb") + for i in xrange(ADDR, ADDR + SIZE, READ_CHUNK_SIZE): + print(".") + for retry in xrange(5): + try: + data = tran.execute(ReadMem(BT.BMS, i, "16s"))[0] + except LinkTimeoutException: + continue + break + else: + print("No response !") + break + hfo.write(data) - hfo.close() - link.close() + hfo.close() + link.close() diff --git a/read_esc.py b/read_esc.py index f6f7ec4..97a2b94 100644 --- a/read_esc.py +++ b/read_esc.py @@ -12,34 +12,34 @@ from py9b.command.regio import ReadRegs READ_CHUNK_SIZE = 0x10 link = SerialLink() -#link = TCPLink() -#link = BLELink() +# link = TCPLink() +# link = BLELink() with link: - print "Scanning..." - ports = link.scan() - print ports + print("Scanning...") + ports = link.scan() + print(ports) - #tran = XiaomiTransport(link) - tran = NinebotTransport(link) + # tran = XiaomiTransport(link) + tran = NinebotTransport(link) - #link.open(("127.0.0.1", 6000)) - link.open(ports[0][1]) - print "Connected" + # link.open(("127.0.0.1", 6000)) + link.open(ports[0][1]) + print("Connected") - hfo = open("EscRegs.bin", "wb") - for i in xrange(0x0, 0x200, READ_CHUNK_SIZE): - print ".", - for retry in xrange(5): - try: - data = tran.execute(ReadRegs(BT.ESC, i>>1, "16s"))[0] - except LinkTimeoutException: - continue - break - else: - print "No response !" - break - hfo.write(data) + hfo = open("EscRegs.bin", "wb") + for i in xrange(0x0, 0x200, READ_CHUNK_SIZE): + print(".") + for retry in xrange(5): + try: + data = tran.execute(ReadRegs(BT.ESC, i >> 1, "16s"))[0] + except LinkTimeoutException: + continue + break + else: + print("No response !") + break + hfo.write(data) - hfo.close() - link.close() + hfo.close() + link.close() diff --git a/read_esc_ll.py b/read_esc_ll.py index 8759273..12260df 100644 --- a/read_esc_ll.py +++ b/read_esc_ll.py @@ -9,35 +9,35 @@ from py9b.transport.xiaomi import XiaomiTransport READ_CHUNK_SIZE = 0x40 -#link = SerialLink() -#link = TCPLink() +# link = SerialLink() +# link = TCPLink() link = BLELink() with link: - print "Scanning..." - ports = link.scan() - print ports + print("Scanning...") + ports = link.scan() + print(ports) - tran = XiaomiTransport(link) + tran = XiaomiTransport(link) - #link.open(("127.0.0.1", 6000)) - link.open(ports[0][1]) - print "Connected" + # link.open(("127.0.0.1", 6000)) + link.open(ports[0][1]) + print("Connected") - req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE)) + req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE)) - hfo = open("EscRegs.bin", "wb") - for i in xrange(0, 0x200, READ_CHUNK_SIZE): - print ".", - req.arg = i>>1 - for retry in xrange(3): - tran.send(req) - try: - rsp = tran.recv() - except LinkTimeoutException: - continue - break - hfo.write(rsp.data) + hfo = open("EscRegs.bin", "wb") + for i in xrange(0, 0x200, READ_CHUNK_SIZE): + print(".") + req.arg = i >> 1 + for retry in xrange(3): + tran.send(req) + try: + rsp = tran.recv() + except LinkTimeoutException: + continue + break + hfo.write(rsp.data) - hfo.close() - link.close() + hfo.close() + link.close() diff --git a/readregs.py b/readregs.py index f69ce2e..92b8d50 100644 --- a/readregs.py +++ b/readregs.py @@ -17,92 +17,115 @@ from py9b.command.regio import ReadRegs READ_CHUNK_SIZE = 0x10 + def ReadAllRegs(link, tran, dev, hfo): - size = 0x200 if dev==BT.ESC else 0x100 - pb = ProgressBar(maxval=size).start() - for i in xrange(0x0, size, READ_CHUNK_SIZE): - pb.update(i) - for retry in xrange(5): - try: - data = tran.execute(ReadRegs(dev, i>>1, '16s'))[0] - except LinkTimeoutException: - continue - break - else: - print('No response !') - return False - hfo.write(data) - pb.finish() - print('OK') - return True + size = 0x200 if dev == BT.ESC else 0x100 + pb = ProgressBar(maxval=size).start() + for i in xrange(0x0, size, READ_CHUNK_SIZE): + pb.update(i) + for retry in xrange(5): + try: + data = tran.execute(ReadRegs(dev, i >> 1, "16s"))[0] + except LinkTimeoutException: + continue + break + else: + print("No response !") + return False + hfo.write(data) + pb.finish() + print("OK") + return True ########################################################################################## -parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, - description='Xiaomi/Ninebot register reader', - epilog='Example 1: %(prog)s esc esc_regs.bin - read ESC regs to esc_regs.bin using default communication parameters' - '\nExample 2: %(prog)s -i tcp -a 127.0.1.10:6000 bms bms_regs.bin - flash BMS regs over TCP-BLE bridge at 127.0.1.10:6000' - '\nExample 3: %(prog)s -i serial -a COM2 esc esc_regs.bin - read ESC regs via COM2' - '\nExample 4: %(prog)s -i ble -a 12:34:56:78:9A:BC esc esc_regs.bin - read ESC regs via BLE, use specified BLE address') - -devices = {'esc' : BT.ESC, 'bms' : BT.BMS, 'extbms' : BT.EXTBMS } -parser.add_argument('device', help='device to read from', type=str.lower, choices=devices) +parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description="Xiaomi/Ninebot register reader", + epilog="Example 1: %(prog)s esc esc_regs.bin - read ESC regs to esc_regs.bin using default communication parameters" + "\nExample 2: %(prog)s -i tcp -a 127.0.1.10:6000 bms bms_regs.bin - flash BMS regs over TCP-BLE bridge at 127.0.1.10:6000" + "\nExample 3: %(prog)s -i serial -a COM2 esc esc_regs.bin - read ESC regs via COM2" + "\nExample 4: %(prog)s -i ble -a 12:34:56:78:9A:BC esc esc_regs.bin - read ESC regs via BLE, use specified BLE address", +) -parser.add_argument('file', type=argparse.FileType('wb'), help='output file') +devices = {"esc": BT.ESC, "bms": BT.BMS, "extbms": BT.EXTBMS} +parser.add_argument( + "device", help="device to read from", type=str.lower, choices=devices +) -parser.add_argument('-i', '--interface', help='communication interface, default: %(default)s', type=str.lower, - choices=('ble', 'serial', 'tcp'), default='ble') +parser.add_argument("file", type=argparse.FileType("wb"), help="output file") -parser.add_argument('-a', '--address', help='communication address (ble: BDADDR, serial: port, tcp: host:port), default: first available') +parser.add_argument( + "-i", + "--interface", + help="communication interface, default: %(default)s", + type=str.lower, + choices=("ble", "serial", "tcp"), + default="ble", +) -protocols = {'xiaomi' : XiaomiTransport, 'ninebot' : NinebotTransport } -parser.add_argument('-p', '--protocol', help='communication protocol, default: %(default)s', type=str.lower, - choices=protocols, default='xiaomi') +parser.add_argument( + "-a", + "--address", + help="communication address (ble: BDADDR, serial: port, tcp: host:port), default: first available", +) -if len(argv)==1: - parser.print_usage() - exit() +protocols = {"xiaomi": XiaomiTransport, "ninebot": NinebotTransport} +parser.add_argument( + "-p", + "--protocol", + help="communication protocol, default: %(default)s", + type=str.lower, + choices=protocols, + default="xiaomi", +) + +if len(argv) == 1: + parser.print_usage() + exit() args = parser.parse_args() -if args.device=='extbms' and args.protocol!='ninebot': - exit('Only Ninebot supports External BMS !') +if args.device == "extbms" and args.protocol != "ninebot": + exit("Only Ninebot supports External BMS !") dev = devices.get(args.device) -if args.interface=='ble': - try: - from py9b.link.ble import BLELink - except: - exit('BLE is not supported on your system !') - link = BLELink() -elif args.interface=='tcp': - from py9b.link.tcp import TCPLink - link = TCPLink() -elif args.interface=='serial': - from py9b.link.serial import SerialLink - link = SerialLink() +if args.interface == "ble": + try: + from py9b.link.ble import BLELink + except: + exit("BLE is not supported on your system !") + link = BLELink() +elif args.interface == "tcp": + from py9b.link.tcp import TCPLink + + link = TCPLink() +elif args.interface == "serial": + from py9b.link.serial import SerialLink + + link = SerialLink() else: - exit('!!! BUG !!! Unknown interface selected: '+args.interface) - + exit("!!! BUG !!! Unknown interface selected: " + args.interface) + with link: - tran = protocols.get(args.protocol)(link) + tran = protocols.get(args.protocol)(link) - if args.address: - addr = args.address - else: - print('Scanning...') - ports = link.scan() - if not ports: - exit('No interfaces found !') - print('Connecting to', ports[0][0]) - addr = ports[0][1] + if args.address: + addr = args.address + else: + print("Scanning...") + ports = link.scan() + if not ports: + exit("No interfaces found !") + print("Connecting to", ports[0][0]) + addr = ports[0][1] - link.open(addr) - print('Connected') - try: - ReadAllRegs(link, tran, dev, args.file) - args.file.close() - except Exception as e: - print('Error:', e) - raise + link.open(addr) + print("Connected") + try: + ReadAllRegs(link, tran, dev, args.file) + args.file.close() + except Exception as e: + print("Error:", e) + raise diff --git a/reboot.py b/reboot.py index 612b77f..f6c6ec7 100644 --- a/reboot.py +++ b/reboot.py @@ -1,31 +1,33 @@ #!python2-32 from py9b.link.base import LinkOpenException, LinkTimeoutException from py9b.link.tcp import TCPLink -#from py9b.link.ble import BLELink + +# from py9b.link.ble import BLELink from py9b.link.serial import SerialLink from py9b.transport.base import BaseTransport as BT from py9b.transport.packet import BasePacket as PKT -#from py9b.transport.xiaomi import XiaomiTransport + +# from py9b.transport.xiaomi import XiaomiTransport from py9b.transport.ninebot import NinebotTransport from py9b.command.regio import ReadRegs, WriteRegs -#link = SerialLink(dump=True) +# link = SerialLink(dump=True) link = TCPLink() -#link = BLELink() +# link = BLELink() with link: - print "Scanning..." - ports = link.scan() - print ports + print("Scanning...") + ports = link.scan() + print(ports) - #tran = XiaomiTransport(link) - tran = NinebotTransport(link) + # tran = XiaomiTransport(link) + tran = NinebotTransport(link) - link.open(("127.0.0.20:6000")) -# link.open(ports[0][1]) - print "Connected" + link.open(("127.0.0.20:6000")) + # link.open(ports[0][1]) + print("Connected") - print('Reboot...') - tran.execute(WriteRegs(BT.ESC, 0x78, 'ESC: TH: %02X, BR: %02X, %02X %02X" % (throttle, brake, u2, u3) - last_esc_65 = rsp.data - continue - elif len(rsp.data)==7: - if rsp.data==last_esc_64: - continue - ll, throttle, brake, u2, u3, ver = unpack("ESC: TH: %02X, BR: %02X, %02X %02X, VER: %04X" % (throttle, brake, u2, u3, ver) - last_esc_64 = rsp.data - continue - elif rsp.src==BT.HOST and rsp.dst==BT.BLE and rsp.cmd==0x64: - if len(rsp.data)==4: - if rsp.data==last_ble_64: - continue - u0, u1, u2, u3 = unpack("BLE: %02X %02X %02X %02X" % (u0, u1, u2, u3) - last_ble_64 = rsp.data - continue - print rsp - except LinkTimeoutException: - pass - except KeyboardInterrupt: - pass - - link.close() + last_esc_64 = "" + last_esc_65 = "" + last_ble_64 = "" + try: + while True: + try: + rsp = tran.recv() + if not rsp: + continue + if rsp.src == BT.HOST and rsp.dst == BT.ESC and rsp.cmd in (0x64, 0x65): + if len(rsp.data) == 5: + if rsp.data == last_esc_65: + continue + ll, throttle, brake, u2, u3 = unpack("ESC: TH: %02X, BR: %02X, %02X %02X" + % (throttle, brake, u2, u3) + ) + last_esc_65 = rsp.data + continue + elif len(rsp.data) == 7: + if rsp.data == last_esc_64: + continue + ll, throttle, brake, u2, u3, ver = unpack("ESC: TH: %02X, BR: %02X, %02X %02X, VER: %04X" + % (throttle, brake, u2, u3, ver) + ) + last_esc_64 = rsp.data + continue + elif rsp.src == BT.HOST and rsp.dst == BT.BLE and rsp.cmd == 0x64: + if len(rsp.data) == 4: + if rsp.data == last_ble_64: + continue + u0, u1, u2, u3 = unpack("BLE: %02X %02X %02X %02X" % (u0, u1, u2, u3)) + last_ble_64 = rsp.data + continue + print(rsp) + except LinkTimeoutException: + pass + except KeyboardInterrupt: + pass + + link.close() diff --git a/tcp_test.py b/tcp_test.py index 5c28717..910bb15 100644 --- a/tcp_test.py +++ b/tcp_test.py @@ -4,26 +4,26 @@ from py9b.transport.base import BaseTransport as BT from py9b.transport.packet import BasePacket as PKT from py9b.transport.xiaomi import XiaomiTransport -#link = SerialLink() +# link = SerialLink() with TCPLink() as link: - ports = link.scan() - print ports - - tran = XiaomiTransport(link) + ports = link.scan() + print(ports) - #link.open(("127.0.0.1", 6000)) - link.open(ports[0][1]) + tran = XiaomiTransport(link) - #req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0x10, data="\x10") - req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0x10, data="\x10") + # link.open(("127.0.0.1", 6000)) + link.open(ports[0][1]) - while raw_input("Press ENTER to send...")!="q": - tran.send(req) - try: - rsp = tran.recv() - except LinkTimeoutException: - print "No response" - continue - print rsp + # req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0x10, data="\x10") + req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0x10, data="\x10") - link.close() + while raw_input("Press ENTER to send...") != "q": + tran.send(req) + try: + rsp = tran.recv() + except LinkTimeoutException: + print("No response") + continue + print(rsp) + + link.close() diff --git a/unlock.py b/unlock.py index 4fd4969..c868b4a 100644 --- a/unlock.py +++ b/unlock.py @@ -1,31 +1,33 @@ #!python2-32 from py9b.link.base import LinkOpenException, LinkTimeoutException from py9b.link.tcp import TCPLink -#from py9b.link.ble import BLELink + +# from py9b.link.ble import BLELink from py9b.link.serial import SerialLink from py9b.transport.base import BaseTransport as BT from py9b.transport.packet import BasePacket as PKT -#from py9b.transport.xiaomi import XiaomiTransport + +# from py9b.transport.xiaomi import XiaomiTransport from py9b.transport.ninebot import NinebotTransport from py9b.command.regio import ReadRegs, WriteRegs -#link = SerialLink(dump=True) +# link = SerialLink(dump=True) link = TCPLink() -#link = BLELink() +# link = BLELink() with link: - print "Scanning..." - ports = link.scan() - print ports + print("Scanning...") + ports = link.scan() + print(ports) - #tran = XiaomiTransport(link) - tran = NinebotTransport(link) + # tran = XiaomiTransport(link) + tran = NinebotTransport(link) - link.open(("127.0.0.20:6000")) -# link.open(ports[0][1]) - print "Connected" + link.open(("127.0.0.20:6000")) + # link.open(ports[0][1]) + print("Connected") - print('Unlocking...') - tran.execute(WriteRegs(BT.ESC, 0x71, '