import click from py9b.link.base import LinkTimeoutException from py9b.transport.base import BaseTransport as BT from py9b.transport.packet import BasePacket as PKT from py9b.command.regio import ReadRegs, WriteRegs class Connection: def __init__(self, transport, link, address): self.transport = transport self.link = link self.address = address def __enter__(self): link = None if self.link == 'bleak': from py9b.link.bleak import BleakLink link = BleakLink() elif self.link == 'tcp': from py9b.link.tcp import TCPLink link = TCPLink() elif self.link == 'serial': from py9b.link.serial import SerialLink link = SerialLink(timeout=0.1) link.__enter__() if not self.address: ports = link.scan() if not ports: raise Exception('No devices found') self.address = ports[0] link.open(self.address) transport = None if self.transport == 'ninebot': from py9b.transport.ninebot import NinebotTransport transport = NinebotTransport(link) elif self.transport == 'xiaomi': from py9b.transport.xiaomi import XiaomiTransport transport = XiaomiTransport(link) if transport.execute(ReadRegs(BT.ESC, 0x68, " 0x081 and self.link.startswith('ble'): transport.keys = link.fetch_keys() transport.recover_keys() print('Keys recovered') self._transport = transport self._link = link return transport def __exit__(self, a, b, c): self._link.__exit__(a, b, c) @click.group() @click.option('--transport', default='ninebot', help='Transport to use (one of xiaomi, ninebot)') @click.option('--link', default='bleak', help='Link implementation to use (one of serial, tcp, bleak)') @click.option('--address', default=None, help='Device address to use (dependent on link, defaults to automatic scan)') @click.pass_context def cli(ctx, transport, link, address): ctx.obj = Connection(transport, link, address) @cli.command() @click.pass_context def dong(ctx): with ctx.obj as tran: #tran.execute(WriteRegs(BT.ESC, 0x1e, " period_seconds: period_value, seconds = divmod(seconds, period_seconds) has_s = 's' if period_value > 1 else '' strings.append("%2s %s%s" % (period_value, period_name, has_s)) return " ".join(strings) if __name__ == '__main__': cli()