Initial commit

master
informatic 2014-10-07 18:48:43 +02:00
commit 366538f58c
9 changed files with 425 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*.pyc
*.*~
*.swp

2
dev_requirements.txt Normal file
View File

@ -0,0 +1,2 @@
pyserial
nose

210
eSSP/__init__.py Normal file
View File

@ -0,0 +1,210 @@
import serial
import logging
import struct
from . import constants, utils
from .exceptions import *
# This code is mostly based on documentation from:
# http://www.innovative-technology.co.uk/product-files/ssp-manuals/bv20-ssp-manual.pdf
from collections import namedtuple
class Event(namedtuple('Event', ['code', 'args'])):
def __str__(self):
return constants.poll_codes[self.code] if self.code in constants.poll_codes \
else 'Unknown event (0x%02X)' % self.code
class eSSP(object):
def __init__(self, path='/dev/ttyACM0', slave_id=0, fd=None):
'''
path and fd arguments are mutually exclusive.
'''
self.logger = logging.getLogger('eSSP')
if fd is not None:
self.serial = fd
else:
self.serial = serial.Serial(path, 9600)
self.slave_id = slave_id
self.seq = 0x00
def send(self, command, args=[], parse_status=True):
''' Sends command and reads response from slave.
Returns received data payload.'''
data = bytearray([command]) + bytearray(args)
self.seq ^= 0x80
payload = bytearray([0x7f, self.seq | self.slave_id, len(data)])
payload += data
payload += bytearray(utils.crc16(payload[1:]))
payload[1:] = utils.stuff_stx(payload[1:])
self.logger.debug('-> %s' % (utils.stringify_payload(payload),))
self.serial.write(payload)
return self.read(parse_status)
def send_unpack(self, format_string, command, args=[], parse_status=True):
resp = self.send(command, args, parse_status)
return struct.unpack(format_string, resp)
def read(self, parse_status=True):
''' Reads, parses and checks a single message from serial.
Returns received data payload.
Raises DesyncException, ProtocolException and InvalidChecksumException.
'''
stx = self.unpack('B')
if stx != 0x7f:
raise ProtocolException('Expected start byte 0x7f, got 0x%02X' % (stx,))
seq = self.unpack('B')
if seq != self.seq | self.slave_id:
raise DesyncException('Invalid sequence bit or Slave ID, expected' \
' 0x%02X, got 0x%02X' % (self.seq | self.slave_id, seq))
length = self.unpack('B')
data = bytearray(self.serial.read(length))
crc = tuple(self.unpack('BB', multi=True))
data_crc = utils.crc16(bytearray([seq, length]) + data)
# FIXME
self.logger.debug('<- %s' % (utils.stringify_payload(bytearray([stx, seq, length]) + data + bytearray(crc)),))
if data_crc != crc:
raise InvalidChecksumException('CRC mismatch (0x%02X%02X expected,' \
' got 0x%02X%02X)' % (data_crc + crc))
if parse_status:
status, data = data[0], data[1:]
if status != 0xf0:
raise DeviceError(status)
return data
def pack(self, *args):
''' pack and write to serial shorthand.'''
self.serial.write(struct.pack(*args))
def unpack(self, fmt, multi=False):
''' read from serial and unpack shorthand.
If multi flag is left False first unpacked value will be returned only.
'''
v = struct.unpack(fmt, self.serial.read(struct.calcsize(fmt)))
if multi:
return v
else:
return v[0]
class NV11(eSSP):
def generic_command(command_id, unpack_string=None):
def command(self, *args):
result = self.send(command_id, args)
if unpack_string:
return struct.unpack(unpack_string, result)
else:
return result
return command
# Init
reset = generic_command(0x1)
last_reject_code = generic_command(0x17)
set_channel_inhibits = generic_command(0x2)
enable = generic_command(0x0a)
disable = generic_command(0x09)
display_on = generic_command(0x3)
display_off = generic_command(0x4)
# Events
#poll = generic_command(0x7)
def parse_poll(self, data):
statuses = [i for i, v in enumerate(data) if v in constants.poll_codes]
events = []
for i, n in enumerate(statuses):
if len(statuses) > i+1:
event = Event(data[n], data[n+1:statuses[i+1]])
else:
event = Event(data[n], data[n+1:])
self.logger.info('Event: %s (args: %r)', event, event.args)
events.append(event)
return events
def poll(self):
result = self.send(0x07)
return self.parse_poll(result)
poll_with_ack = generic_command(0x56)
event_ack = generic_command(0x57)
hold = generic_command(0x18)
reject_banknote = generic_command(0x8)
# Payouts
enable_payout_device = generic_command(0x5c)
disable_payout_device = generic_command(0x5b)
payout_note = generic_command(0x42)
stack_note = generic_command(0x43)
get_note_positions = generic_command(0x41)
empty_all = generic_command(0x3f)
smart_empty = generic_command(0x52)
cashbox_payout_operation_data = generic_command(0x53)
# Routing
get_denomination_route = generic_command(0x3c)
set_denomination_route = generic_command(0x3b)
# Counters
get_counters = generic_command(0x58)
reset_counters = generic_command(0x59)
# System
get_build_revision = generic_command(0x4f)
get_firmware_version = generic_command(0x20)
get_dataset_version = generic_command(0x21)
host_protocol_version = generic_command(0x6)
get_serial_number = generic_command(0xc)
def unit_data(self):
(status, dev_type, fw_ver, currency, _rest) = self.send_unpack('<BB4s3sI', 0x0d)
proto_ver = _rest >> 24
value_multiplier = _rest & 0xffffff
return (status, dev_type, fw_ver, currency, value_multiplier, proto_ver)
setup_request = generic_command(0x5)
set_value_reporting_type = generic_command(0x45)
set_baud_rate = generic_command(0x4d)
channel_reteach_data = generic_command(0x10)
channel_security_data = generic_command(0xf)
channel_value_request = generic_command(0xe)
# Encryption
set_modulus = generic_command(0x4b)
set_generator = generic_command(0x4a)
request_key_exchange = generic_command(0x4c)
set_fixed_encryption_key = generic_command(0x60)
reset_fixed_encryption_key = generic_command(0x61)
def sync(self):
self.send(0x11)
self.seq = 0x80 # seq is toggled before send, next sent seq will be 0x00
# -> Utils

