Many fwupd fixes

legit-fork
flowswitch 2018-12-01 23:24:21 +01:00
parent 6654c6bfe3
commit 4079e4f619
17 changed files with 390 additions and 75 deletions

3
.gitignore vendored
View File

@ -1,3 +1,6 @@
*.bin
py9b.si4project/
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

View File

@ -3,6 +3,7 @@ Ninebot/Xiaomi electric scooter communication library
## Requirements ## Requirements
* Python 2.x.x [www.python.org] * Python 2.x.x [www.python.org]
* ProgressBar [pip install progressbar]
* PySerial [pip install pyserial] - for direct serial link backend * PySerial [pip install pyserial] - for direct serial link backend
* PyGatt [pip install pygatt] - for BLED112 dongle backend * PyGatt [pip install pygatt] - for BLED112 dongle backend
* nRFUARTBridge [https://github.com/flowswitch/nRFUARTBridge] - for Android BLE-TCP backend * nRFUARTBridge [https://github.com/flowswitch/nRFUARTBridge] - for Android BLE-TCP backend

131
fwupd.py
View File

@ -1,6 +1,10 @@
#!python2-32 #!python2-32
from __future__ import print_function
from sys import argv, exit from sys import argv, exit
from os.path import getsize import os
import argparse
from progressbar import ProgressBar
from py9b.link.base import LinkOpenException, LinkTimeoutException from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink from py9b.link.ble import BLELink
@ -10,65 +14,114 @@ from py9b.transport.xiaomi import XiaomiTransport
from py9b.command.regio import ReadRegs, WriteRegs from py9b.command.regio import ReadRegs, WriteRegs
from py9b.command.update import * from py9b.command.update import *
PING_RETRIES = 20
def checksum(s, data): def checksum(s, data):
for c in data: for c in data:
s += ord(c) s += ord(c)
return (s & 0xFFFFFFFF) return (s & 0xFFFFFFFF)
fw_dev = BT.BMS def UpdateFirmware(link, tran, dev, fwfile):
fw_name = "bms.bin" fwfile.seek(0, os.SEEK_END)
fw_size = getsize(fw_name) fw_size = fwfile.tell()
fw_page_size = 0x80 fwfile.seek(0)
fw_page_size = 0x80
link = SerialLink(timeout=0.5) print('Pinging...', end='')
#link = TCPLink() for retry in range(PING_RETRIES):
#link = BLELink() print('.', end='')
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
print "Pinging..."
for retry in xrange(20):
print ".",
try: try:
tran.execute(ReadRegs(BT.BMS, 0x10, "14s")) if dev==BT.BLE:
tran.execute(ReadRegs(dev, 0, '13s'))
else:
tran.execute(ReadRegs(dev, 0x10, '14s'))
except LinkTimeoutException: except LinkTimeoutException:
continue continue
break break
else: else:
exit("Timed out !") print('Timed out !')
print "" return False
print('OK')
print('Locking...')
tran.execute(WriteRegs(BT.ESC, 0x70, '<H', 0x0001))
hfi = open(fw_name, "rb") print('Starting...')
tran.execute(StartUpdate(dev, fw_size))
print "Starting..." print('Writing...')
tran.execute(StartUpdate(fw_dev, fw_size)) pb = ProgressBar(maxval=fw_size//fw_page_size+1).start()
print "Writing..."
page = 0 page = 0
chk = 0 chk = 0
while fw_size: while fw_size:
print "{0:X}".format(page*0x80) pb.update(page)
chunk_sz = min(fw_size, fw_page_size) chunk_sz = min(fw_size, fw_page_size)
data = hfi.read(chunk_sz) data = fwfile.read(chunk_sz)
chk = checksum(chk, data) chk = checksum(chk, data)
tran.execute(WriteUpdate(fw_dev, page, data)) tran.execute(WriteUpdate(dev, page, data))
page += 1 page += 1
fw_size -= chunk_sz fw_size -= chunk_sz
hfi.close() pb.finish()
print "Finalizing..." print('Finalizing...')
tran.execute(FinishUpdate(fw_dev, chk ^ 0xFFFFFFFF)) tran.execute(FinishUpdate(dev, chk ^ 0xFFFFFFFF))
print "Reboot" print('Reboot')
tran.execute(RebootUpdate(fw_dev)) tran.execute(RebootUpdate(dev))
print('Done')
return True
##########################################################################################
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description='Xiaomi/Ninebot firmware flasher',
epilog='Example 1: %(prog)s ble ble_patched.bin - flash ble_patched.bin to BLE using default communication parameters'
'\nExample 2: %(prog)s -i tcp -a 192.168.1.10:6000 bms bms115.bin - flash bms115.bin to BMS over TCP-BLE bridge at 192.168.1.10:6000'
'\nExample 3: %(prog)s -i serial -a COM2 esc CFW.bin - flash CFW.bin to ESC via COM2'
'\nExample 4: %(prog)s -i ble -a 12:34:56:78:9A:BC esc CFW.bin - flash CFW.bin to ESC via BLE, use specified BLE address')
devices = {'ble' : BT.BLE, 'esc' : BT.ESC, 'bms' : BT.BMS} # TODO: add extbms
parser.add_argument('device', help='target device', type=str.lower, choices=devices)
parser.add_argument('file', type=argparse.FileType('rb'), help='firmware file')
interfaces = {'ble' : BLELink, 'serial' : SerialLink, 'tcp' : TCPLink}
parser.add_argument('-i', '--interface', help='communication interface, default: %(default)s', type=str.lower,
choices=interfaces, default='ble')
parser.add_argument('-a', '--address', help='communication address (ble: BDADDR, serial: port, tcp: host:port), default: first available')
protocols = {'xiaomi' : XiaomiTransport} # TODO: add Ninebot ES
parser.add_argument('-p', '--protocol', help='communication protocol, default: %(default)s', type=str.lower,
choices=protocols, default='xiaomi')
if len(argv)==1:
parser.print_usage()
exit()
args = parser.parse_args()
dev = devices.get(args.device)
link = interfaces.get(args.interface)()
with link:
tran = protocols.get(args.protocol)(link)
if args.address:
addr = args.address
else:
print('Scanning...')
ports = link.scan()
if not ports:
exit("No interfaces found !")
print('Connecting to', ports[0][0])
addr = ports[0][1]
link.open(addr)
print('Connected')
try:
UpdateFirmware(link, tran, dev, args.file)
except Exception as e:
print('Error:', e)

24
py9b/command/mfg.py Normal file
View File

@ -0,0 +1,24 @@
"""Manufacturer commands"""
from struct import pack, unpack
from .base import BaseCommand, InvalidResponse
class AuthError(Exception):
pass
class WriteSN(BaseCommand):
def __init__(self, dev, sn, auth):
super(WriteSN, self).__init__(dst=dev, cmd=0x18, arg=0x10, data=pack("<14sL", sn, auth), has_response=True)
self.dev = dev
def handle_response(self, response):
if len(response.data)!=0:
raise InvalidResponse("WriteSN {0:X}".format(self.dev))
if response.arg!=1:
raise AuthError("WriteSN {0:X}".format(self.dev))
return True
__all__=["AuthError", "WriteSN"]

View File

@ -1,3 +1,5 @@
"""Register read/write commands"""
from struct import pack, unpack, calcsize from struct import pack, unpack, calcsize
from .base import BaseCommand, InvalidResponse from .base import BaseCommand, InvalidResponse
@ -11,7 +13,7 @@ class ReadRegs(BaseCommand):
def handle_response(self, response): def handle_response(self, response):
if response.arg!=self.reg or len(response.data)!=calcsize(self.format): if response.arg!=self.reg or len(response.data)!=calcsize(self.format):
raise InvalidResponse("ReadRegs {0:X}:{1:X}".format(self.dev, self.reg)) raise InvalidResponse("ReadRegs {0:X}:{1:X}: @{2:X} [{3:X}]".format(self.dev, self.reg, response.arg, len(response.data)))
return unpack(self.format, response.data) return unpack(self.format, response.data)

View File

@ -1,7 +1,21 @@
"""Firmware update commands"""
from struct import pack, unpack from struct import pack, unpack
from .base import BaseCommand, InvalidResponse from .base import BaseCommand, InvalidResponse
# error codes:
# 1 - invalid parameter
# 2 - erase error
# 3 - flash error
# 4 - not locked
# 5 - address error
# 6 - command in progress
# 7 - invalid cmd/len
UpdateErrorCodes = { 0: 'OK', 1: 'Out of bounds', 2: 'Erase error', 3: 'Write error',
4: 'Not locked', 5: 'Invalid address', 6: 'Command in progress', 7: 'Invalid payload len'}
class UpdateError(Exception): class UpdateError(Exception):
pass pass
@ -12,24 +26,24 @@ class StartUpdate(BaseCommand):
self.dev = dev self.dev = dev
def handle_response(self, response): def handle_response(self, response):
if len(response.data)!=0: if not len(response.data) in (0, 1):
raise InvalidResponse("StartUpdate {0:X}".format(self.dev)) raise InvalidResponse("StartUpdate {0:X}".format(self.dev))
if response.arg!=0: if response.arg!=0:
raise UpdateError("StartUpdate {0:X} error {1:d}".format(self.dev, response.arg)) raise UpdateError("StartUpdate {0:X}: {1:s}".format(self.dev, UpdateErrorCodes.get(response.arg, str(response.arg))))
return True return True
class WriteUpdate(BaseCommand): class WriteUpdate(BaseCommand):
def __init__(self, dev, page, data): def __init__(self, dev, page, data):
super(WriteUpdate, self).__init__(dst=dev, cmd=0x08, arg=page, data=data, has_response=True) super(WriteUpdate, self).__init__(dst=dev, cmd=0x08, arg=page & 0xFF, data=data, has_response=True)
self.dev = dev self.dev = dev
self.page = page self.page = page
def handle_response(self, response): def handle_response(self, response):
if len(response.data)!=0: if not len(response.data) in (0, 1):
raise InvalidResponse("WriteUpdate {0:X} @{1:X}".format(self.dev, self.page)) raise InvalidResponse("WriteUpdate {0:X} @{1:X}".format(self.dev, self.page))
if response.arg!=0: if response.arg!=0:
raise UpdateError("WriteUpdate {0:X} @{1:X} error {2:d}".format(self.dev, self.page, response.arg)) raise UpdateError("WriteUpdate {0:X} @{1:X}: {2:s}".format(self.dev, self.page, UpdateErrorCodes.get(response.arg, str(response.arg))))
return True return True
@ -39,10 +53,10 @@ class FinishUpdate(BaseCommand):
self.dev = dev self.dev = dev
def handle_response(self, response): def handle_response(self, response):
if len(response.data)!=0: if not len(response.data) in (0, 1):
raise InvalidResponse("FinishUpdate {0:X}".format(self.dev)) raise InvalidResponse("FinishUpdate {0:X}".format(self.dev))
if response.arg!=0: if response.arg!=0:
raise UpdateError("FinishUpdate {0:X} error {1:d}".format(self.dev, response.arg)) raise UpdateError("FinishUpdate {0:X}: {1:s}".format(self.dev, UpdateErrorCodes.get(response.arg, str(response.arg))))
return True return True

View File

@ -29,6 +29,8 @@ class Fifo():
_rx_char_uuid = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" _rx_char_uuid = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
_tx_char_uuid = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" _tx_char_uuid = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"
_write_chunk_size = 20 # as in android dumps
class BLELink(BaseLink): class BLELink(BaseLink):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(BLELink, self).__init__(*args, **kwargs) super(BLELink, self).__init__(*args, **kwargs)
@ -93,7 +95,13 @@ class BLELink(BaseLink):
def write(self, data): def write(self, data):
if self.dump: if self.dump:
print ">", hexlify(data).upper() print ">", hexlify(data).upper()
self._dev.char_write_handle(self._wr_handle, bytearray(data)) size = len(data)
ofs = 0
while size:
chunk_sz = min(size, _write_chunk_size)
self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz]))
ofs += chunk_sz
size -= chunk_sz
__all__ = ["BLELink"] __all__ = ["BLELink"]

