Added custom UUID parameters for fleet vehicles

legit-fork
Slinger360 2019-03-16 13:02:12 -05:00
parent 7e454d7618
commit 1135928621
2 changed files with 122 additions and 7 deletions

View File

@ -42,9 +42,12 @@ def UpdateFirmware(link, tran, dev, fwfile):
return False
print('OK')
print('Locking...')
tran.execute(WriteRegs(BT.ESC, 0x70, '<H', 0x0001))
if args.interface!='blefleet':
print('Locking...')
tran.execute(WriteRegs(BT.ESC, 0x70, '<H', 0x0001))
else:
print('Not Locking...')
print('Starting...')
tran.execute(StartUpdate(dev, fw_size))
@ -75,18 +78,18 @@ def UpdateFirmware(link, tran, dev, fwfile):
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description='Xiaomi/Ninebot firmware flasher',
epilog='Example 1: %(prog)s ble ble_patched.bin - flash ble_patched.bin to BLE using default communication parameters'
epilog='Example 1: %(prog)s ble ble_patched.bin - flash ble_patched.bin to BLE using default communication parameters'
'\nExample 2: %(prog)s -i tcp -a 192.168.1.10:6000 bms bms115.bin - flash bms115.bin to BMS over TCP-BLE bridge at 192.168.1.10:6000'
'\nExample 3: %(prog)s -i serial -a COM2 esc CFW.bin - flash CFW.bin to ESC via COM2'
'\nExample 4: %(prog)s -i ble -a 12:34:56:78:9A:BC -p ninebot extbms bms107.bin - flash bms107.bin to Ninebot\'s external BMS via BLE, use specified BLE address')
devices = {'ble' : BT.BLE, 'esc' : BT.ESC, 'bms' : BT.BMS, 'extbms' : BT.EXTBMS }
parser.add_argument('device', help='target device', type=str.lower, choices=devices)
parser.add_argument('file', type=argparse.FileType('rb'), help='firmware file')
parser.add_argument('-i', '--interface', help='communication interface, default: %(default)s', type=str.lower,
choices=('ble', 'serial', 'tcp'), default='ble')
choices=('ble', 'serial', 'tcp', 'blefleet'), default='ble')
parser.add_argument('-a', '--address', help='communication address (ble: BDADDR, serial: port, tcp: host:port), default: first available')
@ -116,9 +119,15 @@ elif args.interface=='tcp':
elif args.interface=='serial':
from py9b.link.serial import SerialLink
link = SerialLink()
elif args.interface=='blefleet':
try:
from py9b.link.blefleet import BLELink
except:
exit('BLE is not supported on your system !')
link = BLELink()
else:
exit('!!! BUG !!! Unknown interface selected: '+args.interface)
with link:
tran = protocols.get(args.protocol)(link)

106
py9b/link/blefleet.py Normal file
View File

@ -0,0 +1,106 @@
"""BLE link using BlueGiga adapter via PyGatt/BGAPI"""
from __future__ import absolute_import
import pygatt
from .base import BaseLink, LinkTimeoutException, LinkOpenException
from binascii import hexlify
SCAN_TIMEOUT = 3
import queue
class Fifo():
def __init__(self):
self.q = queue.Queue()
def write(self, data): # put bytes
for b in data:
self.q.put(b)
def read(self, size=1, timeout=None): # but read string
res = ''
for i in xrange(size):
res += chr(self.q.get(True, timeout))
return res
#_cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb'
_rx_char_uuid = input('RX UUID?')
_tx_char_uuid = input('TX UUID?')
_write_chunk_size = 20 # as in android dumps
class BLELink(BaseLink):
def __init__(self, *args, **kwargs):
super(BLELink, self).__init__(*args, **kwargs)
self._adapter = None
self._dev = None
self._wr_handle = None
self._rx_fifo = Fifo()
def __enter__(self):
self._adapter = pygatt.BGAPIBackend()
self._adapter.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def _make_rx_cb(self): # this is a closure :)
def rx_cb(handle, value):
self._rx_fifo.write(value)
return rx_cb
def scan(self):
res = []
devices = self._adapter.scan(timeout=SCAN_TIMEOUT)
for dev in devices:
if dev['name'].startswith((u'MISc', u'NBSc')):
res.append((dev['name'], dev['address']))
return res
def open(self, port):
try:
self._dev = self._adapter.connect(port, address_type=pygatt.BLEAddressType.random)
self._dev.subscribe(_tx_char_uuid, callback=self._make_rx_cb())
self._wr_handle = self._dev.get_handle(_rx_char_uuid)
except pygatt.exceptions.NotConnectedError:
raise LinkOpenException
def close(self):
if self._dev:
self._dev.disconnect()
self._dev = None
if self._adapter:
self._adapter.stop()
def read(self, size):
try:
data = self._rx_fifo.read(size, timeout=self.timeout)
except queue.Empty:
raise LinkTimeoutException
if self.dump:
print '<', hexlify(data).upper()
return data
def write(self, data):
if self.dump:
print '>', hexlify(data).upper()
size = len(data)
ofs = 0
while size:
chunk_sz = min(size, _write_chunk_size)
self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz]))
ofs += chunk_sz
size -= chunk_sz
__all__ = ['BLELink']