convert to asyncio
parent
989751c102
commit
0ba4b41917
211
esp32/main.py
211
esp32/main.py
|
@ -2,57 +2,172 @@ from pn532 import PN532Uart, PN532Error
|
|||
import utime
|
||||
import machine
|
||||
import hashlib
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
DEBUG = True
|
||||
|
||||
try:
|
||||
door = machine.Pin(2, machine.Pin.OUT)
|
||||
door.value(0)
|
||||
|
||||
uart = machine.UART(1, tx=16, rx=17, baudrate=9600)
|
||||
uart.write("F")
|
||||
rf = PN532Uart(2, tx=22, rx=19)
|
||||
rf.SAM_configuration()
|
||||
ic, ver, rev, support = rf.get_firmware_version()
|
||||
print('Found PN532 with firmware version: {0}.{1}'.format(ver, rev))
|
||||
except Exception as e:
|
||||
rf = None
|
||||
print('No NFC reader (PN532) detected')
|
||||
class Keypad:
|
||||
CMD_RESET = 'F'
|
||||
CMD_ENABLE_FEEDBACK = 'Q'
|
||||
CMD_LED_GREEN = 'G'
|
||||
CMD_LED_RED = 'R'
|
||||
CMD_GRANTED = 'H' # AKA happy
|
||||
CMD_DENIED = 'S' # AKA sad
|
||||
|
||||
while rf is not None:
|
||||
try:
|
||||
uid = rf.read_passive_target()
|
||||
print("Card UUID: " + ''.join('{:02x}'.format(x) for x in uid))
|
||||
card=''.join('{:02x}'.format(x) for x in uid)
|
||||
uart.write("QG")
|
||||
def __init__(self, uart):
|
||||
self._uart = uart
|
||||
|
||||
def write(self, data):
|
||||
self._uart.write(data)
|
||||
self._uart.flush()
|
||||
|
||||
async def get_pin(self, timeout_ms=10000):
|
||||
uart = self._uart
|
||||
|
||||
# discard previous bytes
|
||||
uart.read(uart.any())
|
||||
|
||||
uart.write(self.CMD_ENABLE_FEEDBACK)
|
||||
uart.write(self.CMD_LED_GREEN)
|
||||
uart.flush()
|
||||
c=0
|
||||
buf=''
|
||||
success = False
|
||||
|
||||
data = bytearray()
|
||||
while len(data) < 4 and timeout_ms > 0:
|
||||
n = uart.any()
|
||||
for c in uart.read(n):
|
||||
if ord('0') <= c <= ord('9'):
|
||||
# print(f"rx: {c}")
|
||||
data.append(c)
|
||||
await asyncio.sleep(0.1)
|
||||
timeout_ms -= 100
|
||||
|
||||
return data[:4]
|
||||
|
||||
|
||||
class Door:
|
||||
def __init__(self, pin):
|
||||
self._pin = pin
|
||||
|
||||
def unlock(self):
|
||||
if self._pin is not None:
|
||||
self._pin.value(1)
|
||||
|
||||
def lock(self):
|
||||
if self._pin is not None:
|
||||
self._pin.value(0)
|
||||
|
||||
def generate_hash(card_uid, pin):
|
||||
card_uid = bytes(reversed(card_uid[:4])).hex()
|
||||
s = "{:08x}:{}".format(int(pin), card_uid)
|
||||
hash=hashlib.sha256(s.encode('ascii')).digest().hex()
|
||||
return hash
|
||||
|
||||
|
||||
async def handle_auth(nfc, keypad, door):
|
||||
while True:
|
||||
card_uid = await nfc.wait_uid()
|
||||
print("Card UUID: " + ''.join('{:02x}'.format(x) for x in card_uid))
|
||||
|
||||
pin = await keypad.get_pin()
|
||||
keypad.write(keypad.CMD_RESET)
|
||||
|
||||
if len(pin) < 4:
|
||||
print("Pin timeout")
|
||||
keypad.write(keypad.CMD_DENIED)
|
||||
await asyncio.sleep(0.5)
|
||||
keypad.write(keypad.CMD_RESET)
|
||||
continue
|
||||
|
||||
hash = generate_hash(card_uid, pin)
|
||||
print(f'Card hash: {hash}')
|
||||
|
||||
hash_found = False
|
||||
with open("hashes") as f:
|
||||
for user in f:
|
||||
if(user.strip() == hash):
|
||||
hash_found = True
|
||||
break
|
||||
|
||||
try:
|
||||
if hash_found:
|
||||
print('Known hash, opening door')
|
||||
keypad.write(keypad.CMD_GRANTED)
|
||||
door.unlock()
|
||||
else:
|
||||
print('Unknown hash, ignoring')
|
||||
keypad.write(keypad.CMD_DENIED)
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
finally:
|
||||
door.lock()
|
||||
keypad.write(keypad.CMD_RESET)
|
||||
|
||||
|
||||
class Nfc:
|
||||
def __init__(self):
|
||||
self._uids = []
|
||||
self._flag = asyncio.Event()
|
||||
|
||||
async def wait_uid(self):
|
||||
# discard queued uids
|
||||
for _ in range(len(self._uids)):
|
||||
self._uids.pop()
|
||||
|
||||
while not self._uids:
|
||||
await self._flag.wait()
|
||||
self._flag.clear()
|
||||
|
||||
uid = None
|
||||
while self._uids:
|
||||
uid = self._uids.pop()
|
||||
return uid
|
||||
|
||||
async def loop(self):
|
||||
rf = PN532Uart(2, rx=19, tx=22)
|
||||
|
||||
try:
|
||||
rf.SAM_configuration()
|
||||
ic, ver, rev, support = rf.get_firmware_version()
|
||||
print(f'Found PN532 with firmware version: {ver}.{rev}')
|
||||
except Exception as e:
|
||||
print('No NFC reader (PN532) detected')
|
||||
raise
|
||||
|
||||
while True:
|
||||
if uart.any():
|
||||
c=c+1
|
||||
buf+=str(chr(uart.read()[0]))
|
||||
if c == 4:
|
||||
break
|
||||
hash=hashlib.sha256(str("{0:#0{1}x}".format(int(buf),10)+":"+card[6:8]+card[4:6]+card[2:4]+card[0:2])[2:].encode()).digest().hex()
|
||||
print('Card hash: {0}'.format(hash))
|
||||
f = open("hashes")
|
||||
for user in f:
|
||||
if(user.strip() == hash):
|
||||
success = True
|
||||
if(success):
|
||||
print('Known hash, opening door')
|
||||
uart.write("FH")
|
||||
door.value(1)
|
||||
utime.sleep(2)
|
||||
door.value(0)
|
||||
else:
|
||||
print('Unknown hash, ignoring')
|
||||
uart.write("FS")
|
||||
utime.sleep(2)
|
||||
buf=''
|
||||
except PN532Error as e:
|
||||
print('PN532:', e)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
uid = None
|
||||
try:
|
||||
uid = rf.read_passive_target()
|
||||
except PN532Error as e:
|
||||
print('PN532:', e)
|
||||
|
||||
rf.power_down()
|
||||
|
||||
if uid is not None:
|
||||
self._uids.append(uid)
|
||||
self._flag.set()
|
||||
|
||||
await asyncio.sleep(0.25)
|
||||
|
||||
|
||||
async def main():
|
||||
print("Lock app wil start in 2s")
|
||||
await asyncio.sleep(2)
|
||||
|
||||
keypad = Keypad(machine.UART(1, tx=16, rx=17, baudrate=9600))
|
||||
keypad.write(keypad.CMD_RESET)
|
||||
|
||||
nfc = Nfc()
|
||||
|
||||
door = Door(machine.Pin(2, machine.Pin.OUT))
|
||||
door.lock()
|
||||
|
||||
await asyncio.gather(
|
||||
handle_auth(nfc, keypad, door),
|
||||
nfc.loop(),
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
asyncio.run(main())
|
||||
|
|
Loading…
Reference in New Issue