cygpio: basic cython-based MDB backend for raspi & pigpio

feature/cython
informatic 2018-06-10 19:49:49 +02:00
parent 37ba4aa07b
commit 9adff5141e
3 changed files with 95 additions and 0 deletions

86
cygpio/cygpio.pyx Normal file
View File

@ -0,0 +1,86 @@
RX_PIN = 4
TX_PIN = 17
cdef extern from "pigpio.h":
int gpioInitialise()
int gpioCfgInterfaces(unsigned ifFlags)
int gpioSetMode(unsigned gpio, unsigned mode)
int gpioSerialReadOpen(unsigned user_gpio, unsigned baud, unsigned data_bits)
int gpioSerialRead(unsigned user_gpio, void *buf, size_t bufSize) nogil
int gpioSerialReadClose(unsigned user_gpio)
int gpioWaveCreate()
int gpioWaveDelete(unsigned wave_id)
int gpioWaveClear()
int gpioWaveTxSend(unsigned wave_id, unsigned wave_mode)
int gpioWaveTxBusy()
int gpioWaveAddSerial(unsigned user_gpio, unsigned baud, unsigned data_bits, unsigned stop_bits, unsigned offset, unsigned numBytes, char *str)
cdef int INPUT "PI_INPUT"
cdef int OUTPUT "PI_OUTPUT"
cdef int PI_DISABLE_FIFO_IF
cdef int PI_DISABLE_SOCK_IF
cdef int PI_WAVE_MODE_ONE_SHOT
cdef extern from "unistd.h" nogil:
unsigned int sleep(unsigned int seconds)
unsigned int usleep(unsigned int usecs)
def test():
b = CythonRaspiBackend()
b.open()
while True:
print(repr(b.read()))
cdef class CythonRaspiBackend(object):
cdef int rx_pin
cdef int tx_pin
def __init__(self, rx_pin=RX_PIN, tx_pin=TX_PIN):
self.rx_pin = rx_pin
self.tx_pin = tx_pin
cpdef open(self):
gpioCfgInterfaces(PI_DISABLE_FIFO_IF | PI_DISABLE_SOCK_IF);
gpioInitialise()
gpioWaveClear()
gpioSetMode(self.tx_pin, INPUT)
# gpioSerClose...
cdef int resp = gpioSerialReadOpen(self.rx_pin, 9600, 9)
if resp != 0:
raise Exception('Serial open failed: %d' % resp)
cpdef read(self):
cdef unsigned char buf[1024]
cdef int read_size
with nogil:
while 1:
read_size = gpioSerialRead(self.rx_pin, &buf, sizeof(buf))
if read_size > 0:
break
usleep(100)
return bytes(buf[0:read_size])
cpdef write(self, data):
cdef char* c_data = data
gpioWaveAddSerial(self.tx_pin, 9600, 9, 6, 0, len(data), c_data)
wid = gpioWaveCreate()
gpioSetMode(self.tx_pin, OUTPUT)
gpioWaveTxSend(wid, PI_WAVE_MODE_ONE_SHOT)
while gpioWaveTxBusy():
usleep(100)
gpioWaveDelete(wid)
gpioSetMode(self.tx_pin, INPUT)

2
cygpio/cygpio_test.py Normal file
View File

@ -0,0 +1,2 @@
import cygpio
cygpio.test()

7
cygpio/setup.py Normal file
View File

@ -0,0 +1,7 @@
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize
setup(
ext_modules = cythonize([Extension("cygpio", ["cygpio.pyx"], libraries=["pigpio"])])
)