py9b/transport/xiaomi: BLE encryption support
parent
54d2eb335e
commit
c53be35b26
|
@ -0,0 +1,44 @@
|
|||
from py9b.link.bleak import BleakLink
|
||||
|
||||
from py9b.transport.base import BaseTransport as BT
|
||||
from py9b.transport.packet import BasePacket as PKT
|
||||
from py9b.transport.xiaomi import XiaomiTransport
|
||||
|
||||
from py9b.command.regio import ReadRegs
|
||||
|
||||
import time
|
||||
|
||||
link = BleakLink()
|
||||
with link:
|
||||
devs = link.scan()
|
||||
print(devs)
|
||||
|
||||
tran = XiaomiTransport(link)
|
||||
|
||||
link.open(devs[0])
|
||||
|
||||
data = tran.execute(ReadRegs(BT.ESC, 0x68, "<H"))[0]
|
||||
print('BLE version: %04x' % data)
|
||||
|
||||
if data >= 0x81:
|
||||
print('Connected, fetching keys...')
|
||||
keys = link.fetch_keys()
|
||||
tran.keys = keys
|
||||
print('keys:', keys)
|
||||
|
||||
# Recover longer keystream
|
||||
req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0x50, data=bytearray([0x20]))
|
||||
tran.send(req)
|
||||
resp = tran.recv()
|
||||
tran.keys += resp.data[9:]
|
||||
print('Got %d bytes of keystream' % (len(tran.keys),))
|
||||
|
||||
data = tran.execute(ReadRegs(BT.ESC, 0x68, "<H"))
|
||||
print('Version reported after encryption: %04x' % data)
|
||||
|
||||
data = tran.execute(ReadRegs(BT.ESC, 0x1A, "<H"))[0]
|
||||
print('ESC version: %04x' % data)
|
||||
data = tran.execute(ReadRegs(BT.BMS, 0x17, "<H"))[0]
|
||||
print('BMS version: %04x' % data)
|
||||
|
||||
print('Serial:', tran.execute(ReadRegs(BT.ESC, 0x10, "12s"))[0].decode())
|
|
@ -6,6 +6,7 @@ from threading import Thread
|
|||
|
||||
_rx_char_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
|
||||
_tx_char_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
|
||||
_keys_char_uuid = '00000014-0000-1000-8000-00805f9b34fb'
|
||||
|
||||
try:
|
||||
import queue
|
||||
|
@ -39,17 +40,26 @@ class BleakLink(BaseLink):
|
|||
self.loop = loop or asyncio.get_event_loop()
|
||||
self._rx_fifo = Fifo()
|
||||
self._client = None
|
||||
self._th = None
|
||||
|
||||
def __enter__(self):
|
||||
self.start()
|
||||
return self
|
||||
|
||||
def start(self):
|
||||
if self._th:
|
||||
return
|
||||
|
||||
self._th = Thread(target=run_worker, args=(self.loop,))
|
||||
self._th.daemon = True
|
||||
self._th.start()
|
||||
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
if self._client:
|
||||
asyncio.run_coroutine_threadsafe(self._client.disconnect(), self.loop).result(10)
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
asyncio.run_coroutine_threadsafe(self._client.disconnect(), self.loop).result(10)
|
||||
|
||||
def scan(self, timeout=1):
|
||||
future = asyncio.run_coroutine_threadsafe(discover(timeout=timeout, device=self.device), self.loop)
|
||||
|
@ -64,7 +74,7 @@ class BleakLink(BaseLink):
|
|||
await self._client.connect()
|
||||
print('connected')
|
||||
await self._client.start_notify(_tx_char_uuid, self._data_received)
|
||||
print('services:', await self._client.get_services())
|
||||
print('services:', list(await self._client.get_services()))
|
||||
|
||||
def _data_received(self, sender, data):
|
||||
print('<<', ' '.join(map(lambda b: '%02x' % b, data)))
|
||||
|
@ -82,4 +92,5 @@ class BleakLink(BaseLink):
|
|||
raise LinkTimeoutException
|
||||
return data
|
||||
|
||||
|
||||
def fetch_keys(self):
|
||||
return asyncio.run_coroutine_threadsafe(self._client.read_gatt_char(_keys_char_uuid), self.loop).result(5)
|
||||
|
|
|
@ -8,7 +8,6 @@ class BasePacket(object):
|
|||
self.cmd = cmd
|
||||
self.arg = arg
|
||||
self.data = data
|
||||
print(self.data)
|
||||
|
||||
def __str__(self):
|
||||
return "%s->%s: %02X @%02X %s" % (BT.GetDeviceName(self.src), BT.GetDeviceName(self.dst), self.cmd, self.arg, hexlify(self.data).upper())
|
||||
|
|
|
@ -5,91 +5,107 @@ from .packet import BasePacket
|
|||
|
||||
|
||||
class XiaomiTransport(BT):
|
||||
MASTER2ESC = 0x20
|
||||
ESC2MASTER = 0x23
|
||||
MASTER2ESC = 0x20
|
||||
ESC2MASTER = 0x23
|
||||
|
||||
MASTER2BLE = 0x21
|
||||
BLE2MASTER = 0x24
|
||||
MASTER2BLE = 0x21
|
||||
BLE2MASTER = 0x24
|
||||
|
||||
MASTER2BMS = 0x22
|
||||
BMS2MASTER = 0x25
|
||||
MASTER2BMS = 0x22
|
||||
BMS2MASTER = 0x25
|
||||
|
||||
MOTOR = 0x01
|
||||
DEVFF = 0xFF
|
||||
MOTOR = 0x01
|
||||
DEVFF = 0xFF
|
||||
|
||||
_SaDa2Addr = { BT.HOST : { BT.MOTOR : MOTOR, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS },
|
||||
BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.MOTOR : MOTOR },
|
||||
BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.MOTOR : MOTOR },
|
||||
BT.MOTOR : {BT.HOST : MOTOR, BT.ESC : MOTOR, BT.BMS : MOTOR } }
|
||||
_SaDa2Addr = { BT.HOST : { BT.MOTOR : MOTOR, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS },
|
||||
BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.MOTOR : MOTOR },
|
||||
BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.MOTOR : MOTOR },
|
||||
BT.MOTOR : {BT.HOST : MOTOR, BT.ESC : MOTOR, BT.BMS : MOTOR } }
|
||||
|
||||
# TBC
|
||||
_BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC),
|
||||
ESC2MASTER : (BT.ESC, BT.HOST),
|
||||
MASTER2BMS : (BT.HOST, BT.BMS),
|
||||
BMS2MASTER : (BT.BMS, BT.HOST),
|
||||
MASTER2BLE : (BT.HOST, BT.BLE),
|
||||
BLE2MASTER : (BT.BLE, BT.HOST),
|
||||
MOTOR : (BT.MOTOR, BT.HOST) }
|
||||
# TBC
|
||||
_BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC),
|
||||
ESC2MASTER : (BT.ESC, BT.HOST),
|
||||
MASTER2BMS : (BT.HOST, BT.BMS),
|
||||
BMS2MASTER : (BT.BMS, BT.HOST),
|
||||
MASTER2BLE : (BT.HOST, BT.BLE),
|
||||
BLE2MASTER : (BT.BLE, BT.HOST),
|
||||
MOTOR : (BT.MOTOR, BT.HOST) }
|
||||
|
||||
_BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC),
|
||||
ESC2MASTER : (BT.ESC, BT.BMS),
|
||||
MASTER2BMS : (BT.ESC, BT.BMS),
|
||||
BMS2MASTER : (BT.BMS, BT.ESC),
|
||||
MASTER2BLE : (BT.BMS, BT.BLE),
|
||||
BLE2MASTER : (BT.BLE, BT.BMS),
|
||||
MOTOR : (BT.MOTOR, BT.BMS) }
|
||||
_BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC),
|
||||
ESC2MASTER : (BT.ESC, BT.BMS),
|
||||
MASTER2BMS : (BT.ESC, BT.BMS),
|
||||
BMS2MASTER : (BT.BMS, BT.ESC),
|
||||
MASTER2BLE : (BT.BMS, BT.BLE),
|
||||
BLE2MASTER : (BT.BLE, BT.BMS),
|
||||
MOTOR : (BT.MOTOR, BT.BMS) }
|
||||
|
||||
|
||||
def __init__(self, link, device=BT.HOST):
|
||||
super(XiaomiTransport, self).__init__(link)
|
||||
self.device = device
|
||||
def __init__(self, link, device=BT.HOST):
|
||||
super(XiaomiTransport, self).__init__(link)
|
||||
self.device = device
|
||||
self.keys = None
|
||||
|
||||
|
||||
def _make_addr(self, src, dst):
|
||||
return XiaomiTransport._SaDa2Addr[src][dst]
|
||||
def _make_addr(self, src, dst):
|
||||
return XiaomiTransport._SaDa2Addr[src][dst]
|
||||
|
||||
|
||||
def _split_addr(self, addr):
|
||||
if self.device==BT.BMS:
|
||||
return XiaomiTransport._BmsAddr2SaDa[addr]
|
||||
else:
|
||||
return XiaomiTransport._BleAddr2SaDa[addr]
|
||||
def _split_addr(self, addr):
|
||||
if self.device==BT.BMS:
|
||||
return XiaomiTransport._BmsAddr2SaDa[addr]
|
||||
else:
|
||||
return XiaomiTransport._BleAddr2SaDa[addr]
|
||||
|
||||
|
||||
def _wait_pre(self):
|
||||
while True:
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c==b"\x55":
|
||||
break
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c==b"\xAA":
|
||||
return True
|
||||
if c!=b"\x55":
|
||||
break # start waiting 55 again, else - this is 55, so wait for AA
|
||||
def _wait_pre(self):
|
||||
while True:
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c == b'\x55':
|
||||
break
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c == b'\xaa' or c == b'\xab':
|
||||
return c
|
||||
if c != b'\x55':
|
||||
break # start waiting 55 again, else - this is 55, so wait for AA
|
||||
|
||||
|
||||
def recv(self):
|
||||
self._wait_pre()
|
||||
pkt = self.link.read(1)
|
||||
l = ord(pkt)+3
|
||||
for i in range(l):
|
||||
pkt.extend(self.link.read(1))
|
||||
ck_calc = checksum(pkt[0:-2])
|
||||
ck_pkt = unpack("<H", pkt[-2:])[0]
|
||||
if ck_pkt!=ck_calc:
|
||||
print("Checksum mismatch !")
|
||||
return None
|
||||
sa, da = self._split_addr(pkt[1])
|
||||
return BasePacket(sa, da, pkt[2], pkt[3], pkt[4:-2]) # sa, da, cmd, arg, data
|
||||
def recv(self):
|
||||
ver = self._wait_pre()
|
||||
pkt = self.link.read(1)
|
||||
l = ord(pkt) + 3 + (4 if ver == b'\xab' else 0)
|
||||
for i in range(l):
|
||||
pkt.extend(self.link.read(1))
|
||||
ck_calc = checksum(pkt[0:-2])
|
||||
ck_pkt = unpack("<H", pkt[-2:])[0]
|
||||
if ck_pkt!=ck_calc:
|
||||
print("Checksum mismatch !")
|
||||
return None
|
||||
|
||||
if ver == b'\xab':
|
||||
# Drops 2 bytes of garbage and 2 bytes of checksum, first 2 bytes
|
||||
# of garbage are dropped below
|
||||
pkt[1:] = self.encrypt(pkt[1:])[:-4]
|
||||
|
||||
sa, da = self._split_addr(pkt[1])
|
||||
return BasePacket(sa, da, pkt[2], pkt[3], pkt[4:-2]) # sa, da, cmd, arg, data
|
||||
|
||||
|
||||
def send(self, packet):
|
||||
dev = self._make_addr(packet.src, packet.dst)
|
||||
pkt = pack("<BBBB", len(packet.data)+2, dev, packet.cmd, packet.arg)+packet.data
|
||||
pkt = b"\x55\xAA" + pkt + pack("<H", checksum(pkt))
|
||||
self.link.write(pkt)
|
||||
def send(self, packet):
|
||||
dev = self._make_addr(packet.src, packet.dst)
|
||||
if self.keys:
|
||||
pkt = pack("<B", len(packet.data)+2)
|
||||
pkt += self.encrypt(pack("<BBB", dev, packet.cmd, packet.arg)+packet.data + (b'\x00' * 4))
|
||||
pkt = b'\x55\xab' + pkt + pack("<H", checksum(pkt))
|
||||
else:
|
||||
pkt = pack("<BBBB", len(packet.data)+2, dev, packet.cmd, packet.arg)+packet.data
|
||||
pkt = b'\x55\xaa' + pkt + pack("<H", checksum(pkt))
|
||||
self.link.write(pkt)
|
||||
|
||||
def encrypt(self, data):
|
||||
k = self.keys
|
||||
return bytearray([b ^ (k[i] if i < len(k) else 0) for i, b in enumerate(data)])
|
||||
|
||||
|
||||
__all__ = ["XiaomiTransport"]
|
||||
|
|
Loading…
Reference in New Issue