View File

@ -6,6 +6,7 @@ from .base import BaseLink, LinkTimeoutException, LinkOpenException
HOST, PORT = "127.0.0.1", 6000 HOST, PORT = "127.0.0.1", 6000
_write_chunk_size = 20 # as in android dumps
def recvall(sock, size): def recvall(sock, size):
data = "" data = ""
@ -63,7 +64,13 @@ class TCPLink(BaseLink):
def write(self, data): def write(self, data):
if self.dump: if self.dump:
print ">", hexlify(data).upper() print ">", hexlify(data).upper()
self.sock.sendall(data) size = len(data)
ofs = 0
while size:
chunk_sz = min(size, _write_chunk_size)
self.sock.sendall(data[ofs:ofs+chunk_sz])
ofs += chunk_sz
size -= chunk_sz
__all__ = ["TCPLink"] __all__ = ["TCPLink"]

View File

@ -7,12 +7,15 @@ def checksum(data):
return (s & 0xFFFF) ^ 0xFFFF return (s & 0xFFFF) ^ 0xFFFF
class BaseTransport(object): class BaseTransport(object):
DEV01 = 1 MOTOR = 0x01
ESC = 0x20 ESC = 0x20
BLE = 0x21 BLE = 0x21
BMS = 0x22 BMS = 0x22
HOST = 0x3E HOST = 0x3E
DeviceNames = { MOTOR : "MOTOR", ESC : "ESC", BLE : "BLE", BMS : "BMS", HOST : "HOST" }
def __init__(self, link): def __init__(self, link):
self.link = link self.link = link
@ -31,5 +34,9 @@ class BaseTransport(object):
rsp = self.recv() rsp = self.recv()
return command.handle_response(rsp) return command.handle_response(rsp)
@staticmethod
def GetDeviceName(dev):
return BaseTransport.DeviceNames.get(dev, "%02X" % (dev))
__all__ = ["checksum", "BaseTransport"] __all__ = ["checksum", "BaseTransport"]

