Basic watch communication working.

master
q3k 2013-09-20 11:23:48 +02:00
parent 8a0745d3e6
commit fdf94da495
4 changed files with 135 additions and 4 deletions

8
main.py Normal file
View File

@ -0,0 +1,8 @@
import serial
from metawatch.watch import Watch
w = Watch(serial.Serial('/dev/rfcomm0'))
print(w.get_device_type())
print(w.get_rtc())

View File

@ -1,4 +1,5 @@
import struct
import datetime
from functools import reduce
# Adapted & Pythonized from pymetawatch by Travis Goodspeed
@ -53,10 +54,97 @@ class OutboundPacket:
stream.write(self._data)
stream.flush()
class GetDeviceType(OutboundPacket):
def __init__(self):
self._generate(1, 0)
self._generate(0x01, 0)
class GetInformationString(OutboundPacket):
class GetRTC(OutboundPacket):
def __init__(self):
self._generate(3, 0)
self._generate(0x27, 0)
class PacketDecodeError(Exception):
pass
class TypedInboundPacket:
def __init__(self, inbound):
self._payload = inbound._payload
self._type = inbound._type
self._options = inbound._options
def _assert_payload_length(self, l):
"""Asserts that the received payload is at least `l` bytes long."""
if len(self._payload) < l:
raise PacketDecodeError("Incorrect payload length! Is {} bytes long, should be at least {}.".format(len(self._payload), l))
def _assert_type(self, _type):
if self._type != _type:
raise PacketDecodeError("Incorrect message type! Expected {0:x}, got {1:x}.".format(_type, self._type))
class GetDeviceTypeResponse(TypedInboundPacket):
def parse(self):
self._assert_type(0x02)
self._assert_payload_length(1)
self.device_type = self._payload[0]
class GetRTCResponse(TypedInboundPacket):
def parse(self):
self._assert_type(0x28)
self._assert_payload_length(8)
year = (self._payload[1] << 8) | self._payload[0]
month = self._payload[2]
day = self._payload[3]
h, m, s = self._payload[4:7]
dt = datetime.datetime(year=year, month=month, day=day, hour=h, minute=m, second=s)
self.datetime = dt
class InboundPacket:
PACKET_TYPES = {
0x02: GetDeviceTypeResponse,
0x28: GetRTCResponse,
}
def __init__(self, stream):
self._s = stream
def _start_crc(self):
self._crc_check = []
def _read(self, count):
"""Gets bytes and saves them to an list for later CRC-Checking."""
b = self._s.read(count)
self._crc_check += b
return b
def _end_crc(self):
mspcrc = MSPCRC()
return mspcrc.checksum_bytes(bytes(self._crc_check))
def read(self):
self._start_crc()
start, = self._read(1)
if start != 1:
raise PacketDecodeError("Bad start byte (received 0x{0:x}).".format(start))
length, = self._read(1)
if length > 32 or length < 6:
raise PacketDecodeError("Length not in spec (got {}).".format(length))
payload_length = length - 6
self._type, = self._read(1)
self._options, = self._read(1)
self._payload = self._read(payload_length)
received_crc, = struct.unpack('<H', self._s.read(2))
correct_crc = self._end_crc()
if received_crc != correct_crc:
raise PacketDecodeError("Wrong CRC! Got 0x{0:x}, expected 0x{1:x}.".format(received_crc, correct_crc))
if self._type not in self.PACKET_TYPES:
raise PacketDecodeError("Unknown packet type {0:x}.".format(self._type))
p = self.PACKET_TYPES[self._type](self)
p.parse()
return p

28
metawatch/watch.py Normal file
View File

@ -0,0 +1,28 @@
import queue
from metawatch import protocol
class Watch:
def __init__(self, fd):
self._fd = fd
self._extra_packets = queue.Queue()
def _read_packet(self, expected):
while True:
ib = protocol.InboundPacket(self._fd)
p = ib.read()
if isinstance(p, expected):
return p
self._extra_packets.put(p)
def get_device_type(self):
o = protocol.GetDeviceType()
o.emit(self._fd)
i = self._read_packet(protocol.GetDeviceTypeResponse)
return i.device_type
def get_rtc(self):
o = protocol.GetRTC()
o.emit(self._fd)
i = self._read_packet(protocol.GetRTCResponse)
return i.datetime

View File

@ -30,5 +30,12 @@ class TestPackets(unittest.TestCase):
def test_4_get_device_type(self):
self._test_packet(b'\x01\x06\x01\x00\x0B\xD9', protocol.GetDeviceType)
def test_5_get_information_string(self):
def test_4_get_information_string(self):
self._test_packet(b'\x01\x06\x03\x00\xc7\xd4', protocol.GetInformationString)
def test_5_decode_packet(self):
i = io.BytesIO(b'\x01\x07\x02\x00\x01\x13\x3b')
ib = protocol.InboundPacket(i)
p = ib.read()
self.assertIsInstance(p, protocol.GetDeviceTypeResponse)
self.assertEqual(p.device_type, 1)