Fixes in SerialLink

Added command builder
Added register r/w commands
Added fw update commands
Added a custom command example
Added simple fw updater
legit-fork
flowswitch 2018-11-19 01:16:19 +01:00
parent 77d076a369
commit 6654c6bfe3
16 changed files with 388 additions and 10 deletions

74
fwupd.py Normal file
View File

@ -0,0 +1,74 @@
#!python2-32
from sys import argv, exit
from os.path import getsize
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.xiaomi import XiaomiTransport
from py9b.command.regio import ReadRegs, WriteRegs
from py9b.command.update import *
def checksum(s, data):
for c in data:
s += ord(c)
return (s & 0xFFFFFFFF)
fw_dev = BT.BMS
fw_name = "bms.bin"
fw_size = getsize(fw_name)
fw_page_size = 0x80
link = SerialLink(timeout=0.5)
#link = TCPLink()
#link = BLELink()
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
print "Pinging..."
for retry in xrange(20):
print ".",
try:
tran.execute(ReadRegs(BT.BMS, 0x10, "14s"))
except LinkTimeoutException:
continue
break
else:
exit("Timed out !")
print ""
hfi = open(fw_name, "rb")
print "Starting..."
tran.execute(StartUpdate(fw_dev, fw_size))
print "Writing..."
page = 0
chk = 0
while fw_size:
print "{0:X}".format(page*0x80)
chunk_sz = min(fw_size, fw_page_size)
data = hfi.read(chunk_sz)
chk = checksum(chk, data)
tran.execute(WriteUpdate(fw_dev, page, data))
page += 1
fw_size -= chunk_sz
hfi.close()
print "Finalizing..."
tran.execute(FinishUpdate(fw_dev, chk ^ 0xFFFFFFFF))
print "Reboot"
tran.execute(RebootUpdate(fw_dev))

0
py9b/command/__init__.py Normal file
View File

19
py9b/command/base.py Normal file
View File

@ -0,0 +1,19 @@
from ..transport.packet import BasePacket as PKT
from ..transport.base import BaseTransport as BT
class InvalidResponse(Exception):
pass
class BaseCommand(object):
def __init__(self, src=BT.HOST, dst=0, cmd=0, arg=0, data="", has_response=True):
self.has_response = has_response
self.request = PKT(src, dst, cmd, arg, data)
def handle_response(self, response):
return True
__all__ = ["BaseCommand", "InvalidResponse"]

17
py9b/command/custom.py Normal file
View File

@ -0,0 +1,17 @@
from struct import pack, unpack, calcsize
from .base import BaseCommand, InvalidResponse
class ReadMem(BaseCommand):
def __init__(self, dev, addr, format):
super(ReadMem, self).__init__(dst=dev, cmd=0x80, arg=calcsize(format), data=pack("<H", addr), has_response=True)
self.dev = dev
self.format = format
def handle_response(self, response):
if len(response.data)!=calcsize(self.format):
raise InvalidResponse("ReadMem {0:X}".format(self.dev))
return unpack(self.format, response.data)
__all__=["ReadMem"]

36
py9b/command/regio.py Normal file
View File

@ -0,0 +1,36 @@
from struct import pack, unpack, calcsize
from .base import BaseCommand, InvalidResponse
class ReadRegs(BaseCommand):
def __init__(self, dev, reg, format):
super(ReadRegs, self).__init__(dst=dev, cmd=0x01, arg=reg, data=pack("<B", calcsize(format)), has_response=True)
self.dev = dev
self.reg = reg
self.format = format
def handle_response(self, response):
if response.arg!=self.reg or len(response.data)!=calcsize(self.format):
raise InvalidResponse("ReadRegs {0:X}:{1:X}".format(self.dev, self.reg))
return unpack(self.format, response.data)
class WriteProtectError(Exception):
pass
class WriteRegs(BaseCommand):
def __init__(self, dev, reg, format, *args):
super(WriteRegs, self).__init__(dst=dev, cmd=0x02, arg=reg, data=pack(format, *args), has_response=True)
self.dev = dev
self.reg = reg
def handle_response(self, response):
if response.arg!=self.reg or len(response.data)!=1:
raise InvalidResponse("WriteRegs {0:X}:{1:X}".format(self.dev, self.reg))
if unpack("<B", response.data)[0]!=1:
raise WriteProtectError("WriteRegs {0:X}:{1:X}".format(self.dev, self.reg))
return True
__all__=["ReadRegs", "WriteRegs", "WriteProtectError"]

