# 165 lines, 5.0 KiB, Python
import struct
|
|
import datetime
|
|
from functools import reduce
|
|
|
|
# Adapted & Pythonized from pymetawatch by Travis Goodspeed
|
|
class MSPCRC:
    """Performs a MSP430-compatible CRC-CCITT.

    The checksum uses polynomial 0x1021 with a start value of 0xFFFF and no
    final XOR. Every input byte has its bit order reversed before it is
    folded into the running value.
    """

    # Bit-reversed value of each 4-bit nibble, indexed by the nibble itself.
    _NIBBLE_FLIP = (0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15)

    # 256-entry lookup table shared by all instances; filled in on first use.
    _TABLE = None

    def __init__(self):
        """Attach the lookup table, building it only the first time."""
        # The table depends on nothing but the polynomial, so there is no
        # reason to redo the 256x8 loop for every instance that is created.
        if MSPCRC._TABLE is None:
            MSPCRC._TABLE = self._build_table()
        self._tab = MSPCRC._TABLE

    @staticmethod
    def _build_table():
        """Return the 256-entry CRC lookup table as an immutable tuple."""
        table = []
        for i in range(256):
            crc = 0
            c = i << 8
            for _ in range(8):
                # Only bit 15 decides whether the polynomial is applied.
                if (crc ^ c) & 0x8000:
                    crc = (crc << 1) ^ 0x1021
                else:
                    crc = crc << 1
                c = c << 1
                crc = crc & 0xffff
            table.append(crc)
        return tuple(table)

    def _calculate_crc(self, crc, c):
        """Fold the byte `c` into the running 16-bit `crc` and return the result."""
        c = c & 0xFF
        c = self._flip(c)

        tmp = ((crc >> 8) ^ c) & 0xffff
        crc = ((crc << 8) ^ self._tab[tmp]) & 0xffff
        return crc

    def checksum_bytes(self, b):
        """Returns the checksum of a bytes object.

        Any iterable of integers works; an empty input yields the start
        value 0xFFFF.
        """
        return reduce(self._calculate_crc, b, 0xFFFF)

    def _flip(self, c):
        """Flips the bit order, because that's how the MSP430 rolls.."""
        # Reverse each nibble via the table and swap the two nibbles.
        flipped = self._NIBBLE_FLIP
        return (flipped[c & 0x0F] << 4) + flipped[(c & 0xF0) >> 4]
|
|
|
|
class OutboundPacket:
    """Base class for packets that are sent out; subclasses call `_generate`."""

    def _generate(self, _type, options, *args):
        """Generates a MetaWatch packet based on a given type, options byte and payload bytes.

        The finished packet is stored in `self._data`: start byte 1, total
        length, type, options, the payload bytes from `args`, and finally the
        checksum of everything before it as a little-endian 16-bit value.
        """
        # The length byte counts the 4 header bytes, the payload and the 2 CRC bytes.
        frame = [1, 6 + len(args), _type, options]
        frame.extend(args)

        checksum = MSPCRC().checksum_bytes(bytearray(frame))
        self._data = bytes(frame) + struct.pack("<H", checksum)

    def emit(self, stream):
        """Write the generated packet to `stream` and flush it."""
        packet = self._data
        stream.write(packet)
        stream.flush()
|
|
|
|
|
|
class GetDeviceType(OutboundPacket):
    """Outbound packet of message type 0x01 with no payload."""

    def __init__(self):
        """Build the packet bytes: type 0x01, options byte 0, empty payload."""
        message_type, options = 0x01, 0
        self._generate(message_type, options)
|
|
|
|
class GetRTC(OutboundPacket):
    """Outbound packet of message type 0x27 with no payload."""

    def __init__(self):
        """Build the packet bytes: type 0x27, options byte 0, empty payload."""
        message_type, options = 0x27, 0
        self._generate(message_type, options)
|
|
|
|
class SetRTC(OutboundPacket):
    """Outbound packet of message type 0x26 carrying a date and time."""

    def __init__(self, dt):
        """Encode `dt` into an 8-byte payload and build the packet.

        `dt` has to offer year, month, day, hour, minute, second and
        weekday() the way `datetime.datetime` does.
        """
        # NOTE(review): the year is sent low byte first here, while
        # GetRTCResponse decodes it high byte first -- confirm which byte
        # order the device really uses.
        year = dt.year
        fields = (
            year & 0xFF,
            year >> 8,
            dt.month,
            dt.day,
            dt.weekday() + 1,  # weekday() is 0 for Monday, so this sends 1..7
            dt.hour,
            dt.minute,
            dt.second,
        )
        self._generate(0x26, 0, *fields)
|
|
|
|
|
|
class PacketDecodeError(Exception):
    """Raised when received data cannot be decoded as a valid packet."""
