148 lines
4.2 KiB
Python
148 lines
4.2 KiB
Python
import threading
|
|
import json
|
|
import websocket
|
|
import time
|
|
import logging
|
|
|
|
from bitvend.utils import to_local_currency
|
|
from bitvend.models import db, Transaction
|
|
|
|
|
|
class PaymentProcessor(threading.Thread):
|
|
daemon = True
|
|
input_address = None
|
|
device = None
|
|
last_pong = 0
|
|
app = None
|
|
|
|
def __init__(self, device, input_address=None, chain_id=None, app=None, token=None):
|
|
super(PaymentProcessor, self).__init__()
|
|
self.device = device
|
|
self.input_address = input_address
|
|
self.chain_id = chain_id
|
|
self.logger = logging.getLogger(type(self).__name__)
|
|
self.token = token
|
|
|
|
if app:
|
|
self.init_app(app)
|
|
|
|
def init_app(self, app):
|
|
self.app = app
|
|
|
|
if not self.input_address:
|
|
self.input_address = self.app.config["INPUT_ADDRESS"]
|
|
self.chain_id = self.app.config["BLOCKCYPHER_CHAIN"]
|
|
self.token = self.app.config["BLOCKCYPHER_TOKEN"]
|
|
|
|
def run(self):
|
|
self.logger.info("Starting...")
|
|
|
|
while True:
|
|
try:
|
|
ws = websocket.WebSocketApp(
|
|
"wss://socket.blockcypher.com/v1/%s?token=%s"
|
|
% (self.chain_id, self.token),
|
|
on_message=self.on_message,
|
|
on_error=self.on_error,
|
|
on_close=self.on_close,
|
|
on_open=self.on_open,
|
|
)
|
|
|
|
ws.run_forever(ping_timeout=20, ping_interval=30)
|
|
except:
|
|
self.logger.exception("run_forever failed")
|
|
time.sleep(1)
|
|
|
|
def process_transaction(self, tx):
|
|
tx_size = tx["size"]
|
|
tx_hash = tx["hash"]
|
|
tx_value = sum(
|
|
[o["value"] for o in tx["outputs"] if self.input_address in o["addresses"]],
|
|
0,
|
|
)
|
|
fee = tx["fees"]
|
|
|
|
fee_byte = fee / tx_size
|
|
|
|
self.logger.info("%r %r %r %r %r", tx_size, tx_hash, tx_value, fee, fee_byte)
|
|
self.logger.info("In local currency: %r", to_local_currency(tx_value))
|
|
|
|
with self.app.app_context():
|
|
intx = Transaction(
|
|
uid="__bitcoin__",
|
|
type="bitcoin",
|
|
amount=to_local_currency(tx_value),
|
|
tx_hash=tx_hash,
|
|
)
|
|
db.session.add(intx)
|
|
db.session.commit()
|
|
|
|
outtx = Transaction(
|
|
uid="__bitcoin__",
|
|
type="purchase",
|
|
)
|
|
db.session.add(outtx)
|
|
db.session.commit()
|
|
|
|
tx_id = outtx.id
|
|
|
|
if to_local_currency(tx_value) < 100:
|
|
self.logger.warning("Whyyyy so low...")
|
|
return
|
|
|
|
if fee_byte < 15:
|
|
self.logger.warning("Fee too low...")
|
|
return
|
|
|
|
self.logger.info("Transaction %d ok, going to device...", tx_id)
|
|
|
|
# FIXME we need better handling of ACK on POLL responses...
|
|
self.device.begin_session(to_local_currency(tx_value), tx_id)
|
|
|
|
def on_message(self, ws, message):
|
|
# print message
|
|
data = json.loads(message)
|
|
self.logger.info("msg: %r", data)
|
|
|
|
if "inputs" in data:
|
|
self.process_transaction(data)
|
|
elif data.get("event") == "pong":
|
|
self.last_pong = time.time()
|
|
|
|
def on_error(self, ws, error):
|
|
self.logger.error(error)
|
|
|
|
def on_close(self, ws):
|
|
self.logger.info("Connection closed")
|
|
|
|
def on_open(self, ws):
|
|
self.logger.info("Connected, registering for: %r", self.input_address)
|
|
|
|
ws.send(
|
|
json.dumps(
|
|
{
|
|
"event": "tx-confidence",
|
|
"address": self.input_address,
|
|
"confidence": 0.9,
|
|
}
|
|
)
|
|
)
|
|
|
|
threading.Thread(target=self.keepalive, args=(ws,), daemon=True).start()
|
|
|
|
def keepalive(self, ws):
|
|
# Keepalive thread target, just send ping once in a while
|
|
while True:
|
|
# FIXME check last ping time
|
|
ws.send(json.dumps({"event": "ping"}))
|
|
time.sleep(20)
|
|
|
|
if time.time() - self.last_pong > 60:
|
|
self.logger.warning("Closing socket for inactivity")
|
|
ws.close()
|
|
return
|
|
|
|
@property
|
|
def online(self):
|
|
return time.time() - self.last_pong < 40
|