First commit.

master
q3k 2013-09-19 18:59:48 +02:00
commit 8a0745d3e6
4 changed files with 99 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*pyc
*swp
env

0
metawatch/__init__.py Normal file
View File

62
metawatch/protocol.py Normal file
View File

@ -0,0 +1,62 @@
import struct
from functools import reduce
# Adapted & Pythonized from pymetawatch by Travis Goodspeed
class MSPCRC:
"""Performs a MSP430-compatible CRC-CCIIT."""
def __init__(self):
self._tab=256*[[]]
for i in range(256):
crc=0
c = i << 8
for j in range(8):
if (crc ^ c) & 0x8000:
crc = ( crc << 1) ^ 0x1021
else:
crc = crc << 1
c = c << 1
crc = crc & 0xffff
self._tab[i] = crc
def _calculate_crc(self, crc, c):
c = c & 0xFF
c = self._flip(c)
tmp = ((crc >> 8) ^ c) & 0xffff
crc = (((crc << 8) ^ self._tab[tmp])) & 0xffff
return crc
def checksum_bytes(self, b):
"""Returns the checksum of a bytes object."""
return reduce(self._calculate_crc, b, 0xFFFF)
def _flip(self,c):
"""Flips the bit order, because that's how the MSP430 rolls.."""
l=[0, 8, 4, 12, 2, 10, 6, 14, 1, 9, 5, 13, 3, 11, 7, 15]
return ((l[c & 0x0F]) << 4) + l[(c & 0xF0) >> 4]
class OutboundPacket:
def _generate(self, _type, options, *args):
"""Generates a MetaWatch packet based on a given type, options byte and payload bytes."""
length = 6 + len(args)
data = []
data.append(1) # Start Bytes
data.append(length)
data.append(_type)
data.append(options)
data += args
mspcrc = MSPCRC()
crc = mspcrc.checksum_bytes(bytearray(data))
self._data = bytes(data) + struct.pack("<H", crc)
def emit(self, stream):
stream.write(self._data)
stream.flush()
class GetDeviceType(OutboundPacket):
def __init__(self):
self._generate(1, 0)
class GetInformationString(OutboundPacket):
def __init__(self):
self._generate(3, 0)

34
tests/test_packets.py Normal file
View File

@ -0,0 +1,34 @@
import unittest
import io
from metawatch import protocol
class TestPackets(unittest.TestCase):
def test_1_crc(self):
crc = protocol.MSPCRC()
checksum = crc.checksum_bytes(b'\x01\x06\x01\x00')
self.assertEqual(checksum, 0xD90B)
def test_2_basic_outbount_packet(self):
p = protocol.OutboundPacket()
p._generate(1, 0)
self.assertEqual(p._data, b'\x01\x06\x01\x00\x0B\xD9')
def test_3_packet_emitting(self):
p = protocol.OutboundPacket()
i = io.BytesIO()
p._generate(1, 0)
p.emit(i)
self.assertEqual(i.getvalue(), b'\x01\x06\x01\x00\x0B\xD9')
def _test_packet(self, result, klass, *args, **kwargs):
p = klass(*args, **kwargs)
i = io.BytesIO()
p.emit(i)
self.assertEqual(i.getvalue(), result)
def test_4_get_device_type(self):
self._test_packet(b'\x01\x06\x01\x00\x0B\xD9', protocol.GetDeviceType)
def test_5_get_information_string(self):
self._test_packet(b'\x01\x06\x03\x00\xc7\xd4', protocol.GetInformationString)