|
|
|
|
|
|
class TypedInboundPacket:
    """Base for decoded packets; holds the raw fields of an already-read packet."""

    def __init__(self, inbound):
        """Copy payload, type and options over from `inbound`."""
        for field in ("_payload", "_type", "_options"):
            setattr(self, field, getattr(inbound, field))

    def _assert_payload_length(self, l):
        """Asserts that the received payload is at least `l` bytes long.

        Raises PacketDecodeError when it is shorter.
        """
        actual = len(self._payload)
        if actual >= l:
            return
        template = "Incorrect payload length! Is {} bytes long, should be at least {}."
        raise PacketDecodeError(template.format(actual, l))

    def _assert_type(self, _type):
        """Raise PacketDecodeError unless the packet's type equals `_type`."""
        if self._type == _type:
            return
        template = "Incorrect message type! Expected {0:x}, got {1:x}."
        raise PacketDecodeError(template.format(_type, self._type))
|
|
|
|
|
|
class GetDeviceTypeResponse(TypedInboundPacket):
    """Decoded packet of message type 0x02; exposes its first payload byte."""

    def parse(self):
        """Validate type and payload size, then store byte 0 as `device_type`."""
        expected_type, minimum_length = 0x02, 1
        self._assert_type(expected_type)
        self._assert_payload_length(minimum_length)

        first_byte = self._payload[0]
        self.device_type = first_byte
|
|
|
|
|
|
class GetRTCResponse(TypedInboundPacket):
    """Decoded packet of message type 0x28; exposes the result as `datetime`."""

    def parse(self):
        """Validate the packet and turn its first 8 payload bytes into a datetime.

        Byte 4 of the payload is not used for the result.
        """
        self._assert_type(0x28)
        self._assert_payload_length(8)

        # NOTE(review): the year is decoded high byte first here, while SetRTC
        # sends it low byte first -- confirm which byte order the device uses.
        year_hi, year_lo, month, day, _unused, hour, minute, second = self._payload[:8]

        # NOTE(review): datetime.datetime raises ValueError for out-of-range
        # fields (e.g. month 0); that is not turned into a PacketDecodeError.
        self.datetime = datetime.datetime(
            year=(year_hi << 8) | year_lo,
            month=month,
            day=day,
            hour=hour,
            minute=minute,
            second=second,
        )
|
|
|
|
class InboundPacket:
    """Reads one framed packet from a stream and decodes it into a typed packet.

    Layout as consumed by `read()`: start byte (must be 1), total length
    (6..32), type, options, `length - 6` payload bytes, then a little-endian
    16-bit CRC computed over every byte before it.
    """

    # Maps the message type byte to the class that decodes that packet.
    PACKET_TYPES = {
        0x02: GetDeviceTypeResponse,
        0x28: GetRTCResponse,
    }

    def __init__(self, stream):
        """Remember `stream`; it has to offer `read(count)` returning bytes."""
        self._s = stream

    def _start_crc(self):
        """Reset the list of bytes that will be covered by the CRC check."""
        self._crc_check = []

    def _read_exact(self, count):
        """Read exactly `count` bytes from the stream.

        Raises PacketDecodeError when the stream delivers fewer bytes (for
        example at end of stream), so that a truncated packet is reported
        like any other decode failure instead of surfacing as an unpacking
        error further down.
        """
        chunk = self._s.read(count)
        received = 0 if chunk is None else len(chunk)
        if received != count:
            raise PacketDecodeError("Truncated packet! Wanted {} bytes, got {}.".format(count, received))
        return chunk

    def _read(self, count):
        """Gets bytes and saves them to a list for later CRC-Checking."""
        b = self._read_exact(count)
        self._crc_check += b
        return b

    def _end_crc(self):
        """Return the checksum of all bytes collected since `_start_crc()`."""
        mspcrc = MSPCRC()
        return mspcrc.checksum_bytes(bytes(self._crc_check))

    def read(self):
        """Read, verify and decode one packet from the stream.

        Returns an instance of the class registered in PACKET_TYPES for the
        received type, after its `parse()` has run.

        Raises PacketDecodeError on a bad start byte, a length outside 6..32,
        a truncated packet, a CRC mismatch or an unknown packet type.
        """
        self._start_crc()

        start, = self._read(1)
        if start != 1:
            raise PacketDecodeError("Bad start byte (received 0x{0:x}).".format(start))

        length, = self._read(1)
        if length > 32 or length < 6:
            raise PacketDecodeError("Length not in spec (got {}).".format(length))

        # The length byte also counts the 4 header bytes and the 2 CRC bytes.
        payload_length = length - 6
        self._type, = self._read(1)
        self._options, = self._read(1)
        self._payload = self._read(payload_length)

        # The CRC trailer is read without adding it to the checksummed bytes.
        received_crc, = struct.unpack('<H', self._read_exact(2))
        correct_crc = self._end_crc()
        if received_crc != correct_crc:
            raise PacketDecodeError("Wrong CRC! Got 0x{0:x}, expected 0x{1:x}.".format(received_crc, correct_crc))

        if self._type not in self.PACKET_TYPES:
            raise PacketDecodeError("Unknown packet type {0:x}.".format(self._type))

        p = self.PACKET_TYPES[self._type](self)
        p.parse()
        return p
|
|
|
|
|