bitvend/cygpio/cygpio.pyx

99 lines
2.7 KiB
Cython

# Default GPIO numbers for the serial link.  They are the default values of
# CythonRaspiBackend.__init__'s rx_pin / tx_pin arguments and end up being
# passed to pigpio as its gpio / user_gpio parameters.
RX_PIN = 4   # pin the backend reads serial data from
TX_PIN = 17  # pin the backend transmits serial data on
# Declarations for the subset of the pigpio C API used by this module.
# Everything below is resolved from "pigpio.h" at C compile time; Cython
# only needs the signatures.
cdef extern from "pigpio.h":
    # --- Library start-up and configuration -------------------------------
    int gpioInitialise()
    int gpioCfgInterfaces(unsigned ifFlags)
    int gpioCfgMemAlloc(unsigned memAllocMode)
    unsigned int gpioCfgGetInternals()
    int gpioCfgSetInternals(unsigned int cfgVal)

    # --- Pin direction ----------------------------------------------------
    int gpioSetMode(unsigned gpio, unsigned mode)

    # --- Bit-banged serial receive ----------------------------------------
    int gpioSerialReadOpen(unsigned user_gpio, unsigned baud, unsigned data_bits)
    # Declared nogil so it can be polled from inside a `with nogil:` block.
    int gpioSerialRead(unsigned user_gpio, void *buf, size_t bufSize) nogil
    int gpioSerialReadClose(unsigned user_gpio)

    # --- Waveform generation (used for serial transmit) -------------------
    int gpioWaveClear()
    int gpioWaveAddSerial(unsigned user_gpio, unsigned baud, unsigned data_bits, unsigned stop_bits, unsigned offset, unsigned numBytes, char *str)
    int gpioWaveCreate()
    int gpioWaveTxSend(unsigned wave_id, unsigned wave_mode)
    int gpioWaveTxBusy()
    int gpioWaveDelete(unsigned wave_id)

    # --- Constants defined by the header ----------------------------------
    # INPUT / OUTPUT are module-local aliases for PI_INPUT / PI_OUTPUT.
    int INPUT "PI_INPUT"
    int OUTPUT "PI_OUTPUT"
    int PI_DISABLE_FIFO_IF
    int PI_DISABLE_SOCK_IF
    int PI_WAVE_MODE_ONE_SHOT
    unsigned PI_MEM_ALLOC_PAGEMAP
# Sleep helpers from the C library.  The block-level `nogil` marks both
# functions as callable without holding the Python GIL.
cdef extern from "unistd.h" nogil:
    unsigned usleep(unsigned usecs)
    unsigned sleep(unsigned seconds)
def test():
    """Manual smoke test: open a backend on the default pins and dump input.

    Prints the repr of every chunk returned by ``read()``.  The loop never
    terminates on its own.
    """
    backend = CythonRaspiBackend()
    backend.open()
    while True:
        chunk = backend.read()
        print(repr(chunk))
cdef class CythonRaspiBackend(object):
    """Bit-banged serial link on two GPIO pins, driven through pigpio.

    Receiving uses pigpio's serial-read facility on ``rx_pin``; sending
    builds and transmits a one-shot waveform on ``tx_pin``.  Both directions
    are configured for 9600 baud with 9 data bits.
    """

    # GPIO the serial reader is opened on.
    cdef int rx_pin
    # GPIO the transmit waveform is generated on.
    cdef int tx_pin

    def __init__(self, rx_pin=RX_PIN, tx_pin=TX_PIN):
        """Remember which GPIOs to use; no hardware is touched here."""
        self.rx_pin = rx_pin
        self.tx_pin = tx_pin

    cpdef open(self):
        """Configure and start pigpio, then open ``rx_pin`` for serial reads.

        Raises:
            Exception: if pigpio fails to initialise or the serial reader
                cannot be opened.  The message carries the pigpio error code.
        """
        cdef int resp

        # Enable full on debug
        gpioCfgSetInternals(gpioCfgGetInternals() | 8)
        # Force usage of non-mailbox DMA
        gpioCfgMemAlloc(PI_MEM_ALLOC_PAGEMAP)
        gpioCfgInterfaces(PI_DISABLE_FIFO_IF | PI_DISABLE_SOCK_IF)

        # A negative return means the library could not be started; carrying
        # on would only produce confusing failures from later calls.
        resp = gpioInitialise()
        if resp < 0:
            raise Exception('GPIO initialisation failed: %d' % resp)

        gpioWaveClear()
        # The TX pin is kept as an input while idle; write() switches it to
        # an output only for the duration of a transmission.
        gpioSetMode(self.tx_pin, INPUT)

        resp = gpioSerialReadOpen(self.rx_pin, 9600, 9)
        if resp != 0:
            raise Exception('Serial open failed: %d' % resp)

    cpdef read(self):
        """Block until serial data is available on ``rx_pin`` and return it.

        Polls pigpio every 100 microseconds with the GIL released so other
        Python threads keep running while waiting.

        Returns:
            bytes: the raw bytes pigpio copied into the buffer (at most 1024).
            NOTE(review): with 9 data bits pigpio presumably stores each
            character in two bytes — confirm against the pigpio docs.

        Raises:
            Exception: if pigpio reports a read error (negative return code).
        """
        cdef unsigned char buf[1024]
        cdef int read_size

        with nogil:
            while 1:
                read_size = gpioSerialRead(self.rx_pin, &buf[0], sizeof(buf))
                # Leave the loop on data (> 0) *and* on error (< 0); looping
                # on an error code would spin forever.
                if read_size != 0:
                    break
                usleep(100)

        if read_size < 0:
            raise Exception('Serial read failed: %d' % read_size)
        return bytes(buf[0:read_size])

    cpdef write(self, data):
        """Transmit ``data`` on ``tx_pin`` and wait until it has been sent.

        The bytes are turned into a pigpio serial waveform (9600 baud, 9 data
        bits, ``stop_bits`` argument 6, offset 0), sent once, and the
        waveform is deleted afterwards.  ``tx_pin`` is an output only while
        transmitting and is returned to input mode before this method exits.

        Args:
            data: bytes object holding the raw serial payload.

        Raises:
            Exception: if pigpio rejects the data, cannot create the
                waveform, or cannot start the transmission.
        """
        cdef char* c_data = data
        cdef int resp
        cdef int wid

        resp = gpioWaveAddSerial(self.tx_pin, 9600, 9, 6, 0, len(data), c_data)
        if resp < 0:
            raise Exception('Wave add serial failed: %d' % resp)

        wid = gpioWaveCreate()
        if wid < 0:
            # Drop whatever was queued so it cannot leak into the next write.
            gpioWaveClear()
            raise Exception('Wave create failed: %d' % wid)

        gpioSetMode(self.tx_pin, OUTPUT)
        resp = gpioWaveTxSend(wid, PI_WAVE_MODE_ONE_SHOT)
        while gpioWaveTxBusy():
            usleep(100)

        # Always release the waveform and restore the idle pin mode, even if
        # the send itself was refused, before reporting the failure.
        gpioWaveDelete(wid)
        gpioSetMode(self.tx_pin, INPUT)
        if resp < 0:
            raise Exception('Wave send failed: %d' % resp)