bitvend/bitvend/processor.py

146 lines
4.2 KiB
Python

import threading
import json
import websocket
import time
import logging
from bitvend.utils import to_local_currency
from bitvend.models import db, Transaction
class PaymentProcessor(threading.Thread):
    """Thread that listens on the BlockCypher WebSocket API for payments.

    The thread subscribes to ``tx-confidence`` events for ``input_address``.
    Every transaction message is recorded through the ``Transaction`` model
    and, when it passes the amount and fee checks, handed to ``device`` via
    ``device.begin_session(amount, tx_id)``.

    Connection liveness is tracked through application-level ``ping`` /
    ``pong`` events; see :meth:`keepalive` and :attr:`online`.
    """

    # Class-level defaults; instances overwrite these in __init__/init_app.
    daemon = True
    input_address = None
    device = None
    last_pong = 0
    app = None

    # Smallest accepted payment, in whatever unit to_local_currency() returns.
    MIN_AMOUNT = 100
    # Smallest accepted ratio of the payload's ``fees`` to its ``size``.
    MIN_FEE_PER_BYTE = 15
    # Seconds between application-level pings sent by keepalive().
    PING_INTERVAL = 20
    # Seconds without a pong after which keepalive() closes the socket.
    PONG_TIMEOUT = 60
    # Seconds since the last pong during which ``online`` reports True.
    ONLINE_WINDOW = 40
    # Seconds to wait before (re)connecting after run_forever() returns.
    RECONNECT_DELAY = 1

    def __init__(
            self, device, input_address=None, chain_id=None, app=None,
            token=None):
        """Create the processor.

        Args:
            device: Object exposing ``begin_session(amount, tx_id)``.
            input_address: Address to watch; taken from app config if unset.
            chain_id: Chain path segment used in the WebSocket URL.
            app: Optional application object; passed to :meth:`init_app`.
            token: API token appended to the WebSocket URL.
        """
        super().__init__()
        self.device = device
        self.input_address = input_address
        self.chain_id = chain_id
        self.logger = logging.getLogger(type(self).__name__)
        self.token = token

        if app:
            self.init_app(app)

    def init_app(self, app):
        """Bind to ``app`` and fill unset settings from ``app.config``.

        Values passed explicitly to the constructor take precedence; the
        config keys ``INPUT_ADDRESS``, ``BLOCKCYPHER_CHAIN`` and
        ``BLOCKCYPHER_TOKEN`` are only read for settings that are unset.
        """
        self.app = app

        if not self.input_address:
            self.input_address = self.app.config['INPUT_ADDRESS']
        if not self.chain_id:
            self.chain_id = self.app.config['BLOCKCYPHER_CHAIN']
        if not self.token:
            self.token = self.app.config['BLOCKCYPHER_TOKEN']

    def run(self):
        """Thread body: connect, serve until disconnect, then reconnect."""
        self.logger.info('Starting...')

        while True:
            try:
                ws = websocket.WebSocketApp(
                    "wss://socket.blockcypher.com/v1/%s?token=%s"
                    % (self.chain_id, self.token),
                    on_message=self.on_message,
                    on_error=self.on_error,
                    on_close=self.on_close,
                    on_open=self.on_open)
                ws.run_forever(ping_timeout=20, ping_interval=30)
            except Exception:
                # Thread boundary: log and retry, but let SystemExit and
                # KeyboardInterrupt propagate instead of swallowing them.
                self.logger.exception('run_forever failed')

            # Back off briefly so a failing endpoint is not hammered.
            time.sleep(self.RECONNECT_DELAY)

    def _received_value(self, tx):
        """Return the summed value of ``tx`` outputs paying ``input_address``.

        Outputs whose ``addresses`` entry is missing or ``None`` are skipped.
        """
        return sum(
            out['value']
            for out in tx['outputs']
            if self.input_address in (out.get('addresses') or ())
        )

    def process_transaction(self, tx):
        """Record an incoming transaction and start a device session.

        Two ``Transaction`` rows are always stored (the incoming payment and
        the matching purchase). The device session is only started when the
        converted amount is at least ``MIN_AMOUNT`` and the fee-to-size ratio
        is at least ``MIN_FEE_PER_BYTE``.

        Args:
            tx: Decoded transaction message with ``size``, ``hash``,
                ``outputs`` and ``fees`` keys.
        """
        tx_size = tx['size']
        tx_hash = tx['hash']
        tx_value = self._received_value(tx)
        fee = tx['fees']
        # A zero-size payload is malformed; treat it as paying no fee.
        fee_byte = fee / tx_size if tx_size else 0

        # Convert once so the logged, stored, checked and dispensed amounts
        # cannot disagree with each other.
        amount = to_local_currency(tx_value)

        self.logger.info(
            '%r %r %r %r %r', tx_size, tx_hash, tx_value, fee, fee_byte)
        self.logger.info('In local currency: %r', amount)

        with self.app.app_context():
            intx = Transaction(
                uid='__bitcoin__',
                type='bitcoin',
                amount=amount,
                tx_hash=tx_hash,
            )
            db.session.add(intx)
            db.session.commit()

            outtx = Transaction(
                uid='__bitcoin__',
                type='purchase',
            )
            db.session.add(outtx)
            db.session.commit()

            # Read the id while the application context is still active.
            tx_id = outtx.id

        if amount < self.MIN_AMOUNT:
            self.logger.warning('Whyyyy so low...')
            return

        if fee_byte < self.MIN_FEE_PER_BYTE:
            self.logger.warning('Fee too low...')
            return

        self.logger.info('Transaction %d ok, going to device...', tx_id)
        # FIXME we need better handling of ACK on POLL responses...
        self.device.begin_session(amount, tx_id)

    def on_message(self, ws, message):
        """Handle one WebSocket message.

        Messages containing an ``inputs`` key are treated as transactions;
        ``pong`` events refresh ``last_pong``. Anything that is not a JSON
        object is logged and ignored.
        """
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            self.logger.warning('Ignoring non-JSON message: %r', message)
            return

        self.logger.info('msg: %r', data)

        if not isinstance(data, dict):
            return

        if 'inputs' in data:
            self.process_transaction(data)
        elif data.get('event') == 'pong':
            self.last_pong = time.time()

    def on_error(self, ws, error):
        """Log an error reported by the WebSocket client."""
        self.logger.error(error)

    def on_close(self, ws, close_status_code=None, close_msg=None):
        """Log that the connection was closed.

        The extra parameters are optional so the callback works whether the
        WebSocket client passes only ``ws`` or also a status code and message.
        """
        self.logger.info('Connection closed')

    def on_open(self, ws):
        """Subscribe to the watched address and start the keepalive thread."""
        self.logger.info('Connected, registering for: %r', self.input_address)
        ws.send(json.dumps({
            "event": "tx-confidence",
            "address": self.input_address,
            "confidence": 0.9,
        }))

        threading.Thread(
            target=self.keepalive, args=(ws,), daemon=True).start()

    def keepalive(self, ws):
        """Keepalive thread target: ping periodically, close when silent.

        Sends a ``ping`` event every ``PING_INTERVAL`` seconds. Returns when
        sending fails (for example because the socket is already closed) or
        after closing ``ws`` because no pong arrived for ``PONG_TIMEOUT``
        seconds.
        """
        while True:
            # FIXME check last ping time
            try:
                ws.send(json.dumps({
                    "event": "ping"
                }))
            except Exception:
                # The socket is unusable; end this keepalive thread quietly.
                # on_open() starts a fresh one for the next connection.
                self.logger.exception('Keepalive ping failed, stopping')
                return

            time.sleep(self.PING_INTERVAL)

            if time.time() - self.last_pong > self.PONG_TIMEOUT:
                self.logger.warning('Closing socket for inactivity')
                ws.close()
                return

    @property
    def online(self):
        """True when a pong was received within ``ONLINE_WINDOW`` seconds."""
        return time.time() - self.last_pong < self.ONLINE_WINDOW