totally legit internal fork

This commit is contained in:
informatic 2019-10-22 16:15:12 +02:00
parent 2c0f5adadc
commit 34fd4465da
2 changed files with 87 additions and 5 deletions

85
cli.py
View file

@ -2,7 +2,8 @@ import click
from py9b.link.base import LinkTimeoutException
from py9b.transport.base import BaseTransport as BT
from py9b.command.regio import ReadRegs
from py9b.transport.packet import BasePacket as PKT
from py9b.command.regio import ReadRegs, WriteRegs
class Connection:
def __init__(self, transport, link, address):
@ -64,6 +65,14 @@ class Connection:
def cli(ctx, transport, link, address):
ctx.obj = Connection(transport, link, address)
@cli.command()
@click.pass_context
def dong(ctx):
with ctx.obj as tran:
#tran.execute(WriteRegs(BT.ESC, 0x1e, "<H", 2))
print(tran.execute(WriteRegs(BT.ESC, 0x1e, "<H", 0xe395 | 0x800)))
#print(tran.execute(WriteRegs(BT.ESC, 0x7e, "<H", 1)))
@cli.command()
@click.pass_context
def dump(ctx):
@ -74,6 +83,17 @@ def dump(ctx):
except Exception as exc:
print('0x%02x: %s' % (offset, exc))
@cli.command()
@click.pass_context
def disable_iot(ctx):
with ctx.obj as tran:
try:
print(tran.execute(WriteRegs(BT.ESC, 0x8f, "<H", 0x0)))
except Exception as exc:
print(exc)
print('Done')
@cli.command()
@click.pass_context
def sniff(ctx):
@ -86,6 +106,63 @@ def sniff(ctx):
except Exception as exc:
print(exc)
@cli.command()
@click.pass_context
def enable_ble(ctx):
with ctx.obj as tran:
fun = tran.execute(ReadRegs(BT.ESC, 0x80, "<H"))[0]
print('FUN_BOOL_1: %04x' % (fun))
try:
print(tran.execute(WriteRegs(BT.ESC, 0x80, "<H", fun | 0x400)))
except Exception as exc:
print(exc)
@cli.command()
@click.pass_context
def ble_reinit(ctx):
with ctx.obj as tran:
print(tran.execute(WriteRegs(BT.ESC, 0x4d, "<H", 1)))
@cli.command()
@click.pass_context
def reginfo(ctx):
with ctx.obj as tran:
#print(tran.execute(WriteRegs(BT.ESC, 0x7e, "<H", 1)))
# 0xe395
#print(tran.execute(WriteRegs(BT.ESC, 0x90, "<H", 1)))
#print('FUN_BOOL_1: %s' % tran.execute(ReadRegs(BT.ESC, 0x1e, "<H")))
#print(tran.execute(WriteRegs(BT.ESC, 0x71, "<H", 1)))
#print('headlight: %s' % tran.execute(ReadRegs(BT.ESC, 0x90, "<H")))
print('beep: %s' % tran.execute(ReadRegs(BT.ESC, 0x91, "<H")))
print('beep control: %s' % tran.execute(ReadRegs(BT.ESC, 0x92, "<H")))
print('work_sys: %s' % tran.execute(ReadRegs(BT.ESC, 0x7e, "<H")))
print('FUN_BOOL_1: %04x' % (tran.execute(ReadRegs(BT.ESC, 0x80, "<H"))[0]))
print('FUN_BOOL_2: %04x' % (tran.execute(ReadRegs(BT.ESC, 0x81, "<H"))[0]))
print('FUN_BOOL_X: %04x' % (tran.execute(ReadRegs(BT.ESC, 0x8f, "<H"))[0]))
print('speedlimit1: %d' % tran.execute(ReadRegs(BT.ESC, 0x72, "<H")))
print('speedlimit2: %d' % tran.execute(ReadRegs(BT.ESC, 0x73, "<H")))
print('speedlimit3: %d' % tran.execute(ReadRegs(BT.ESC, 0x74, "<H")))
@cli.command()
@click.pass_context
def heartbeat(ctx):
with ctx.obj as tran:
req = PKT(
src=BT.HOST, dst=BT.ESC, cmd=0x55, arg=0x7c, data=bytearray([0x7c]))
while True:
print('heartbeat...')
tran.send(req)
try:
for n in range(5):
rsp = tran.recv()
print('response:', rsp)
except LinkTimeoutException:
print('timeout')
import time
time.sleep(0.5)
continue
@cli.command()
@click.pass_context
def powerdown(ctx):
@ -149,6 +226,12 @@ def info(ctx):
print('Total riding: %s' % pp_time(tran.execute(ReadRegs(BT.ESC, 0x34, "<L"))[0]))
print('Chassis temp: %d°C' % (tran.execute(ReadRegs(BT.ESC, 0x3e, "<H"))[0] / 10.0,))
print()
print('BMS charge: %d%%' % tran.execute(ReadRegs(BT.BMS, 0x32, "<H")))
print('BMS full cycles: %d' % tran.execute(ReadRegs(BT.BMS, 0x1b, "<H")))
print('BMS charges: %d' % tran.execute(ReadRegs(BT.BMS, 0x1c, "<H")))
print('BMS health: %d%%' % tran.execute(ReadRegs(BT.BMS, 0x3b, "<H")))
print('BMS current: %.2fA' % (tran.execute(ReadRegs(BT.BMS, 0x33, "<h"))[0] / 100.0,))
print('BMS voltage: %.2fV' % (tran.execute(ReadRegs(BT.BMS, 0x34, "<h"))[0] / 100.0,))
try:
print(' *** Internal BMS ***')

View file

@ -14,7 +14,7 @@ class BaseTransport(object):
BLE = 0x21
BMS = 0x22
EXTBMS = 0x23
HOST = 0x3E
HOST = 0x3D
DeviceNames = {
MOTOR: "MOTOR",
@ -34,18 +34,17 @@ class BaseTransport(object):
def send(self, src, dst, cmd, arg, data=bytearray()):
raise NotImplementedError()
def execute(self, command):
def execute(self, command, retries=3):
self.send(command.request)
if not command.has_response:
return True
# TODO: retry ?
exc = None
for n in range(1):
for n in range(retries):
try:
rsp = self.recv()
return command.handle_response(rsp)
except Exception as e:
print("retry")
exc = e
pass
raise exc