legit-fork
flowswitch 2018-10-15 14:10:00 +02:00
parent d6d9b93a10
commit 856a226b6f
12 changed files with 384 additions and 0 deletions

View File

@ -1,2 +1,8 @@
# py9b
Ninebot/Xiaomi electric scooter communication library
## Requirements
* Python 2.6.x [www.python.org]
* PySerial [pip install pyserial] - for direct serial link backend
* PyGatt [pip install pygatt] - for BLED112 dongle backend
* nRFUARTBridge [https://github.com/flowswitch/nRFUARTBridge] - for Android BLE-TCP backend

0
py9b/__init__.py Normal file
View File

0
py9b/link/__init__.py Normal file
View File

34
py9b/link/base.py Normal file
View File

@ -0,0 +1,34 @@
class LinkTimeoutException(Exception):
pass
class LinkOpenException(Exception):
pass
class BaseLink(object):
DEF_TIMEOUT = 1
def __init__(self, timeout=DEF_TIMEOUT, dump=False):
self.dump = dump
self.timeout = timeout
def scan(self):
raise NotImplementedError()
def open(self, port):
raise NotImplementedError()
def close(self):
pass
def read(self, size):
raise NotImplementedError()
def write(self, data):
raise NotImplementedError()
__all__ = ["LinkTimeoutException", "LinkOpenException", "BaseLink"]

58
py9b/link/ble.py Normal file
View File

@ -0,0 +1,58 @@
"""BLE link using BlueGiga adapter via PyGatt/BGAPI"""
from __future__ import absolute_import
import pygatt
from binascii import hexlify
SCAN_TIMEOUT = 3
class BLELink():
def __init__(self, dump=False):
self.dev = None
self.dump = dump
def __enter__(self):
self.adapter = pygatt.BGAPIBackend()
self.adapter.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def scan(self):
res = []
devices = self.adapter.scan(timeout=SCAN_TIMEOUT)
for dev in devices:
if dev["name"].startswith(u"MISc"):
res.append((dev["name"], dev["address"]))
return res
def open(self, port):
self.dev = self.adapter.connect(port, address_type=pygatt.BLEAddressType.random)
def close(self):
if self.dev:
self.dev.disconnect()
self.dev = None
self.adapter.stop()
def read(self, size):
data = self.dev.read(size)
if self.dump:
print "<", hexlify(data).upper()
return data
def write(self, data):
if self.dump:
print ">", hexlify(data).upper()
return self.dev.write(data)
__all__ = ["BLELink"]

61
py9b/link/serial.py Normal file
View File

@ -0,0 +1,61 @@
"""Direct serial link"""
from __future__ import absolute_import
import serial
import serial.tools.list_ports as lp
from binascii import hexlify
from .base import BaseLink, LinkTimeoutException, LinkOpenException
class SerialLink():
def __init__(self, *args, **kwargs):
super(SerialLink, self).__init__(*args, **kwargs)
self.com = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def scan(self):
ports = lp.comports()
res = [(port.device, "%04X:%04X" % (port.vid, port.pid)) for port in ports]
return res
def open(self, port):
try:
self.com = serial.Serial(port, 115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=self.timeout)
except serial.SerialException:
raise LinkOpenException
def close(self):
if self.com:
self.com.close()
self.com = None
def read(self, size):
try:
data = self.com.read(size)
except serial.SerialTimeoutException:
raise LinkTimeoutException
if len(data)<size:
raise LinkTimeoutException
if self.dump:
print "<", hexlify(data).upper()
return data
def write(self, data):
if self.dump:
print ">", hexlify(data).upper()
self.com.write(data)
__all__ = ["SerialLink"]

69
py9b/link/tcp.py Normal file
View File

@ -0,0 +1,69 @@
"""TCP-BLE bridge link"""
from __future__ import absolute_import
import socket
from binascii import hexlify
from .base import BaseLink, LinkTimeoutException, LinkOpenException
HOST, PORT = "127.0.0.1", 6000
def recvall(sock, size):
data = ""
while len(data)<size:
try:
pkt = sock.recv(size-len(data))
except socket.timeout:
raise LinkTimeoutException()
data+=pkt
return data
class TCPLink(BaseLink):
def __init__(self, *args, **kwargs):
super(TCPLink, self).__init__(*args, **kwargs)
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(self.timeout)
self.connected = False
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def scan(self):
res = [("Android UART Bridge", (HOST, PORT))]
return res
def open(self, port):
try:
self.sock.connect(port)
except socket.timeout:
raise LinkOpenException
self.connected = True
def close(self):
if self.connected:
self.sock.close()
self.connected = False
def read(self, size):
data = recvall(self.sock, size)
if data and self.dump:
print "<", hexlify(data).upper()
return data
def write(self, data):
if self.dump:
print ">", hexlify(data).upper()
self.sock.sendall(data)
__all__ = ["TCPLink"]

View File

27
py9b/transport/base.py Normal file
View File

@ -0,0 +1,27 @@
"""Transport abstract class"""
def checksum(data):
s = 0
for c in data:
s += ord(c)
return (s & 0xFFFF) ^ 0xFFFF
class BaseTransport(object):
DEV01 = 1
ESC = 0x20
BLE = 0x21
BMS = 0x22
HOST = 0x3E
def __init__(self, link):
self.link = link
def recv(self):
raise NotImplementedError()
def send(self, src, dst, cmd, param, data=""):
raise NotImplementedError()
__all__ = ["checksum", "BaseTransport"]

15
py9b/transport/packet.py Normal file
View File

@ -0,0 +1,15 @@
from binascii import hexlify
class BasePacket(object):
def __init__(self, src=0, dst=0, cmd=0, reg=0, data=""):
self.src = src
self.dst = dst
self.cmd = cmd
self.reg = reg
self.data = data
def __str__(self):
return "%02X->%02X: %02X @%02X %s" % (self.src, self.dst, self.cmd, self.reg, hexlify(self.data).upper())
__all__ = ["BasePacket"]

85
py9b/transport/xiaomi.py Normal file
View File

@ -0,0 +1,85 @@
"""Xiaomi packet transport"""
from struct import pack, unpack
from .base import checksum, BaseTransport as BT
from .packet import BasePacket
class XiaomiTransport(BT):
HEAD2ESC = 0x20
ESC2HEAD = 0x23
HEAD2BMS = 0x22
BMS2HEAD = 0x25
DEV01 = 0x01
_SaDa2Addr = { BT.HOST : { BT.DEV01 : DEV01, BT.ESC : HEAD2ESC, BT.BMS : HEAD2BMS },
BT.ESC : { BT.HOST : ESC2HEAD, BT.BMS : HEAD2BMS, BT.DEV01 : DEV01 },
BT.BMS : { BT.HOST : BMS2HEAD, BT.ESC : BMS2HEAD, BT.DEV01 : DEV01 },
BT.DEV01 : {BT.HOST : DEV01, BT.ESC : DEV01, BT.BMS : DEV01 } }
# TBC
_BleAddr2SaDa = { HEAD2ESC : (BT.HOST, BT.ESC),
ESC2HEAD : (BT.ESC, BT.HOST),
HEAD2BMS : (BT.HOST, BT.BMS),
BMS2HEAD : (BT.BMS, BT.HOST),
DEV01 : (BT.DEV01, BT.HOST) }
_BmsAddr2SaDa = { HEAD2ESC : (BT.BMS, BT.ESC),
ESC2HEAD : (BT.ESC, BT.BMS),
HEAD2BMS : (BT.ESC, BT.BMS),
BMS2HEAD : (BT.BMS, BT.ESC),
DEV01 : (BT.DEV01, BT.BMS) }
def __init__(self, link, device=BT.HOST):
super(XiaomiTransport, self).__init__(link)
self.device = device
def _make_addr(self, src, dst):
return XiaomiTransport._SaDa2Addr[src][dst]
def _split_addr(self, addr):
if self.device==BT.BMS:
return XiaomiTransport._BmsAddr2SaDa[addr]
else:
return XiaomiTransport._BleAddr2SaDa[addr]
def _wait_pre(self):
while True:
while True:
c = self.link.read(1)
if c=="\x55":
break
while True:
c = self.link.read(1)
if c=="\xAA":
return True
if c!="\x55":
break # start waiting 55 again, else - this is 55, so wait for AA
def recv(self):
self._wait_pre()
pkt = self.link.read(1)
l = ord(pkt)+3
for i in xrange(l):
pkt += self.link.read(1)
ck_calc = checksum(pkt[0:-2])
ck_pkt = unpack("<H", pkt[-2:])[0]
if ck_pkt!=ck_calc:
print "Checksum mismatch !"
return None
sa, da = self._split_addr(ord(pkt[1]))
return BasePacket(sa, da, ord(pkt[2]), ord(pkt[3]), pkt[4:-2]) # sa, da, cmd, param, data
def send(self, packet):
dev = self._make_addr(packet.src, packet.dst)
pkt = pack("<BBBB", len(packet.data)+2, dev, packet.cmd, packet.reg)+packet.data
pkt = "\x55\xAA" + pkt + pack("<H", checksum(pkt))
self.link.write(pkt)
__all__ = ["XiaomiTransport"]

29
tcp_test.py Normal file
View File

@ -0,0 +1,29 @@
from py9b.link.base import LinkOpenException, LinkTimeoutException
from py9b.link.tcp import TCPLink
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.transport.xiaomi import XiaomiTransport
#link = SerialLink()
with TCPLink() as link:
ports = link.scan()
print ports
tran = XiaomiTransport(link)
#link.open(("192.168.1.45", 6000))
link.open(ports[0][1])
#req = PKT(src=BT.HOST, dst=BT.ESC, cmd=0x01, reg=0x10, data="\x10")
req = PKT(src=BT.HOST, dst=BT.BMS, cmd=0x01, reg=0x10, data="\x10")
while raw_input("Press ENTER to send...")!="q":
tran.send(req)
try:
rsp = tran.recv()
except LinkTimeoutException:
print "No response"
continue
print rsp
link.close()