63
eSSP/constants.py Normal file
View File

@ -0,0 +1,63 @@
poll_codes = {
0xb0: 'Jam recovery',
0xb1: 'Error during payout',
0xb3: 'Smart emptying',
0xb4: 'Smart emptied',
0xb5: 'Channel Disable',
0xb6: 'Initialising',
0xb7: 'Coin mech error',
0xc2: 'Emptying',
0xc3: 'Emptied',
0xc4: 'Coin mech jammed',
0xc5: 'Coin mech return pressed',
0xc6: 'Payout out of service',
0xc7: 'Note float removed',
0xc8: 'Note float attached',
0xc9: 'Device full',
0xca: 'Note paid into stacker at power-up',
0xcb: 'Note paid into store at power-up',
0xcc: 'Note Stacking',
0xcd: 'Note Dispensed at power-up',
0xce: 'Note held in bezel',
0xd1: 'Bar Code Ticket Acknowledge',
0xd2: 'Dispensed',
0xd5: 'Jammed',
0xd6: 'Halted',
0xd7: 'Floating',
0xd8: 'Floated',
0xd9: 'Time out',
0xda: 'Dispensing',
0xdb: 'Note stored in payout',
0xdc: 'Incomplete payout',
0xdd: 'Incomplete float',
0xde: 'Cashbox paid',
0xdf: 'Coin credit',
0xe0: 'Note Path Open',
0xe1: 'Note Cleared From Front',
0xe2: 'Note Cleared To Cashbox',
0xe3: 'Cashbox Removed',
0xe4: 'Cashbox Replaced',
0xe5: 'Bar Code Ticket Validated',
0xe6: 'Fraud Attempt',
0xe7: 'Stacker Full',
0xe8: 'Disabled',
0xe9: 'Unsafe Note Jam',
0xea: 'Safe Note Jam',
0xeb: 'Note Stacked',
0xec: 'Note Rejected',
0xed: 'Note Rejecting',
0xee: 'Credit Note',
0xef: 'Read Note',
0xf1: 'Slave reset'
}
error_codes = {
0xf0: 'OK',
0xf2: 'Command not known',
0xf3: 'Wrong number of parameters',
0xf4: 'Parameter out of range',
0xf5: 'Command cannot be processed',
0xf6: 'Software error',
0xf8: 'Command failure',
0xfa: 'Key not set'
}

13
eSSP/exceptions.py Normal file
View File

@ -0,0 +1,13 @@
from . import constants
class eSSPException(Exception): pass
class ProtocolException(eSSPException): pass
class DesyncException(eSSPException): pass
class InvalidChecksumException(eSSPException): pass
class DeviceError(eSSPException):
def __str__(self):
if self.msg in self.error_codes:
return constants.error_codes[self.msg]
else:
return 'Unknown error (0x%02X)' % self.msg

31
eSSP/utils.py Normal file
View File