58
py9b/command/update.py Normal file
View File

@ -0,0 +1,58 @@
from struct import pack, unpack
from .base import BaseCommand, InvalidResponse
class UpdateError(Exception):
pass
class StartUpdate(BaseCommand):
def __init__(self, dev, size):
super(StartUpdate, self).__init__(dst=dev, cmd=0x07, data=pack("<L", size), has_response=True)
self.dev = dev
def handle_response(self, response):
if len(response.data)!=0:
raise InvalidResponse("StartUpdate {0:X}".format(self.dev))
if response.arg!=0:
raise UpdateError("StartUpdate {0:X} error {1:d}".format(self.dev, response.arg))
return True
class WriteUpdate(BaseCommand):
def __init__(self, dev, page, data):
super(WriteUpdate, self).__init__(dst=dev, cmd=0x08, arg=page, data=data, has_response=True)
self.dev = dev
self.page = page
def handle_response(self, response):
if len(response.data)!=0:
raise InvalidResponse("WriteUpdate {0:X} @{1:X}".format(self.dev, self.page))
if response.arg!=0:
raise UpdateError("WriteUpdate {0:X} @{1:X} error {2:d}".format(self.dev, self.page, response.arg))
return True
class FinishUpdate(BaseCommand):
def __init__(self, dev, checksum):
super(FinishUpdate, self).__init__(dst=dev, cmd=0x09, data=pack("<L", checksum), has_response=True)
self.dev = dev
def handle_response(self, response):
if len(response.data)!=0:
raise InvalidResponse("FinishUpdate {0:X}".format(self.dev))
if response.arg!=0:
raise UpdateError("FinishUpdate {0:X} error {1:d}".format(self.dev, response.arg))
return True
class RebootUpdate(BaseCommand):
def __init__(self, dev):
super(RebootUpdate, self).__init__(dst=dev, cmd=0x0A, has_response=False)
self.dev = dev
def handle_response(self, response):
return True
__all__ = ["UpdateError", "StartUpdate", "WriteUpdate", "FinishUpdate", "RebootUpdate"]

View File

@ -23,7 +23,7 @@ class SerialLink(BaseLink):
def scan(self):
ports = lp.comports()
res = [(port.device, "%04X:%04X" % (port.vid, port.pid)) for port in ports]
res = [("%s %04X:%04X" % (port.device, port.vid, port.pid), port.device) for port in ports]
return res

View File

@ -20,8 +20,16 @@ class BaseTransport(object):
def recv(self):
raise NotImplementedError()
def send(self, src, dst, cmd, param, data=""):
def send(self, src, dst, cmd, arg, data=""):
raise NotImplementedError()
def execute(self, command):
self.send(command.request)
if not command.has_response:
return True
#TODO: retry ?
rsp = self.recv()
return command.handle_response(rsp)
__all__ = ["checksum", "BaseTransport"]

View File

@ -1,15 +1,15 @@
from binascii import hexlify
class BasePacket(object):
def __init__(self, src=0, dst=0, cmd=0, reg=0, data=""):
def __init__(self, src=0, dst=0, cmd=0, arg=0, data=""):
self.src = src
self.dst = dst
self.cmd = cmd
self.reg = reg
self.arg = arg
self.data = data
def __str__(self):
return "%02X->%02X: %02X @%02X %s" % (self.src, self.dst, self.cmd, self.reg, hexlify(self.data).upper())
return "%02X->%02X: %02X @%02X %s" % (self.src, self.dst, self.cmd, self.arg, hexlify(self.data).upper())
__all__ = ["BasePacket"]