View File

@ -1,4 +1,5 @@
from binascii import hexlify from binascii import hexlify
from .base import BaseTransport as BT
class BasePacket(object): class BasePacket(object):
def __init__(self, src=0, dst=0, cmd=0, arg=0, data=""): def __init__(self, src=0, dst=0, cmd=0, arg=0, data=""):
@ -9,7 +10,7 @@ class BasePacket(object):
self.data = data self.data = data
def __str__(self): def __str__(self):
return "%02X->%02X: %02X @%02X %s" % (self.src, self.dst, self.cmd, self.arg, hexlify(self.data).upper()) return "%s->%s: %02X @%02X %s" % (BT.GetDeviceName(self.src), BT.GetDeviceName(self.dst), self.cmd, self.arg, hexlify(self.data).upper())
__all__ = ["BasePacket"] __all__ = ["BasePacket"]

View File

@ -14,26 +14,30 @@ class XiaomiTransport(BT):
MASTER2BMS = 0x22 MASTER2BMS = 0x22
BMS2MASTER = 0x25 BMS2MASTER = 0x25
DEV01 = 0x01 MOTOR = 0x01
DEVFF = 0xFF DEVFF = 0xFF
_SaDa2Addr = { BT.HOST : { BT.DEV01 : DEV01, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS }, _SaDa2Addr = { BT.HOST : { BT.MOTOR : MOTOR, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS },
BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.DEV01 : DEV01 }, BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.MOTOR : MOTOR },
BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.DEV01 : DEV01 }, BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.MOTOR : MOTOR },
BT.DEV01 : {BT.HOST : DEV01, BT.ESC : DEV01, BT.BMS : DEV01 } } BT.MOTOR : {BT.HOST : MOTOR, BT.ESC : MOTOR, BT.BMS : MOTOR } }
# TBC # TBC
_BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC), _BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC),
ESC2MASTER : (BT.ESC, BT.HOST), ESC2MASTER : (BT.ESC, BT.HOST),
MASTER2BMS : (BT.HOST, BT.BMS), MASTER2BMS : (BT.HOST, BT.BMS),
BMS2MASTER : (BT.BMS, BT.HOST), BMS2MASTER : (BT.BMS, BT.HOST),
DEV01 : (BT.DEV01, BT.HOST) } MASTER2BLE : (BT.HOST, BT.BLE),
BLE2MASTER : (BT.BLE, BT.HOST),
MOTOR : (BT.MOTOR, BT.HOST) }
_BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC), _BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC),
ESC2MASTER : (BT.ESC, BT.BMS), ESC2MASTER : (BT.ESC, BT.BMS),
MASTER2BMS : (BT.ESC, BT.BMS), MASTER2BMS : (BT.ESC, BT.BMS),
BMS2MASTER : (BT.BMS, BT.ESC), BMS2MASTER : (BT.BMS, BT.ESC),
DEV01 : (BT.DEV01, BT.BMS) } MASTER2BLE : (BT.BMS, BT.BLE),
BLE2MASTER : (BT.BLE, BT.BMS),
MOTOR : (BT.MOTOR, BT.BMS) }
def __init__(self, link, device=BT.HOST): def __init__(self, link, device=BT.HOST):
@ -78,7 +82,7 @@ class XiaomiTransport(BT):
print "Checksum mismatch !" print "Checksum mismatch !"
return None return None
sa, da = self._split_addr(ord(pkt[1])) sa, da = self._split_addr(ord(pkt[1]))
return BasePacket(sa, da, ord(pkt[2]), ord(pkt[3]), pkt[4:-2]) # sa, da, cmd, param, data return BasePacket(sa, da, ord(pkt[2]), ord(pkt[3]), pkt[4:-2]) # sa, da, cmd, arg, data
def send(self, packet): def send(self, packet):