@ -0,0 +1,31 @@
def stringify_payload(data):
''' Returns user-friendly string representation of bytearray.'''
return '(%2d) %s' % (len(data), ' '.join(['%02X' % n for n in data]))
def stuff_stx(data):
''' Does stx (0x7f) byte stuffing as per documentation.'''
oc = [i for i, v in enumerate(data) if v == 0x7f]
oc.reverse()
for i in oc:
data.insert(i, 0x7f)
return data
def crc16(data):
''' CRC16 implementation compliant with SSP.
Returns tuple with low and high checksum byte.'''
seed = 0xffff
poly = 0x8005
checksum = seed
for b in data:
checksum ^= b << 8
for j in range(0, 8):
if checksum & 0x8000:
checksum = ( (checksum << 1) & seed ) ^ poly
else:
checksum <<= 1
return ( checksum & 0xff, (checksum >> 8) & 0xff )

19
examples/basic_usage.py Normal file
View File

@ -0,0 +1,19 @@
import eSSP
import time
import logging
logging.basicConfig(level=logging.DEBUG)
k = eSSP.NV11()
k.sync()
k.set_channel_inhibits(0xff, 0xff)
k.enable()
k.display_on()
while True:
events = k.poll()
if events:
print events
time.sleep(0.5)

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
pyserial

83
tests.py Normal file
View File

@ -0,0 +1,83 @@
import unittest
from eSSP import eSSP
from eSSP import utils
from eSSP.exceptions import ProtocolException, InvalidChecksumException, DesyncException, DeviceError
from StringIO import StringIO
class eSSPTests(unittest.TestCase):
def prep(self, data=''):
s = rwStringIO(data)
return eSSP(fd=s)
def test_read(self):
self.assertEquals(bytearray([0xcc]), \
self.prep('\x7f\x00\x02\xf0\xcc\xa8\x22').read())
def test_send_basic(self):
s = rwStringIO('\x7f\x80\x02\xf0\xcc\x97\xa2')
k = eSSP(fd=s)
k.send(0x07)
self.assertEquals('\x7f\x80\x01\x07\x12\x02', s.outstream.getvalue())
def test_send_sequence(self):
# Test sequence alternation
s = rwStringIO('\x7f\x80\x01\xf0\x23\x80' \
'\x7f\x00\x01\xf0\x20\x0a')
k = eSSP(fd=s)
k.send(0x02, [0xf0, 0x07, 0x00])
self.assertEquals('\x7f\x80\x04\x02\xf0\x07\x00\x70\x37', s.outstream.getvalue())
s.outstream.truncate(0)
k.send(0x05, [0x10, 0x21])
self.assertEquals('\x7f\x00\x03\x05\x10\x21\x81\xf8', s.outstream.getvalue())
def test_send_stuffing(self):
# Test send with byte stuffing
s = rwStringIO('\x7f\x80\x01\xf0\x23\x80')
k = eSSP(fd=s)
k.send(0xa0, [0xf0, 0x7f, 0xa1, 0x7f])
self.assertEquals('\x7f\x80\x05\xa0\xf0\x7f\x7f\xa1\x7f\x7f\x29\x7a', s.outstream.getvalue())
def test_stuffing(self):
self.assertEquals(bytearray([0x7f, 0x7f, 0x01, 0xf0, 0x7a, 0x7f, 0x7f]), \
utils.stuff_stx(bytearray([0x7f, 0x01, 0xf0, 0x7a, 0x7f])))
self.assertEquals(bytearray([0x05, 0x7f, 0x7f, 0x7a, 0x7f, 0x7f]), \
utils.stuff_stx(bytearray([0x05, 0x7f, 0x7a, 0x7f])))
self.assertEquals(bytearray([0x4a, 0x22, 0x83]), \
utils.stuff_stx(bytearray([0x4a, 0x22, 0x83])))
def test_crc_calculation(self):
self.assertEquals((0x23, 0x80), utils.crc16([0x80, 0x1, 0xf0]))
self.assertEquals((0x02, 0xfd), utils.crc16([0x00]))
self.assertEquals((0x02, 0xfd), utils.crc16(bytearray([0x00])))
def test_invalid_startbyte(self):
self.assertRaises(ProtocolException, self.prep('\x80').read)
def test_invalid_checksum(self):
self.assertRaises(InvalidChecksumException, \
self.prep('\x7f\x00\x02\xf0\xcc\x97\xa1').read)
self.assertRaises(InvalidChecksumException, \
self.prep('\x7f\x00\x02\xf0\xcc\x97\xa3').read)
def test_protocol_desync(self):
self.assertRaises(DesyncException, \
self.prep('\x7f\x80\x02\xf0\xcc\x97\xa1').read)
def test_device_error(self):
self.assertRaises(DeviceError, \
self.prep('\x7f\x00\x01\xf2\x2f\x8a').read)
class rwStringIO(StringIO):
outstream = None
def __init__(self, *args):
StringIO.__init__(self, *args)
self.outstream = StringIO()
def write(self, *args):
self.outstream.write(*args)