py9b/cli.py

276 lines
9.4 KiB
Python

import click
from py9b.link.base import LinkTimeoutException
from py9b.transport.base import BaseTransport as BT
from py9b.transport.packet import BasePacket as PKT
from py9b.command.regio import ReadRegs, WriteRegs
class Connection:
def __init__(self, transport, link, address):
self.transport = transport
self.link = link
self.address = address
def __enter__(self):
link = None
if self.link == 'bleak':
from py9b.link.bleak import BleakLink
link = BleakLink()
elif self.link == 'tcp':
from py9b.link.tcp import TCPLink
link = TCPLink()
elif self.link == 'serial':
from py9b.link.serial import SerialLink
link = SerialLink(timeout=0.1)
link.__enter__()
if not self.address:
ports = link.scan()
if not ports:
raise Exception('No devices found')
self.address = ports[0]
link.open(self.address)
transport = None
if self.transport == 'ninebot':
from py9b.transport.ninebot import NinebotTransport
transport = NinebotTransport(link)
elif self.transport == 'xiaomi':
from py9b.transport.xiaomi import XiaomiTransport
transport = XiaomiTransport(link)
if transport.execute(ReadRegs(BT.ESC, 0x68, "<H"))[0] > 0x081 and self.link.startswith('ble'):
transport.keys = link.fetch_keys()
transport.recover_keys()
print('Keys recovered')
self._transport = transport
self._link = link
return transport
def __exit__(self, a, b, c):
self._link.__exit__(a, b, c)
@click.group()
@click.option('--transport', default='ninebot',
help='Transport to use (one of xiaomi, ninebot)')
@click.option('--link', default='bleak',
help='Link implementation to use (one of serial, tcp, bleak)')
@click.option('--address', default=None,
help='Device address to use (dependent on link, defaults to automatic scan)')
@click.pass_context
def cli(ctx, transport, link, address):
ctx.obj = Connection(transport, link, address)
@cli.command()
@click.pass_context
def dong(ctx):
with ctx.obj as tran:
#tran.execute(WriteRegs(BT.ESC, 0x1e, "<H", 2))
print(tran.execute(WriteRegs(BT.ESC, 0x1e, "<H", 0xe395 | 0x800)))
#print(tran.execute(WriteRegs(BT.ESC, 0x7e, "<H", 1)))
@cli.command()
@click.pass_context
def dump(ctx):
with ctx.obj as tran:
for offset in range(256):
try:
print('0x%02x: %04x' % (offset, tran.execute(ReadRegs(BT.ESC, offset, "<H"))[0]))
except Exception as exc:
print('0x%02x: %s' % (offset, exc))
@cli.command()
@click.pass_context
def disable_iot(ctx):
with ctx.obj as tran:
try:
print(tran.execute(WriteRegs(BT.ESC, 0x8f, "<H", 0x0)))
except Exception as exc:
print(exc)
print('Done')
@cli.command()
@click.pass_context
def sniff(ctx):
with ctx.obj as tran:
while True:
try:
print(tran.recv())
except LinkTimeoutException as exc:
pass
except Exception as exc:
print(exc)
@cli.command()
@click.pass_context
def enable_ble(ctx):
with ctx.obj as tran:
fun = tran.execute(ReadRegs(BT.ESC, 0x80, "<H"))[0]
print('FUN_BOOL_1: %04x' % (fun))
try:
print(tran.execute(WriteRegs(BT.ESC, 0x80, "<H", fun | 0x400)))
except Exception as exc:
print(exc)
@cli.command()
@click.pass_context
def ble_reinit(ctx):
with ctx.obj as tran:
print(tran.execute(WriteRegs(BT.ESC, 0x4d, "<H", 1)))
@cli.command()
@click.pass_context
def reginfo(ctx):
with ctx.obj as tran:
#print(tran.execute(WriteRegs(BT.ESC, 0x7e, "<H", 1)))
# 0xe395
#print(tran.execute(WriteRegs(BT.ESC, 0x90, "<H", 1)))
#print('FUN_BOOL_1: %s' % tran.execute(ReadRegs(BT.ESC, 0x1e, "<H")))
#print(tran.execute(WriteRegs(BT.ESC, 0x71, "<H", 1)))
#print('headlight: %s' % tran.execute(ReadRegs(BT.ESC, 0x90, "<H")))
print('beep: %s' % tran.execute(ReadRegs(BT.ESC, 0x91, "<H")))
print('beep control: %s' % tran.execute(ReadRegs(BT.ESC, 0x92, "<H")))
print('work_sys: %s' % tran.execute(ReadRegs(BT.ESC, 0x7e, "<H")))
print('FUN_BOOL_1: %04x' % (tran.execute(ReadRegs(BT.ESC, 0x80, "<H"))[0]))
print('FUN_BOOL_2: %04x' % (tran.execute(ReadRegs(BT.ESC, 0x81, "<H"))[0]))
print('FUN_BOOL_X: %04x' % (tran.execute(ReadRegs(BT.ESC, 0x8f, "<H"))[0]))
print('speedlimit1: %d' % tran.execute(ReadRegs(BT.ESC, 0x72, "<H")))
print('speedlimit2: %d' % tran.execute(ReadRegs(BT.ESC, 0x73, "<H")))
print('speedlimit3: %d' % tran.execute(ReadRegs(BT.ESC, 0x74, "<H")))
@cli.command()
@click.pass_context
def heartbeat(ctx):
with ctx.obj as tran:
req = PKT(
src=BT.HOST, dst=BT.ESC, cmd=0x55, arg=0x7c, data=bytearray([0x7c]))
while True:
print('heartbeat...')
tran.send(req)
try:
for n in range(5):
rsp = tran.recv()
print('response:', rsp)
except LinkTimeoutException:
print('timeout')
import time
time.sleep(0.5)
continue
@cli.command()
@click.pass_context
def powerdown(ctx):
with ctx.obj as tran:
tran.execute(WriteRegs(BT.ESC, 0x79, "<H", 0x0001))
print('Done')
@cli.command()
@click.pass_context
def lock(ctx):
with ctx.obj as tran:
tran.execute(WriteRegs(BT.ESC, 0x70, "<H", 0x0001))
print('Done')
@cli.command()
@click.pass_context
def unlock(ctx):
with ctx.obj as tran:
tran.execute(WriteRegs(BT.ESC, 0x71, "<H", 0x0001))
print('Done')
@cli.command()
@click.pass_context
def reboot(ctx):
with ctx.obj as tran:
tran.execute(WriteRegs(BT.ESC, 0x78, "<H", 0x0001))
print('Done')
def print_reg(tran, desc, reg, format, dev=BT.ESC):
try:
data = tran.execute(ReadRegs(dev, reg, format))
print(desc % data)
except Exception as exc:
print(desc, repr(exc))
def bms_info(tran, dev):
print('BMS S/N: %s' % tran.execute(ReadRegs(dev, 0x10, "14s"))[0].decode())
print_reg(tran, 'BMS Version: %04x', 0x17, "<H", dev=dev)
print_reg(tran, 'BMS charge: %d%%', 0x32, "<H", dev=dev)
print_reg(tran, 'BMS full cycles: %d', 0x1b, "<H", dev=dev)
print_reg(tran, 'BMS charges: %d', 0x1c, "<H", dev=dev)
print_reg(tran, 'BMS health: %d%%', 0x3b, "<H", dev=dev)
print('BMS current: %.2fA' % (tran.execute(ReadRegs(dev, 0x33, "<h"))[0] / 100.0,))
print('BMS voltage: %.2fV' % (tran.execute(ReadRegs(dev, 0x34, "<h"))[0] / 100.0,))
@cli.command()
@click.pass_context
def info(ctx):
with ctx.obj as tran:
print('ESC S/N: %s' % tran.execute(ReadRegs(BT.ESC, 0x10, "14s"))[0].decode())
print('ESC PIN: %s' % tran.execute(ReadRegs(BT.ESC, 0x17, "6s"))[0].decode())
print()
print_reg(tran, 'BLE Version: %04x', 0x68, "<H")
print_reg(tran, 'ESC Version: %04x', 0x1A, "<H")
print()
print_reg(tran, 'Error code: %d', 0x1B, "<H")
print_reg(tran, 'Warning code: %d', 0x1C, "<H")
print()
print('Total mileage: %s' % pp_distance(tran.execute(ReadRegs(BT.ESC, 0x29, "<L"))[0]))
print('Total runtime: %s' % pp_time(tran.execute(ReadRegs(BT.ESC, 0x32, "<L"))[0]))
print('Total riding: %s' % pp_time(tran.execute(ReadRegs(BT.ESC, 0x34, "<L"))[0]))
print('Chassis temp: %d°C' % (tran.execute(ReadRegs(BT.ESC, 0x3e, "<H"))[0] / 10.0,))
print()
print('BMS charge: %d%%' % tran.execute(ReadRegs(BT.BMS, 0x32, "<H")))
print('BMS full cycles: %d' % tran.execute(ReadRegs(BT.BMS, 0x1b, "<H")))
print('BMS charges: %d' % tran.execute(ReadRegs(BT.BMS, 0x1c, "<H")))
print('BMS health: %d%%' % tran.execute(ReadRegs(BT.BMS, 0x3b, "<H")))
print('BMS current: %.2fA' % (tran.execute(ReadRegs(BT.BMS, 0x33, "<h"))[0] / 100.0,))
print('BMS voltage: %.2fV' % (tran.execute(ReadRegs(BT.BMS, 0x34, "<h"))[0] / 100.0,))
try:
print(' *** Internal BMS ***')
bms_info(tran, BT.BMS)
except Exception as exc:
print('No internal BMS found', repr(exc))
print()
try:
print(' *** External BMS ***')
bms_info(tran, BT.EXTBMS)
except Exception as exc:
print('No external BMS found', repr(exc))
def pp_distance(dist):
if dist < 1000:
return '%dm' % dist
return '%dkm %dm' % (dist / 1000.0, dist % 1000)
def pp_time(seconds):
periods = [
('year', 60*60*24*365),
('month', 60*60*24*30),
('day', 60*60*24),
('hour', 60*60),
('minute', 60),
('second', 1)
]
strings = []
for period_name, period_seconds in periods:
if seconds > period_seconds:
period_value, seconds = divmod(seconds, period_seconds)
has_s = 's' if period_value > 1 else ''
strings.append("%2s %s%s" % (period_value, period_name, has_s))
return " ".join(strings)
if __name__ == '__main__':
cli()