View File

@ -10,9 +10,9 @@ from py9b.command.regio import ReadRegs
READ_CHUNK_SIZE = 0x10 READ_CHUNK_SIZE = 0x10
link = SerialLink(dump=True) #link = SerialLink(dump=True)
#link = TCPLink() #link = TCPLink()
#link = BLELink() link = BLELink()
with link: with link:
print "Scanning..." print "Scanning..."

View File

@ -2,13 +2,15 @@
from py9b.link.base import LinkOpenException, LinkTimeoutException from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport from py9b.transport.xiaomi import XiaomiTransport
from py9b.command.regio import ReadRegs
READ_CHUNK_SIZE = 0x40 READ_CHUNK_SIZE = 0x10
#link = SerialLink() #link = SerialLink(dump=True)
#link = TCPLink() #link = TCPLink()
link = BLELink() link = BLELink()
@ -23,20 +25,19 @@ with link:
link.open(ports[0][1]) link.open(ports[0][1])
print "Connected" print "Connected"
req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE))
hfo = open("EscRegs.bin", "wb") hfo = open("EscRegs.bin", "wb")
for i in xrange(0, 0x200, READ_CHUNK_SIZE): for i in xrange(0x0, 0x100, READ_CHUNK_SIZE):
print ".", print ".",
req.arg = i>>1 for retry in xrange(5):
for retry in xrange(3):
tran.send(req)
try: try:
rsp = tran.recv() data = tran.execute(ReadRegs(BT.ESC, i>>1, "16s"))[0]
except LinkTimeoutException: except LinkTimeoutException:
continue continue
break break
hfo.write(rsp.data) else:
print "No response !"
break
hfo.write(data)
hfo.close() hfo.close()
link.close() link.close()

