begin python 2to3 migration (automated)
parent
ecf20d57ae
commit
f9829f1cea
|
@ -0,0 +1,110 @@
|
|||
"""BLE link using BlueGiga adapter via PyGatt/BGAPI"""
|
||||
|
||||
|
||||
import pygatt
|
||||
from .base import BaseLink, LinkTimeoutException, LinkOpenException
|
||||
from binascii import hexlify
|
||||
|
||||
SCAN_TIMEOUT = 3
|
||||
|
||||
|
||||
try:
|
||||
import queue
|
||||
except ImportError:
|
||||
import queue as queue
|
||||
|
||||
class Fifo():
|
||||
def __init__(self):
|
||||
self.q = queue.Queue()
|
||||
|
||||
def write(self, data): # put bytes
|
||||
for b in data:
|
||||
self.q.put(b)
|
||||
|
||||
def read(self, size=1, timeout=None): # but read string
|
||||
res = ''
|
||||
for i in range(size):
|
||||
res += chr(self.q.get(True, timeout))
|
||||
return res
|
||||
|
||||
|
||||
#_cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb'
|
||||
_rx_char_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
|
||||
_tx_char_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
|
||||
|
||||
_write_chunk_size = 20 # as in android dumps
|
||||
|
||||
class BLELink(BaseLink):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(BLELink, self).__init__(*args, **kwargs)
|
||||
self._adapter = None
|
||||
self._dev = None
|
||||
self._wr_handle = None
|
||||
self._rx_fifo = Fifo()
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
self._adapter = pygatt.BGAPIBackend()
|
||||
self._adapter.start()
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def _make_rx_cb(self): # this is a closure :)
|
||||
def rx_cb(handle, value):
|
||||
self._rx_fifo.write(value)
|
||||
return rx_cb
|
||||
|
||||
|
||||
def scan(self):
|
||||
res = []
|
||||
devices = self._adapter.scan(timeout=SCAN_TIMEOUT)
|
||||
for dev in devices:
|
||||
if dev['name'].startswith(('MISc', 'NBSc')):
|
||||
res.append((dev['name'], dev['address']))
|
||||
return res
|
||||
|
||||
|
||||
def open(self, port):
|
||||
try:
|
||||
self._dev = self._adapter.connect(port, address_type=pygatt.BLEAddressType.random)
|
||||
self._dev.subscribe(_tx_char_uuid, callback=self._make_rx_cb())
|
||||
self._wr_handle = self._dev.get_handle(_rx_char_uuid)
|
||||
except pygatt.exceptions.NotConnectedError:
|
||||
raise LinkOpenException
|
||||
|
||||
|
||||
def close(self):
|
||||
if self._dev:
|
||||
self._dev.disconnect()
|
||||
self._dev = None
|
||||
if self._adapter:
|
||||
self._adapter.stop()
|
||||
|
||||
|
||||
def read(self, size):
|
||||
try:
|
||||
data = self._rx_fifo.read(size, timeout=self.timeout)
|
||||
except queue.Empty:
|
||||
raise LinkTimeoutException
|
||||
if self.dump:
|
||||
print('<', hexlify(data).upper())
|
||||
return data
|
||||
|
||||
|
||||
def write(self, data):
|
||||
if self.dump:
|
||||
print('>', hexlify(data).upper())
|
||||
size = len(data)
|
||||
ofs = 0
|
||||
while size:
|
||||
chunk_sz = min(size, _write_chunk_size)
|
||||
self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz]))
|
||||
ofs += chunk_sz
|
||||
size -= chunk_sz
|
||||
|
||||
|
||||
__all__ = ['BLELink']
|
|
@ -0,0 +1,109 @@
|
|||
"""BLE link using BlueGiga adapter via PyGatt/BGAPI"""
|
||||
|
||||
|
||||
import pygatt
|
||||
from .base import BaseLink, LinkTimeoutException, LinkOpenException
|
||||
from binascii import hexlify
|
||||
|
||||
SCAN_TIMEOUT = 3
|
||||
|
||||
|
||||
try:
|
||||
import queue
|
||||
except ImportError:
|
||||
import queue as queue
|
||||
|
||||
class Fifo():
|
||||
def __init__(self):
|
||||
self.q = queue.Queue()
|
||||
|
||||
def write(self, data): # put bytes
|
||||
for b in data:
|
||||
self.q.put(b)
|
||||
|
||||
def read(self, size=1, timeout=None): # but read string
|
||||
res = ''
|
||||
for i in range(size):
|
||||
res += chr(self.q.get(True, timeout))
|
||||
return res
|
||||
|
||||
|
||||
#_cccd_uuid = '00002902-0000-1000-8000-00805f9b34fb'
|
||||
_rx_char_uuid = eval(input('RX UUID?'))
|
||||
_tx_char_uuid = eval(input('TX UUID?'))
|
||||
_write_chunk_size = 20 # as in android dumps
|
||||
|
||||
class BLELink(BaseLink):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(BLELink, self).__init__(*args, **kwargs)
|
||||
self._adapter = None
|
||||
self._dev = None
|
||||
self._wr_handle = None
|
||||
self._rx_fifo = Fifo()
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
self._adapter = pygatt.BGAPIBackend()
|
||||
self._adapter.start()
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def _make_rx_cb(self): # this is a closure :)
|
||||
def rx_cb(handle, value):
|
||||
self._rx_fifo.write(value)
|
||||
return rx_cb
|
||||
|
||||
|
||||
def scan(self):
|
||||
res = []
|
||||
devices = self._adapter.scan(timeout=SCAN_TIMEOUT)
|
||||
for dev in devices:
|
||||
if dev['name'].startswith(('MISc', 'NBSc')):
|
||||
res.append((dev['name'], dev['address']))
|
||||
return res
|
||||
|
||||
|
||||
def open(self, port):
|
||||
try:
|
||||
self._dev = self._adapter.connect(port, address_type=pygatt.BLEAddressType.random)
|
||||
self._dev.subscribe(_tx_char_uuid, callback=self._make_rx_cb())
|
||||
self._wr_handle = self._dev.get_handle(_rx_char_uuid)
|
||||
except pygatt.exceptions.NotConnectedError:
|
||||
raise LinkOpenException
|
||||
|
||||
|
||||
def close(self):
|
||||
if self._dev:
|
||||
self._dev.disconnect()
|
||||
self._dev = None
|
||||
if self._adapter:
|
||||
self._adapter.stop()
|
||||
|
||||
|
||||
def read(self, size):
|
||||
try:
|
||||
data = self._rx_fifo.read(size, timeout=self.timeout)
|
||||
except queue.Empty:
|
||||
raise LinkTimeoutException
|
||||
if self.dump:
|
||||
print('<', hexlify(data).upper())
|
||||
return data
|
||||
|
||||
|
||||
def write(self, data):
|
||||
if self.dump:
|
||||
print('>', hexlify(data).upper())
|
||||
size = len(data)
|
||||
ofs = 0
|
||||
while size:
|
||||
chunk_sz = min(size, _write_chunk_size)
|
||||
self._dev.char_write_handle(self._wr_handle, bytearray(data[ofs:ofs+chunk_sz]))
|
||||
ofs += chunk_sz
|
||||
size -= chunk_sz
|
||||
|
||||
|
||||
__all__ = ['BLELink']
|
|
@ -0,0 +1,243 @@
|
|||
"""BLE link using ABLE"""
|
||||
|
||||
|
||||
try:
|
||||
from able import GATT_SUCCESS, Advertisement, BluetoothDispatcher
|
||||
except ImportError:
|
||||
exit('error importing able')
|
||||
try:
|
||||
from .base import BaseLink, LinkTimeoutException, LinkOpenException
|
||||
except ImportError:
|
||||
exit('error importing .base')
|
||||
from binascii import hexlify
|
||||
from kivy.logger import Logger
|
||||
from kivy.properties import StringProperty
|
||||
|
||||
try:
|
||||
import queue
|
||||
except ImportError:
|
||||
import queue as queue
|
||||
|
||||
SCAN_TIMEOUT = 3
|
||||
|
||||
_write_chunk_size = 20
|
||||
|
||||
identity = bytearray([
|
||||
0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, # Ninebot Bluetooth ID 4E422100000000DE
|
||||
0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDF # Xiaomi Bluetooth ID 4E422100000000DF
|
||||
])
|
||||
|
||||
service_ids = {
|
||||
'retail': '6e400001-b5a3-f393-e0a9-e50e24dcca9e' #service UUID
|
||||
}
|
||||
|
||||
receive_ids = {
|
||||
'retail': '6e400002-b5a3-f393-e0a9-e50e24dcca9e' #receive characteristic UUID
|
||||
}
|
||||
|
||||
transmit_ids = {
|
||||
'retail': '6e400003-b5a3-f393-e0a9-e50e24dcca9e' #transmit characteristic UUID
|
||||
}
|
||||
|
||||
scoot_found = False
|
||||
|
||||
|
||||
class Fifo():
|
||||
def __init__(self):
|
||||
self.q = queue.Queue()
|
||||
|
||||
def write(self, data): # put bytes
|
||||
for b in data:
|
||||
self.q.put(b)
|
||||
|
||||
def read(self, size=1, timeout=None): # but read string
|
||||
res = ''
|
||||
for i in range(size):
|
||||
res += chr(self.q.get(True, timeout))
|
||||
return res
|
||||
|
||||
|
||||
class ScootBT(BluetoothDispatcher):
|
||||
def __init__(self):
|
||||
super(ScootBT, self).__init__()
|
||||
self.rx_fifo = Fifo()
|
||||
self.ble_device = None
|
||||
self.state = StringProperty()
|
||||
self.dump = True
|
||||
self.tx_characteristic = None
|
||||
self.rx_characteristic = None
|
||||
self.timeout = SCAN_TIMEOUT
|
||||
self.set_queue_timeout(self.timeout)
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def discover(self):
|
||||
self.start_scan()
|
||||
self.state = 'scan'
|
||||
print((self.state))
|
||||
|
||||
|
||||
def on_device(self, device, rssi, advertisement):
|
||||
global scoot_found
|
||||
if self.state != 'scan':
|
||||
return
|
||||
Logger.debug("on_device event {}".format(list(advertisement)))
|
||||
self.addr = device.getAddress()
|
||||
if self.addr and address.startswith(self.addr):
|
||||
print((self.addr))
|
||||
self.ble_device = device
|
||||
self.scoot_found = True
|
||||
self.stop_scan()
|
||||
else:
|
||||
for ad in advertisement:
|
||||
print(ad)
|
||||
if ad.ad_type == Advertisement.ad_types.manufacturer_specific_data:
|
||||
if ad.data.startswith(self.identity):
|
||||
scoot_found = True
|
||||
else:
|
||||
break
|
||||
elif ad.ad_type == Advertisement.ad_types.complete_local_name:
|
||||
name = str(ad.data)
|
||||
if scoot_found:
|
||||
self.state = 'found'
|
||||
print((self.state))
|
||||
self.ble_device = device
|
||||
Logger.debug("Scooter detected: {}".format(name))
|
||||
self.stop_scan()
|
||||
|
||||
|
||||
def on_scan_completed(self):
|
||||
if self.ble_device:
|
||||
self.connect_gatt(self.ble_device)
|
||||
self.state = 'connected'
|
||||
print((self.state))
|
||||
else:
|
||||
self.start_scan()
|
||||
|
||||
|
||||
def on_connection_state_change(self, status, state):
|
||||
if status == GATT_SUCCESS and state:
|
||||
self.discover_services()
|
||||
self.state = 'discover'
|
||||
print((self.state))
|
||||
else:
|
||||
self.close_gatt()
|
||||
self.rx_characteristic = None
|
||||
self.tx_characteristic = None
|
||||
self.services = None
|
||||
|
||||
|
||||
def on_services(self, status, services):
|
||||
self.services = services
|
||||
for uuid in list(receive_ids.values()):
|
||||
self.rx_characteristic = self.services.search(uuid)
|
||||
print(('RX: '+uuid))
|
||||
for uuid in list(transmit_ids.values()):
|
||||
self.tx_characteristic = self.services.search(uuid)
|
||||
print(('TX: '+uuid))
|
||||
self.enable_notifications(self.tx_characteristic)
|
||||
|
||||
|
||||
def on_characteristic_changed(self, characteristic):
|
||||
if characteristic == self.tx_characteristic:
|
||||
data = characteristic.getValue()
|
||||
self.rx_fifo.write(data)
|
||||
|
||||
|
||||
def open(self, port):
|
||||
self.addr = port
|
||||
if self.ble_device == None:
|
||||
self.discover()
|
||||
if self.state!='connected':
|
||||
self.connect_gatt(self.ble_device)
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
def close(self):
|
||||
if self.ble_device != None:
|
||||
self.close_gatt()
|
||||
self.services = None
|
||||
print('close')
|
||||
|
||||
|
||||
def read(self, size):
|
||||
print('read')
|
||||
if self.ble_device:
|
||||
try:
|
||||
data = self.rx_fifo.read(size, timeout=self.timeout)
|
||||
except queue.Empty:
|
||||
raise LinkTimeoutException
|
||||
if self.dump:
|
||||
print('<', hexlify(data).upper())
|
||||
return data
|
||||
else:
|
||||
print('BLE not connected')
|
||||
self.discover()
|
||||
|
||||
|
||||
def write(self, data):
|
||||
print('write')
|
||||
if self.ble_device:
|
||||
if self.dump:
|
||||
print('>', hexlify(data).upper())
|
||||
size = len(data)
|
||||
ofs = 0
|
||||
while size:
|
||||
chunk_sz = min(size, _write_chunk_size)
|
||||
self.write_characteristic(self.rx_characteristic, bytearray(data[ofs:ofs+chunk_sz]))
|
||||
ofs += chunk_sz
|
||||
size -= chunk_sz
|
||||
else:
|
||||
print('BLE not connected')
|
||||
self.discover()
|
||||
|
||||
|
||||
def scan(self):
|
||||
self.discover()
|
||||
|
||||
|
||||
class BLELink(BaseLink):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(BLELink, self).__init__(*args, **kwargs)
|
||||
self._adapter = None
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
self._adapter = ScootBT()
|
||||
self._adapter.discover()
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def scan(self):
|
||||
devices = self._adapter.scan()
|
||||
|
||||
|
||||
def open(self, port):
|
||||
self._adapter.open(port)
|
||||
|
||||
|
||||
def close(self):
|
||||
self._adapter.close()
|
||||
|
||||
|
||||
def read(self, size):
|
||||
self._adapter.read(size)
|
||||
|
||||
|
||||
def write(self, data):
|
||||
self._adapter.write(data)
|
||||
|
||||
|
||||
__all__ = ['BLELink']
|
|
@ -0,0 +1,61 @@
|
|||
"""Direct serial link"""
|
||||
|
||||
|
||||
import serial
|
||||
import serial.tools.list_ports as lp
|
||||
from binascii import hexlify
|
||||
from .base import BaseLink, LinkTimeoutException, LinkOpenException
|
||||
|
||||
|
||||
class SerialLink(BaseLink):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(SerialLink, self).__init__(*args, **kwargs)
|
||||
self.com = None
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def scan(self):
|
||||
ports = lp.comports()
|
||||
res = [("%s %04X:%04X" % (port.device, port.vid, port.pid), port.device) for port in ports]
|
||||
return res
|
||||
|
||||
|
||||
def open(self, port):
|
||||
try:
|
||||
self.com = serial.Serial(port, 115200, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, timeout=self.timeout)
|
||||
except serial.SerialException:
|
||||
raise LinkOpenException
|
||||
|
||||
|
||||
def close(self):
|
||||
if self.com:
|
||||
self.com.close()
|
||||
self.com = None
|
||||
|
||||
|
||||
def read(self, size):
|
||||
try:
|
||||
data = self.com.read(size)
|
||||
except serial.SerialTimeoutException:
|
||||
raise LinkTimeoutException
|
||||
if len(data)<size:
|
||||
raise LinkTimeoutException
|
||||
if self.dump:
|
||||
print("<", hexlify(data).upper())
|
||||
return data
|
||||
|
||||
|
||||
def write(self, data):
|
||||
if self.dump:
|
||||
print(">", hexlify(data).upper())
|
||||
self.com.write(data)
|
||||
|
||||
|
||||
__all__ = ["SerialLink"]
|
|
@ -0,0 +1,80 @@
|
|||
"""TCP-BLE bridge link"""
|
||||
|
||||
import socket
|
||||
from binascii import hexlify
|
||||
from .base import BaseLink, LinkTimeoutException, LinkOpenException
|
||||
|
||||
HOST, PORT = "127.0.0.1", 6000
|
||||
|
||||
_write_chunk_size = 20 # 20 as in android dumps
|
||||
|
||||
def recvall(sock, size):
|
||||
data = ""
|
||||
while len(data)<size:
|
||||
try:
|
||||
pkt = sock.recv(size-len(data))
|
||||
except socket.timeout:
|
||||
raise LinkTimeoutException()
|
||||
data+=pkt
|
||||
return data
|
||||
|
||||
|
||||
class TCPLink(BaseLink):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(TCPLink, self).__init__(*args, **kwargs)
|
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.sock.settimeout(self.timeout)
|
||||
self.connected = False
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def scan(self):
|
||||
res = [("Android UART Bridge", HOST+':'+str(PORT))]
|
||||
return res
|
||||
|
||||
|
||||
def open(self, port):
|
||||
p = port.partition(':')
|
||||
host = p[0]
|
||||
port = int(p[2], 10)
|
||||
print(host, port)
|
||||
try:
|
||||
self.sock.connect((host, port))
|
||||
except socket.timeout:
|
||||
raise LinkOpenException
|
||||
self.connected = True
|
||||
|
||||
|
||||
def close(self):
|
||||
if self.connected:
|
||||
self.sock.close()
|
||||
self.connected = False
|
||||
|
||||
|
||||
def read(self, size):
|
||||
data = recvall(self.sock, size)
|
||||
if data and self.dump:
|
||||
print("<", hexlify(data).upper())
|
||||
return data
|
||||
|
||||
|
||||
def write(self, data):
|
||||
if self.dump:
|
||||
print(">", hexlify(data).upper())
|
||||
size = len(data)
|
||||
ofs = 0
|
||||
while size:
|
||||
chunk_sz = min(size, _write_chunk_size)
|
||||
self.sock.sendall(data[ofs:ofs+chunk_sz])
|
||||
ofs += chunk_sz
|
||||
size -= chunk_sz
|
||||
|
||||
|
||||
__all__ = ["TCPLink"]
|
|
@ -0,0 +1,243 @@
|
|||
"""BLE link using ABLE"""
|
||||
|
||||
from __future__ import absolute_import
|
||||
try:
|
||||
from able import GATT_SUCCESS, Advertisement, BluetoothDispatcher
|
||||
except ImportError:
|
||||
exit('error importing able')
|
||||
try:
|
||||
from .base import BaseLink, LinkTimeoutException, LinkOpenException
|
||||
except ImportError:
|
||||
exit('error importing .base')
|
||||
from binascii import hexlify
|
||||
from kivy.logger import Logger
|
||||
from kivy.properties import StringProperty
|
||||
|
||||
try:
|
||||
import queue
|
||||
except ImportError:
|
||||
import Queue as queue
|
||||
|
||||
SCAN_TIMEOUT = 3
|
||||
|
||||
_write_chunk_size = 20
|
||||
|
||||
identity = bytearray([
|
||||
0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDE, # Ninebot Bluetooth ID 4E422100000000DE
|
||||
0x4e, 0x42, 0x21, 0x00, 0x00, 0x00, 0x00, 0xDF # Xiaomi Bluetooth ID 4E422100000000DF
|
||||
])
|
||||
|
||||
service_ids = {
|
||||
'retail': '6e400001-b5a3-f393-e0a9-e50e24dcca9e' #service UUID
|
||||
}
|
||||
|
||||
receive_ids = {
|
||||
'retail': '6e400002-b5a3-f393-e0a9-e50e24dcca9e' #receive characteristic UUID
|
||||
}
|
||||
|
||||
transmit_ids = {
|
||||
'retail': '6e400003-b5a3-f393-e0a9-e50e24dcca9e' #transmit characteristic UUID
|
||||
}
|
||||
|
||||
scoot_found = False
|
||||
|
||||
|
||||
class Fifo():
|
||||
def __init__(self):
|
||||
self.q = queue.Queue()
|
||||
|
||||
def write(self, data): # put bytes
|
||||
for b in data:
|
||||
self.q.put(b)
|
||||
|
||||
def read(self, size=1, timeout=None): # but read string
|
||||
res = ''
|
||||
for i in xrange(size):
|
||||
res += chr(self.q.get(True, timeout))
|
||||
return res
|
||||
|
||||
|
||||
class ScootBT(BluetoothDispatcher):
|
||||
def __init__(self):
|
||||
super(ScootBT, self).__init__()
|
||||
self.rx_fifo = Fifo()
|
||||
self.ble_device = None
|
||||
self.state = StringProperty()
|
||||
self.dump = True
|
||||
self.tx_characteristic = None
|
||||
self.rx_characteristic = None
|
||||
self.timeout = SCAN_TIMEOUT
|
||||
self.set_queue_timeout(self.timeout)
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def discover(self):
|
||||
self.start_scan()
|
||||
self.state = 'scan'
|
||||
print(self.state)
|
||||
|
||||
|
||||
def on_device(self, device, rssi, advertisement):
|
||||
global scoot_found
|
||||
if self.state != 'scan':
|
||||
return
|
||||
Logger.debug("on_device event {}".format(list(advertisement)))
|
||||
self.addr = device.getAddress()
|
||||
if self.addr and address.startswith(self.addr):
|
||||
print(self.addr)
|
||||
self.ble_device = device
|
||||
self.scoot_found = True
|
||||
self.stop_scan()
|
||||
else:
|
||||
for ad in advertisement:
|
||||
print(ad)
|
||||
if ad.ad_type == Advertisement.ad_types.manufacturer_specific_data:
|
||||
if ad.data.startswith(self.identity):
|
||||
scoot_found = True
|
||||
else:
|
||||
break
|
||||
elif ad.ad_type == Advertisement.ad_types.complete_local_name:
|
||||
name = str(ad.data)
|
||||
if scoot_found:
|
||||
self.state = 'found'
|
||||
print(self.state)
|
||||
self.ble_device = device
|
||||
Logger.debug("Scooter detected: {}".format(name))
|
||||
self.stop_scan()
|
||||
|
||||
|
||||
def on_scan_completed(self):
|
||||
if self.ble_device:
|
||||
self.connect_gatt(self.ble_device)
|
||||
self.state = 'connected'
|
||||
print(self.state)
|
||||
else:
|
||||
self.start_scan()
|
||||
|
||||
|
||||
def on_connection_state_change(self, status, state):
|
||||
if status == GATT_SUCCESS and state:
|
||||
self.discover_services()
|
||||
self.state = 'discover'
|
||||
print(self.state)
|
||||
else:
|
||||
self.close_gatt()
|
||||
self.rx_characteristic = None
|
||||
self.tx_characteristic = None
|
||||
self.services = None
|
||||
|
||||
|
||||
def on_services(self, status, services):
|
||||
self.services = services
|
||||
for uuid in receive_ids.values():
|
||||
self.rx_characteristic = self.services.search(uuid)
|
||||
print('RX: '+uuid)
|
||||
for uuid in transmit_ids.values():
|
||||
self.tx_characteristic = self.services.search(uuid)
|
||||
print('TX: '+uuid)
|
||||
self.enable_notifications(self.tx_characteristic)
|
||||
|
||||
|
||||
def on_characteristic_changed(self, characteristic):
|
||||
if characteristic == self.tx_characteristic:
|
||||
data = characteristic.getValue()
|
||||
self.rx_fifo.write(data)
|
||||
|
||||
|
||||
def open(self, port):
|
||||
self.addr = port
|
||||
if self.ble_device == None:
|
||||
self.discover()
|
||||
if self.state!='connected':
|
||||
self.connect_gatt(self.ble_device)
|
||||
else:
|
||||
return
|
||||
|
||||
|
||||
def close(self):
|
||||
if self.ble_device != None:
|
||||
self.close_gatt()
|
||||
self.services = None
|
||||
print('close')
|
||||
|
||||
|
||||
def read(self, size):
|
||||
print('read')
|
||||
if self.ble_device:
|
||||
try:
|
||||
data = self.rx_fifo.read(size, timeout=self.timeout)
|
||||
except queue.Empty:
|
||||
raise LinkTimeoutException
|
||||
if self.dump:
|
||||
print '<', hexlify(data).upper()
|
||||
return data
|
||||
else:
|
||||
print('BLE not connected')
|
||||
self.discover()
|
||||
|
||||
|
||||
def write(self, data):
|
||||
print('write')
|
||||
if self.ble_device:
|
||||
if self.dump:
|
||||
print '>', hexlify(data).upper()
|
||||
size = len(data)
|
||||
ofs = 0
|
||||
while size:
|
||||
chunk_sz = min(size, _write_chunk_size)
|
||||
self.write_characteristic(self.rx_characteristic, bytearray(data[ofs:ofs+chunk_sz]))
|
||||
ofs += chunk_sz
|
||||
size -= chunk_sz
|
||||
else:
|
||||
print('BLE not connected')
|
||||
self.discover()
|
||||
|
||||
|
||||
def scan(self):
|
||||
self.discover()
|
||||
|
||||
|
||||
class BLELink(BaseLink):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(BLELink, self).__init__(*args, **kwargs)
|
||||
self._adapter = None
|
||||
|
||||
|
||||
def __enter__(self):
|
||||
self._adapter = ScootBT()
|
||||
self._adapter.discover()
|
||||
return self
|
||||
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
|
||||
|
||||
def scan(self):
|
||||
devices = self._adapter.scan()
|
||||
|
||||
|
||||
def open(self, port):
|
||||
self._adapter.open(port)
|
||||
|
||||
|
||||
def close(self):
|
||||
self._adapter.close()
|
||||
|
||||
|
||||
def read(self, size):
|
||||
self._adapter.read(size)
|
||||
|
||||
|
||||
def write(self, data):
|
||||
self._adapter.write(data)
|
||||
|
||||
|
||||
__all__ = ['BLELink']
|
|
@ -0,0 +1,47 @@
|
|||
"""Ninebot packet transport"""
|
||||
from struct import pack, unpack
|
||||
from .base import checksum, BaseTransport as BT
|
||||
from .packet import BasePacket
|
||||
|
||||
|
||||
class NinebotTransport(BT):
|
||||
def __init__(self, link, device=BT.HOST):
|
||||
super(NinebotTransport, self).__init__(link)
|
||||
self.device = device
|
||||
|
||||
|
||||
def _wait_pre(self):
|
||||
while True:
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c=="\x5A":
|
||||
break
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c=="\xA5":
|
||||
return True
|
||||
if c!="\x5A":
|
||||
break # start waiting 5A again, else - this is 5A, so wait for A5
|
||||
|
||||
|
||||
def recv(self):
|
||||
self._wait_pre()
|
||||
pkt = self.link.read(1)
|
||||
l = ord(pkt)+6
|
||||
for i in range(l):
|
||||
pkt += self.link.read(1)
|
||||
ck_calc = checksum(pkt[0:-2])
|
||||
ck_pkt = unpack("<H", pkt[-2:])[0]
|
||||
if ck_pkt!=ck_calc:
|
||||
print("Checksum mismatch !")
|
||||
return None
|
||||
return BasePacket(ord(pkt[1]), ord(pkt[2]), ord(pkt[3]), ord(pkt[4]), pkt[5:-2]) # sa, da, cmd, arg, data
|
||||
|
||||
|
||||
def send(self, packet):
|
||||
pkt = pack("<BBBBB", len(packet.data), packet.src, packet.dst, packet.cmd, packet.arg)+packet.data
|
||||
pkt = "\x5A\xA5" + pkt + pack("<H", checksum(pkt))
|
||||
self.link.write(pkt)
|
||||
|
||||
|
||||
__all__ = ["NinebotTransport"]
|
|
@ -0,0 +1,95 @@
|
|||
"""Xiaomi packet transport"""
|
||||
from struct import pack, unpack
|
||||
from .base import checksum, BaseTransport as BT
|
||||
from .packet import BasePacket
|
||||
|
||||
|
||||
class XiaomiTransport(BT):
|
||||
MASTER2ESC = 0x20
|
||||
ESC2MASTER = 0x23
|
||||
|
||||
MASTER2BLE = 0x21
|
||||
BLE2MASTER = 0x24
|
||||
|
||||
MASTER2BMS = 0x22
|
||||
BMS2MASTER = 0x25
|
||||
|
||||
MOTOR = 0x01
|
||||
DEVFF = 0xFF
|
||||
|
||||
_SaDa2Addr = { BT.HOST : { BT.MOTOR : MOTOR, BT.ESC : MASTER2ESC, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS },
|
||||
BT.ESC : { BT.HOST : ESC2MASTER, BT.BLE : MASTER2BLE, BT.BMS : MASTER2BMS, BT.MOTOR : MOTOR },
|
||||
BT.BMS : { BT.HOST : BMS2MASTER, BT.ESC : BMS2MASTER, BT.MOTOR : MOTOR },
|
||||
BT.MOTOR : {BT.HOST : MOTOR, BT.ESC : MOTOR, BT.BMS : MOTOR } }
|
||||
|
||||
# TBC
|
||||
_BleAddr2SaDa = { MASTER2ESC : (BT.HOST, BT.ESC),
|
||||
ESC2MASTER : (BT.ESC, BT.HOST),
|
||||
MASTER2BMS : (BT.HOST, BT.BMS),
|
||||
BMS2MASTER : (BT.BMS, BT.HOST),
|
||||
MASTER2BLE : (BT.HOST, BT.BLE),
|
||||
BLE2MASTER : (BT.BLE, BT.HOST),
|
||||
MOTOR : (BT.MOTOR, BT.HOST) }
|
||||
|
||||
_BmsAddr2SaDa = { MASTER2ESC : (BT.BMS, BT.ESC),
|
||||
ESC2MASTER : (BT.ESC, BT.BMS),
|
||||
MASTER2BMS : (BT.ESC, BT.BMS),
|
||||
BMS2MASTER : (BT.BMS, BT.ESC),
|
||||
MASTER2BLE : (BT.BMS, BT.BLE),
|
||||
BLE2MASTER : (BT.BLE, BT.BMS),
|
||||
MOTOR : (BT.MOTOR, BT.BMS) }
|
||||
|
||||
|
||||
def __init__(self, link, device=BT.HOST):
|
||||
super(XiaomiTransport, self).__init__(link)
|
||||
self.device = device
|
||||
|
||||
|
||||
def _make_addr(self, src, dst):
|
||||
return XiaomiTransport._SaDa2Addr[src][dst]
|
||||
|
||||
|
||||
def _split_addr(self, addr):
|
||||
if self.device==BT.BMS:
|
||||
return XiaomiTransport._BmsAddr2SaDa[addr]
|
||||
else:
|
||||
return XiaomiTransport._BleAddr2SaDa[addr]
|
||||
|
||||
|
||||
def _wait_pre(self):
|
||||
while True:
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c=="\x55":
|
||||
break
|
||||
while True:
|
||||
c = self.link.read(1)
|
||||
if c=="\xAA":
|
||||
return True
|
||||
if c!="\x55":
|
||||
break # start waiting 55 again, else - this is 55, so wait for AA
|
||||
|
||||
|
||||
def recv(self):
|
||||
self._wait_pre()
|
||||
pkt = self.link.read(1)
|
||||
l = ord(pkt)+3
|
||||
for i in range(l):
|
||||
pkt += self.link.read(1)
|
||||
ck_calc = checksum(pkt[0:-2])
|
||||
ck_pkt = unpack("<H", pkt[-2:])[0]
|
||||
if ck_pkt!=ck_calc:
|
||||
print("Checksum mismatch !")
|
||||
return None
|
||||
sa, da = self._split_addr(ord(pkt[1]))
|
||||
return BasePacket(sa, da, ord(pkt[2]), ord(pkt[3]), pkt[4:-2]) # sa, da, cmd, arg, data
|
||||
|
||||
|
||||
def send(self, packet):
|
||||
dev = self._make_addr(packet.src, packet.dst)
|
||||
pkt = pack("<BBBB", len(packet.data)+2, dev, packet.cmd, packet.arg)+packet.data
|
||||
pkt = "\x55\xAA" + pkt + pack("<H", checksum(pkt))
|
||||
self.link.write(pkt)
|
||||
|
||||
|
||||
__all__ = ["XiaomiTransport"]
|
Loading…
Reference in New Issue