From c53be35b261665a9d8846f54e0a4d6ddb2a42f57 Mon Sep 17 00:00:00 2001 From: Piotr Dobrowolski Date: Sat, 19 Oct 2019 09:45:12 +0200 Subject: [PATCH] py9b/transport/xiaomi: BLE encryption support --- m365_hax.py | 44 ++++++++++++ py9b/link/bleak.py | 21 ++++-- py9b/transport/packet.py | 1 - py9b/transport/xiaomi.py | 150 ++++++++++++++++++++++----------------- 4 files changed, 143 insertions(+), 73 deletions(-) create mode 100644 m365_hax.py diff --git a/m365_hax.py b/m365_hax.py new file mode 100644 index 0000000..7a59115 --- /dev/null +++ b/m365_hax.py @@ -0,0 +1,44 @@ +from py9b.link.bleak import BleakLink + +from py9b.transport.base import BaseTransport as BT +from py9b.transport.packet import BasePacket as PKT +from py9b.transport.xiaomi import XiaomiTransport + +from py9b.command.regio import ReadRegs + +import time + +link = BleakLink() +with link: + devs = link.scan() + print(devs) + + tran = XiaomiTransport(link) + + link.open(devs[0]) + + data = tran.execute(ReadRegs(BT.ESC, 0x68, "= 0x81: + print('Connected, fetching keys...') + keys = link.fetch_keys() + tran.keys = keys + print('keys:', keys) + + # Recover longer keystream + req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0x50, data=bytearray([0x20])) + tran.send(req) + resp = tran.recv() + tran.keys += resp.data[9:] + print('Got %d bytes of keystream' % (len(tran.keys),)) + + data = tran.execute(ReadRegs(BT.ESC, 0x68, "%s: %02X @%02X %s" % (BT.GetDeviceName(self.src), BT.GetDeviceName(self.dst), self.cmd, self.arg, hexlify(self.data).upper()) diff --git a/py9b/transport/xiaomi.py b/py9b/transport/xiaomi.py index 7c94b21..eb963ca 100644 --- a/py9b/transport/xiaomi.py +++ b/py9b/transport/xiaomi.py @@ -5,91 +5,107 @@ from .packet import BasePacket class XiaomiTransport(BT): - MASTER2ESC = 0x20 - ESC2MASTER = 0x23 + MASTER2ESC = 0x20 + ESC2MASTER = 0x23 - MASTER2BLE = 0x21 - BLE2MASTER = 0x24 + MASTER2BLE = 0x21 + BLE2MASTER = 0x24 - MASTER2BMS = 0x22 - BMS2MASTER = 0x25 + MASTER2BMS = 0x22 + BMS2MASTER = 0x25 - MOTOR = 0x01 - DEVFF = 0xFF + MOTOR = 0x01 + DEVFF = 0xFF - _SaDa2Addr = { BT.HOST : { BT.MOTOR : MOTOR, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS }, - BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.MOTOR : MOTOR }, - BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.MOTOR : MOTOR }, - BT.MOTOR : {BT.HOST : MOTOR, BT.ESC : MOTOR, BT.BMS : MOTOR } } + _SaDa2Addr = { BT.HOST : { BT.MOTOR : MOTOR, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS }, + BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.MOTOR : MOTOR }, + BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.MOTOR : MOTOR }, + BT.MOTOR : {BT.HOST : MOTOR, BT.ESC : MOTOR, BT.BMS : MOTOR } } - # TBC - _BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC), - ESC2MASTER : (BT.ESC, BT.HOST), - MASTER2BMS : (BT.HOST, BT.BMS), - BMS2MASTER : (BT.BMS, BT.HOST), - MASTER2BLE : (BT.HOST, BT.BLE), - BLE2MASTER : (BT.BLE, BT.HOST), - MOTOR : (BT.MOTOR, BT.HOST) } + # TBC + _BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC), + ESC2MASTER : (BT.ESC, BT.HOST), + MASTER2BMS : (BT.HOST, BT.BMS), + BMS2MASTER : (BT.BMS, BT.HOST), + MASTER2BLE : (BT.HOST, BT.BLE), + BLE2MASTER : (BT.BLE, BT.HOST), + MOTOR : (BT.MOTOR, BT.HOST) } - _BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC), - ESC2MASTER : (BT.ESC, BT.BMS), - MASTER2BMS : (BT.ESC, BT.BMS), - BMS2MASTER : (BT.BMS, BT.ESC), - MASTER2BLE : (BT.BMS, BT.BLE), - BLE2MASTER : (BT.BLE, BT.BMS), - MOTOR : (BT.MOTOR, BT.BMS) } + _BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC), + ESC2MASTER : (BT.ESC, BT.BMS), + MASTER2BMS : (BT.ESC, BT.BMS), + BMS2MASTER : (BT.BMS, BT.ESC), + MASTER2BLE : (BT.BMS, BT.BLE), + BLE2MASTER : (BT.BLE, BT.BMS), + MOTOR : (BT.MOTOR, BT.BMS) } - def __init__(self, link, device=BT.HOST): - super(XiaomiTransport, self).__init__(link) - self.device = device + def __init__(self, link, device=BT.HOST): + super(XiaomiTransport, self).__init__(link) + self.device = device + self.keys = None - def _make_addr(self, src, dst): - return XiaomiTransport._SaDa2Addr[src][dst] + def _make_addr(self, src, dst): + return XiaomiTransport._SaDa2Addr[src][dst] - def _split_addr(self, addr): - if self.device==BT.BMS: - return XiaomiTransport._BmsAddr2SaDa[addr] - else: - return XiaomiTransport._BleAddr2SaDa[addr] + def _split_addr(self, addr): + if self.device==BT.BMS: + return XiaomiTransport._BmsAddr2SaDa[addr] + else: + return XiaomiTransport._BleAddr2SaDa[addr] - def _wait_pre(self): - while True: - while True: - c = self.link.read(1) - if c==b"\x55": - break - while True: - c = self.link.read(1) - if c==b"\xAA": - return True - if c!=b"\x55": - break # start waiting 55 again, else - this is 55, so wait for AA + def _wait_pre(self): + while True: + while True: + c = self.link.read(1) + if c == b'\x55': + break + while True: + c = self.link.read(1) + if c == b'\xaa' or c == b'\xab': + return c + if c != b'\x55': + break # start waiting 55 again, else - this is 55, so wait for AA - def recv(self): - self._wait_pre() - pkt = self.link.read(1) - l = ord(pkt)+3 - for i in range(l): - pkt.extend(self.link.read(1)) - ck_calc = checksum(pkt[0:-2]) - ck_pkt = unpack("