43
read_esc_ll.py Normal file
View File

@ -0,0 +1,43 @@
#!python2-32
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
READ_CHUNK_SIZE = 0x40
#link = SerialLink()
#link = TCPLink()
link = BLELink()
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE))
hfo = open("EscRegs.bin", "wb")
for i in xrange(0, 0x200, READ_CHUNK_SIZE):
print ".",
req.arg = i>>1
for retry in xrange(3):
tran.send(req)
try:
rsp = tran.recv()
except LinkTimeoutException:
continue
break
hfo.write(rsp.data)
hfo.close()
link.close()

64
sniffer.py Normal file
View File

@ -0,0 +1,64 @@
#!python2-32
from struct import unpack
from py9b.link.base import LinkOpenException, LinkTimeoutException
#from py9b.link.tcp import TCPLink
#from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
link = SerialLink()
#link = TCPLink()
#link = BLELink()
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
last_esc_64 = ""
last_esc_65 = ""
last_ble_64 = ""
try:
while True:
try:
rsp = tran.recv()
if not rsp:
continue
if rsp.src==BT.HOST and rsp.dst==BT.ESC and rsp.cmd in (0x64, 0x65):
if len(rsp.data)==5:
if rsp.data==last_esc_65:
continue
ll, throttle, brake, u2, u3 = unpack("<BBBBB", rsp.data)
print "BLE->ESC: TH: %02X, BR: %02X, %02X %02X" % (throttle, brake, u2, u3)
last_esc_65 = rsp.data
continue
elif len(rsp.data)==7:
if rsp.data==last_esc_64:
continue
ll, throttle, brake, u2, u3, ver = unpack("<BBBBBH", rsp.data)
print "BLE->ESC: TH: %02X, BR: %02X, %02X %02X, VER: %04X" % (throttle, brake, u2, u3, ver)
last_esc_64 = rsp.data
continue
elif rsp.src==BT.HOST and rsp.dst==BT.BLE and rsp.cmd==0x64:
if len(rsp.data)==4:
if rsp.data==last_ble_64:
continue
u0, u1, u2, u3 = unpack("<BBBB", rsp.data)
print "ESC->BLE: %02X %02X %02X %02X" % (u0, u1, u2, u3)
last_ble_64 = rsp.data
continue
print rsp
except LinkTimeoutException:
pass
except KeyboardInterrupt:
pass
link.close()

