bitvend/bitvend/processor.py

148 lines
4.2 KiB
Python

import threading
import json
import websocket
import time
import logging
from bitvend.utils import to_local_currency
from bitvend.models import db, Transaction
class PaymentProcessor(threading.Thread):
    """Daemon thread that watches a BlockCypher WebSocket for incoming payments.

    The thread keeps a WebSocket connection to ``socket.blockcypher.com`` open,
    subscribes to ``tx-confidence`` events for ``input_address``, records every
    reported transaction in the database and, when the payment passes the
    amount and fee checks, starts a vending session on ``device``.
    """

    # Placeholders; real values come from __init__ / init_app.
    input_address = None
    device = None
    app = None
    # time.time() of the most recent "pong" event; 0 until the first one.
    last_pong = 0

    # Seconds between application-level "ping" events sent by keepalive().
    KEEPALIVE_INTERVAL = 20
    # keepalive() closes the socket when no pong was seen for this many seconds.
    PONG_TIMEOUT = 60
    # `online` is True while the last pong is younger than this many seconds.
    ONLINE_WINDOW = 40
    # Minimum accepted payment, in whatever unit to_local_currency() returns.
    MIN_AMOUNT = 100
    # Minimum accepted tx["fees"] / tx["size"] ratio
    # (presumably satoshis per byte -- TODO confirm against BlockCypher docs).
    MIN_FEE_PER_BYTE = 15

    def __init__(self, device, input_address=None, chain_id=None, app=None, token=None):
        """Create the processor; call ``start()`` to begin listening.

        Args:
            device: Object exposing ``begin_session(amount, tx_id)``.
            input_address: Address whose incoming payments are watched.
            chain_id: BlockCypher chain path inserted into the socket URL.
            app: Optional application object; when given, ``init_app`` is
                called with it immediately.
            token: BlockCypher API token appended to the socket URL.
        """
        # Pass daemon through the Thread constructor instead of shadowing the
        # Thread.daemon property with a class attribute.
        super().__init__(daemon=True)
        self.device = device
        self.input_address = input_address
        self.chain_id = chain_id
        self.logger = logging.getLogger(type(self).__name__)
        self.token = token
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Bind *app* and fill any unset connection setting from ``app.config``.

        Values passed explicitly to ``__init__`` take precedence; the config
        keys ``INPUT_ADDRESS``, ``BLOCKCYPHER_CHAIN`` and ``BLOCKCYPHER_TOKEN``
        are only read for settings that are still empty.

        Raises:
            KeyError: If a setting is unset and missing from ``app.config``.
        """
        self.app = app
        if not self.input_address:
            self.input_address = app.config["INPUT_ADDRESS"]
        if not self.chain_id:
            self.chain_id = app.config["BLOCKCYPHER_CHAIN"]
        if not self.token:
            self.token = app.config["BLOCKCYPHER_TOKEN"]

    def run(self):
        """Thread body: connect, serve until the socket drops, then reconnect."""
        self.logger.info("Starting...")
        while True:
            try:
                ws = websocket.WebSocketApp(
                    "wss://socket.blockcypher.com/v1/%s?token=%s"
                    % (self.chain_id, self.token),
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open,
                )
                ws.run_forever(ping_timeout=20, ping_interval=30)
            except Exception:
                # Never let a connection failure kill the thread; a bare
                # ``except:`` would also swallow SystemExit/KeyboardInterrupt.
                self.logger.exception("run_forever failed")
            # Short pause before reconnecting so a dead endpoint is not hammered.
            time.sleep(1)

    def _received_value(self, tx):
        """Return the summed value of the outputs of *tx* paying ``input_address``."""
        return sum(
            out.get("value", 0)
            for out in tx.get("outputs") or ()
            # "addresses" may be null for outputs without an address
            # (e.g. OP_RETURN); treat that as "pays nobody".
            if self.input_address in (out.get("addresses") or ())
        )

    def process_transaction(self, tx):
        """Record the transaction *tx* and start a device session if acceptable.

        Args:
            tx: Decoded transaction message; must provide ``size``, ``hash``
                and ``fees`` and should provide ``outputs``.
        """
        tx_size = tx["size"]
        tx_hash = tx["hash"]
        tx_value = self._received_value(tx)
        fee = tx["fees"]
        # Guard against a malformed message reporting a zero size.
        fee_byte = fee / tx_size if tx_size else 0
        # Convert once so the amount logged, stored, checked and credited is
        # guaranteed to be the same value.
        amount = to_local_currency(tx_value)

        self.logger.info("%r %r %r %r %r", tx_size, tx_hash, tx_value, fee, fee_byte)
        self.logger.info("In local currency: %r", amount)

        with self.app.app_context():
            intx = Transaction(
                uid="__bitcoin__",
                type="bitcoin",
                amount=amount,
                tx_hash=tx_hash,
            )
            db.session.add(intx)
            db.session.commit()
            # NOTE(review): the purchase row is committed before the amount
            # and fee checks below, so rejected payments still leave one.
            outtx = Transaction(
                uid="__bitcoin__",
                type="purchase",
            )
            db.session.add(outtx)
            db.session.commit()
            tx_id = outtx.id

        if amount < self.MIN_AMOUNT:
            self.logger.warning("Whyyyy so low...")
            return
        if fee_byte < self.MIN_FEE_PER_BYTE:
            self.logger.warning("Fee too low...")
            return

        self.logger.info("Transaction %d ok, going to device...", tx_id)
        # FIXME we need better handling of ACK on POLL responses...
        self.device.begin_session(amount, tx_id)

    def on_message(self, ws, message):
        """WebSocket callback: dispatch a decoded JSON message.

        Messages containing an ``inputs`` key are treated as transactions;
        ``{"event": "pong"}`` refreshes ``last_pong``. Anything else is only
        logged.
        """
        data = json.loads(message)
        self.logger.info("msg: %r", data)
        if not isinstance(data, dict):
            # Only JSON objects carry events; ignore anything else.
            return
        if "inputs" in data:
            self.process_transaction(data)
        elif data.get("event") == "pong":
            self.last_pong = time.time()

    def on_error(self, ws, error):
        """WebSocket callback: log a connection error."""
        self.logger.error(error)

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """WebSocket callback: log that the connection was closed.

        The two optional arguments are accepted because newer websocket-client
        releases pass the close status code and message to this callback,
        while older ones pass only the socket.
        """
        self.logger.info("Connection closed")

    def on_open(self, ws):
        """WebSocket callback: subscribe to payments and start the keepalive."""
        self.logger.info("Connected, registering for: %r", self.input_address)
        ws.send(
            json.dumps(
                {
                    "event": "tx-confidence",
                    "address": self.input_address,
                    "confidence": 0.9,
                }
            )
        )
        threading.Thread(target=self.keepalive, args=(ws,), daemon=True).start()

    def keepalive(self, ws):
        """Keepalive thread target: ping *ws* periodically, close it when silent.

        Returns when the ping can no longer be sent (socket already closed)
        or after closing the socket because no pong arrived within
        ``PONG_TIMEOUT`` seconds.
        """
        while True:
            # FIXME check last ping time
            try:
                ws.send(json.dumps({"event": "ping"}))
            except Exception:
                # The socket was closed elsewhere (e.g. before a reconnect);
                # end this thread quietly instead of dying with a traceback.
                self.logger.warning(
                    "Keepalive ping failed, stopping keepalive", exc_info=True
                )
                return
            time.sleep(self.KEEPALIVE_INTERVAL)
            if time.time() - self.last_pong > self.PONG_TIMEOUT:
                self.logger.warning("Closing socket for inactivity")
                ws.close()
                return

    @property
    def online(self):
        """True while a pong was received within the last ``ONLINE_WINDOW`` seconds."""
        return time.time() - self.last_pong < self.ONLINE_WINDOW