bitvend/mdb/backend.py

99 lines
2.0 KiB
Python

import time
try:
import pigpio
except:
pigpio = None
RX_PIN = 4
TX_PIN = 17
class Backend(object):
def __init__(self):
pass
def open(self):
pass
def close(self):
pass
def read(self):
return b''
def write(self, data):
pass
#
# Dummy backend driver, for testing purposes only
#
class DummyBackend(Backend):
def read(self):
time.sleep(0.005)
return b''
#
# Backend driver for raspberry pi pigpio library
#
class RaspiBackend(Backend):
def __init__(self, rx_pin=RX_PIN, tx_pin=TX_PIN):
self.rx_pin = rx_pin
self.tx_pin = tx_pin
def open(self):
self.pi = pigpio.pi()
self.pi.wave_clear()
self.pi.set_mode(self.tx_pin, pigpio.INPUT)
try:
self.pi.bb_serial_read_close(self.rx_pin)
except:
pass
status = self.pi.bb_serial_read_open(self.rx_pin, 9600, 9)
if status:
raise Exception('Port open failed: %d', status)
def read(self):
while True:
cnt, data = self.pi.bb_serial_read(self.rx_pin)
if cnt <= 0:
time.sleep(0.001)
else:
return data
def write(self, data):
self.pi.wave_add_serial(self.tx_pin, 9600, data, bb_bits=9, bb_stop=6)
wid = self.pi.wave_create()
self.pi.set_mode(self.tx_pin, pigpio.OUTPUT)
self.pi.wave_send_once(wid) # transmit serial data
while self.pi.wave_tx_busy(): # wait until all data sent
time.sleep(0.001)
self.pi.wave_delete(wid)
self.pi.set_mode(self.tx_pin, pigpio.INPUT)
#
# Backend device based on STM32F1 MDB-USB adapter (to be actually designed...)
#
class SerialBackend(Backend):
def __init__(self, device='/dev/ttyACM0'):
import serial
self.ser = serial.Serial(device)
def read(self):
buf = b''
while len(buf) < 2:
buf += self.ser.read(1)
return buf
def write(self, data):
self.ser.write(data)
self.ser.flush()