View File

@ -1,3 +1,4 @@
#!python2-32
from py9b.link.base import LinkOpenException, LinkTimeoutException from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink from py9b.link.ble import BLELink
@ -6,8 +7,8 @@ from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport from py9b.transport.xiaomi import XiaomiTransport
#link = SerialLink() #link = SerialLink()
#link = TCPLink() link = TCPLink()
link = BLELink() #link = BLELink()
with link: with link:
print "Scanning..." print "Scanning..."
@ -16,8 +17,8 @@ with link:
tran = XiaomiTransport(link) tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000)) link.open(("192.168.1.45", 6000))
link.open(ports[0][1]) #link.open(ports[0][1])
print "Connected" print "Connected"
req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x02, arg=0x41, data="\xCE\xAB\x00\x00") req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x02, arg=0x41, data="\xCE\xAB\x00\x00")

82
wr_esc_sn.py Normal file
View File

@ -0,0 +1,82 @@
#!python2-32
from __future__ import print_function
from sys import exit
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
from py9b.command.regio import ReadRegs, WriteRegs
from py9b.command.mfg import WriteSN
new_sn = "16133/00101234"
def CalcSnAuth(oldsn, newsn, uid3):
s = 0
for i in xrange(0x0E):
s += ord(oldsn[i])
s *= ord(newsn[i])
s += uid3+(uid3<<4)
s &= 0xFFFFFFFF
if (s & 0x80000000)!=0:
s = 0x100000000-s
return s % 1000000
#link = SerialLink(dump=True)
#link = TCPLink()
link = BLELink(dump=True)
with link:
print("Scanning...")
ports = link.scan()
print(ports)
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print("Connected")
print("Pinging...")
for retry in xrange(20):
print(".", end="")
try:
old_sn = tran.execute(ReadRegs(BT.ESC, 0x10, "14s"))[0]
except LinkTimeoutException:
continue
break
else:
exit("Timed out !")
print("")
#lock
tran.execute(WriteRegs(BT.ESC, 0x70, "<B", 0x01))
old_sn = tran.execute(ReadRegs(BT.ESC, 0x10, "14s"))[0]
print("Old S/N:", old_sn)
uid3 = tran.execute(ReadRegs(BT.ESC, 0xDE, "<L"))[0]
print("UID3: %08X" % (uid3))
auth = CalcSnAuth(old_sn, new_sn, uid3)
print("Auth: %08X" % (auth))
for i in range(3):
try:
tran.execute(WriteSN(BT.ESC, new_sn, auth))
print("OK")
break
except LinkTimeoutException:
print("Timeout !")
old_sn = tran.execute(ReadRegs(BT.ESC, 0x10, "14s"))[0]
print("Current S/N:", old_sn)
link.close()