View File

@ -83,7 +83,7 @@ class XiaomiTransport(BT):
def send(self, packet):
dev = self._make_addr(packet.src, packet.dst)
pkt = pack("<BBBB", len(packet.data)+2, dev, packet.cmd, packet.reg)+packet.data
pkt = pack("<BBBB", len(packet.data)+2, dev, packet.cmd, packet.arg)+packet.data
pkt = "\x55\xAA" + pkt + pack("<H", checksum(pkt))
self.link.write(pkt)

43
read_bms.py Normal file
View File

@ -0,0 +1,43 @@
#!python2-32
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
from py9b.command.regio import ReadRegs
READ_CHUNK_SIZE = 0x10
link = SerialLink(dump=True)
#link = TCPLink()
#link = BLELink()
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
hfo = open("BmsRegs.bin", "wb")
for i in xrange(0x0, 0x100, READ_CHUNK_SIZE):
print ".",
for retry in xrange(5):
try:
data = tran.execute(ReadRegs(BT.BMS, i>>1, "16s"))[0]
except LinkTimeoutException:
continue
break
else:
print "No response !"
break
hfo.write(data)
hfo.close()
link.close()

46
read_bms_ll.py Normal file
View File

@ -0,0 +1,46 @@
#!python2-32
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
READ_CHUNK_SIZE = 0x10
link = SerialLink(dump=True)
#link = TCPLink()
#link = BLELink()
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE))
hfo = open("BmsRegs.bin", "wb")
for i in xrange(0x0, 0x100, READ_CHUNK_SIZE):
print ".",
req.arg = i>>1
for retry in xrange(5):
tran.send(req)
try:
rsp = tran.recv()
except LinkTimeoutException:
continue
break
else:
print "No response !"
break
hfo.write(rsp.data)
hfo.close()
link.close()

45
read_bms_mem.py Normal file
View File

@ -0,0 +1,45 @@
#!python2-32
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
from py9b.link.serial import SerialLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
from py9b.command.custom import ReadMem
ADDR = 0x1000
SIZE = 0x800
READ_CHUNK_SIZE = 0x10
link = SerialLink(dump=True)
#link = TCPLink()
#link = BLELink()
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
hfo = open("BmsEep.bin", "wb")
for i in xrange(ADDR, ADDR+SIZE, READ_CHUNK_SIZE):
print ".",
for retry in xrange(5):
try:
data = tran.execute(ReadMem(BT.BMS, i, "16s"))[0]
except LinkTimeoutException:
continue
break
else:
print "No response !"
break
hfo.write(data)
hfo.close()
link.close()

View File

@ -1,3 +1,4 @@
#!python2-32
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
@ -22,12 +23,12 @@ with link:
link.open(ports[0][1])
print "Connected"
req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, reg=0, data=chr(READ_CHUNK_SIZE))
req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0, data=chr(READ_CHUNK_SIZE))
hfo = open("EscRegs.bin", "wb")
for i in xrange(0, 0x200, READ_CHUNK_SIZE):
print ".",
req.reg = i>>1
req.arg = i>>1
for retry in xrange(3):
tran.send(req)
try:

View File

@ -14,8 +14,8 @@ with TCPLink() as link:
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
#req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, reg=0x10, data="\x10")
req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, reg=0x10, data="\x10")
#req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, arg=0x10, data="\x10")
req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, arg=0x10, data="\x10")
while raw_input("Press ENTER to send...")!="q":
tran.send(req)

31
wr_esc.py Normal file
View File

@ -0,0 +1,31 @@
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.link.ble import BLELink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
#link = SerialLink()
#link = TCPLink()
link = BLELink()
with link:
print "Scanning..."
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
print "Connected"
req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x02, arg=0x41, data="\xCE\xAB\x00\x00")
tran.send(req)
try:
rsp = tran.recv()
finally:
link.close()
print rsp