import secrets import os import logging import json import ecdsa import binascii from jose.jwk import ECKey from flask import Flask, render_template, g, session from flask_socketio import SocketIO, emit app = Flask(__name__) app.config['SECRET_KEY'] = 'secret!' app.config['ADMIN_PASSWORD'] = 'changeme' socketio = SocketIO(app) log = logging.getLogger() log.setLevel(logging.DEBUG) log.addHandler(logging.StreamHandler()) log.info("starting up...") class Board: def __init__(self, type): self.type = type self.categories = [] def add_category(self, c): self.categories.append(c) def serialize(self): res = {'categories': [c.serialize() for c in self.categories]} return res def copy(self): r = Board(self.type) for c in self.categories: r.add_category(c.copy()) return r @classmethod def unserialize(cls, j): r = cls(j) print('UNSERIALIZING ROUND', j) if 'categories' not in j: log.error("No categories") return for cid, c in enumerate(j['categories']): if 'name' not in c: log.error("No category name") if 'questions' not in c or len(c['questions']) != 5: log.error("Invalid question count") cat = Category(c['name']) r.add_category(cat) questions = c['questions'] for qid, q in enumerate(questions): if 'clue' not in q: log.error("No clue in question") if 'answer' not in q: log.error("No answer in question") value = (qid+1)*100 ques = Question(value, q['clue'], q['answer'], c['name'], qid, cid) cat.add_question(ques) return r class Category: def __init__(self, name): self.name = name self.questions = [] def add_question(self, q): self.questions.append(q) def serialize(self): return {'name': self.name, 'questions': [q.serialize() for q in self.questions]} def copy(self): c = Category(self.name) for q in self.questions: c.add_question(q.copy()) return c class Question: def __init__(self, value, clue, answer, category, qid, cid, answered=False): self.value = value self.clue = clue self.answer = answer self.category = category self.answered = answered self.qid = qid self.cid = cid def serialize(self): return { 'value': self.value, 'clue': self.clue, 'answer': self.answer, 'category': self.category, 'answered': self.answered, } def copy(self): return Question(self.value, self.clue, self.answer, self.category, self.qid, self.cid, self.answered) class Compaction: def __init__(self): self.ct = 0 self.board = None self.roles = {} self.identity = {} self.contestants = {} self.question = None @property def buzzing(self): for _, c in self.contestants.items(): if c.buzzing: return c.identity return None def feed(self, entry): log.info("Compacting {}".format(entry)) if entry['event'] == 'new round': self.board = entry['round'].copy() log.info("LOG ENTRY, new round") elif entry['event'] == 'role set': who, what = entry['identity'], entry['what'] self.roles[who] = what if what == 'CONTESTANT' and who not in self.contestants: self.contestants[who] = Contestant(who, who[:8]) elif who in self.contestants: del self.contestants[who] log.info("LOG ENTRY, role set, {} is {}".format(who, what)) elif entry['event'] == 'pretty set': self.identity[entry['pretty']] = entry['identity'] log.info("LOG ENTRY, pretty set, {} is {}".format(entry['pretty'], entry['identity'])) elif entry['event'] == 'question': cid = entry['cid'] qid = entry['qid'] log.info("LOG ENTRY, question activated, {}/{}".format(cid, qid)) self.question = self.board.categories[cid].questions[qid].copy() self.question.losers = set() for _, c in self.contestants.items(): c.buzzing = False elif entry['event'] == 'buzz': if not self.question: self.ct += 1 return log.info("LOG ENTRY, buzz") who = entry['identity'] if not self.buzzing and who not in self.question.losers: self.contestants[who].buzzing = True elif entry['event'] == 'answered': if not self.question: self.ct += 1 return log.info("LOG ENTRY, answer") ok = entry['ok'] value = self.question.value who = self.buzzing if not who: self.ct += 1 return self.contestants[who].buzzing = False if who in self.question.losers: self.ct += 1 return if ok : self.contestants[who].points += value self.board.categories[self.question.cid].questions[self.question.qid].answered = True self.question = None self.ct += 1 return self.contestants[who].points -= value self.question.losers.add(who) elif entry['event'] == 'give up': if not self.question: self.ct += 1 return self.board.categories[self.question.cid].questions[self.question.qid].answered = True self.question = None elif entry['event'] == 'set name': who = entry['identity'] name = entry['name'] self.contestants[who].name = name else: log.error("Invalid log entry type {}".format(entry['event'])) self.ct += 1 def serialize(self): return { 'board': self.board.serialize() if self.board else None, 'roles': dict(self.roles), 'identity': dict(self.identity), 'contestants': [c.serialize() for (_, c) in self.contestants.items()], 'question': self.question.serialize() if self.question else None, 'losers': list(self.question.losers) if self.question else [], } class Contestant: def __init__(self, identity, name): self.identity = identity self.name = name self.points = 0 self.buzzing = False def serialize(self): return { 'name': self.name, 'points': self.points, 'buzzing': self.buzzing, 'identity': self.identity, } class Game: def __init__(self): self.state = 'IDLE' self.log = [] self._compaction = None def start_round(self, path): with open(path) as f: j = json.load(f) r = Board.unserialize(j) self.log.append({'event': 'new round', 'round': r}) self.save() def activate_question(self, qid, cid): if self.compacted.board.categories[cid].questions[qid].answered: return self.log.append({'event': 'question', 'qid': qid, 'cid': cid}) self.save() def set_role(self, pretty, what): self.log.append({'event': 'role set', 'identity': self.identity[pretty], 'what': what}) self.save() def set_pretty(self, pretty, identity): self.log.append({'event': 'pretty set', 'pretty': pretty, 'identity': identity}) self.save() def answer(self, ok): self.log.append({'event': 'answered', 'ok': ok}) self.save() def give_up(self): self.log.append({'event': 'give up'}) self.save() def set_name(self, who, name): self.log.append({'event': 'set name', 'identity': who, 'name': name}) @property def roles(self): return self.compacted.roles @property def identity(self): return self.compacted.identity @property def compacted(self): if self._compaction is None: self._compaction = Compaction() if self._compaction.ct == len(self.log): return self._compaction missing = self.log[self._compaction.ct:] for m in missing: self._compaction.feed(m) return self._compaction def save(self): with open('jeopardy.dat.new', 'wb') as f: for l in self.log: if l['event'] == 'new round': l = dict(l) l['round'] = l['round'].serialize() f.write(binascii.hexlify(json.dumps(l).encode())) f.write(b'\n') os.rename('jeopardy.dat.new', 'jeopardy.dat') def load(self): if not os.path.exists('jeopardy.dat'): log.info("New game...") return log.info("Loading game...") with open('jeopardy.dat', 'r') as f: for line in f: line = line.strip() entry = json.loads(binascii.unhexlify(line)) print('RESTORING', entry) if entry['event'] == 'new round': entry['round'] = Board.unserialize(entry['round']) self.log.append(entry) self._compaction = None def client_state(self, identity, admin): role = self.roles.get(identity, 'SPECTATOR') return { 'role': role, 'admin': admin, 'state': self.state, 'compacted': self.compacted_filter(self.compacted.serialize(), role, admin), 'ct': self.compacted.ct, } def compacted_filter(self, d, role, admin): if admin: return d if role in ('SPECTATOR', 'CONTESTANT'): del d['roles'] del d['identity'] if d['question']: del d['question']['answer'] # filter out answers if d['board'] is not None: for i, c in enumerate(d['board']['categories']): for j, q in enumerate(c['questions']): del d['board']['categories'][i]['questions'][j]['answer'] return d def buzz(self, identity): if self.compacted.question is None: return for _, c in self.compacted.contestants.items(): if c.buzzing: return self.log.append({'event': 'buzz', 'identity': identity}) app.game = Game() app.game.load() #app.game.start_round('questions/final.js') #app.game.save() def emit_state(): identity = session['identity'] admin = session['admin'] emit('state', app.game.client_state(identity, admin)) @app.route('/') def view_index(): return render_template("index.html") @socketio.on('connect') def sio_connect(): log.info("Client connected") session['identity'] = None session['admin'] = False session['nonce'] = secrets.token_hex(64) emit_state() emit('nonce', session['nonce']) @socketio.on('ping') def sio_ping(): emit_state() @socketio.on('proof') def sio_proof(data): log.info("Client attempting identity proof") if data.get('nonce', '') != session['nonce']: emit('identified', {'identified': False}) return # ,___, #jwk = data.get('jwk', {}) #k = ECKey(jwk, 'ES384') #proof = binascii.unhexlify(data.get('proof')) #if not k.verify(session['nonce'], proof): # emit('identified', {'identified': False}) # return identity = data.get('identity') session['identity'] = identity app.game.set_pretty(identity[:8], identity) emit('identified', {'identified': True}) emit_state() @socketio.on('admin') def sio_admin(data): log.info("Admin connecting...") if data.get('password') != app.config['ADMIN_PASSWORD']: emit('identified', {'identifier': True}) return emit('identified', {'identified': True}) session['admin'] = True if data.get('set_role') != None: who = data['set_role'].get('who') what = data['set_role'].get('what') app.game.set_role(who, what) emit_state() @socketio.on('host-question') def sio_host_question(data): if app.game.roles[session['identity']] != 'HOST': return cid = data.get('category') qid = data.get('question') if cid not in (0, 1, 2, 3, 4): return if qid not in (0, 1, 2, 3, 4): return app.game.activate_question(qid, cid) @socketio.on('host-set-role') def sio_host_contestant(data): if app.game.roles[session['identity']] != 'HOST': return if not data.get('who'): log.info("Who?") return if data.get('what') not in ('SPECTATOR', 'CONTESTANT'): log.info("What?") return app.game.set_role(data['who'], data['what']) @socketio.on('buzz') def sio_buz(): role = app.game.roles.get(session['identity']) if role != 'CONTESTANT': return app.game.buzz(session['identity']) @socketio.on('answer') def sio_answer(data): if app.game.roles[session['identity']] != 'HOST': return ok = data.get('ok', False) app.game.answer(ok) @socketio.on('give up') def sio_give_up(): if app.game.roles[session['identity']] != 'HOST': return app.game.give_up() @socketio.on('set name') def sio_set_name(msg): if app.game.roles[session['identity']] != 'HOST': return app.game.set_name(msg['identity'], msg['name']) if __name__ == '__main__': socketio.run(app, debug=True, host='0.0.0.0')