jeopardy/jeopardy.py
2019-04-13 17:09:11 +02:00

468 lines
14 KiB
Python

import secrets
import os
import logging
import json
import ecdsa
import binascii
from jose.jwk import ECKey
from flask import Flask, render_template, g, session
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
app.config['ADMIN_PASSWORD'] = 'changeme'
socketio = SocketIO(app)
log = logging.getLogger()
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())
log.info("starting up...")
class Board:
def __init__(self, type):
self.type = type
self.categories = []
def add_category(self, c):
self.categories.append(c)
def serialize(self):
res = {'categories': [c.serialize() for c in self.categories]}
return res
def copy(self):
r = Board(self.type)
for c in self.categories:
r.add_category(c.copy())
return r
@classmethod
def unserialize(cls, j):
r = cls(j)
print('UNSERIALIZING ROUND', j)
if 'categories' not in j:
log.error("No categories")
return
for cid, c in enumerate(j['categories']):
if 'name' not in c:
log.error("No category name")
if 'questions' not in c or len(c['questions']) != 5:
log.error("Invalid question count")
cat = Category(c['name'])
r.add_category(cat)
questions = c['questions']
for qid, q in enumerate(questions):
if 'clue' not in q:
log.error("No clue in question")
if 'answer' not in q:
log.error("No answer in question")
value = (qid+1)*100
ques = Question(value, q['clue'], q['answer'], c['name'], qid, cid)
cat.add_question(ques)
return r
class Category:
def __init__(self, name):
self.name = name
self.questions = []
def add_question(self, q):
self.questions.append(q)
def serialize(self):
return {'name': self.name, 'questions': [q.serialize() for q in self.questions]}
def copy(self):
c = Category(self.name)
for q in self.questions:
c.add_question(q.copy())
return c
class Question:
def __init__(self, value, clue, answer, category, qid, cid, answered=False):
self.value = value
self.clue = clue
self.answer = answer
self.category = category
self.answered = answered
self.qid = qid
self.cid = cid
def serialize(self):
return {
'value': self.value,
'clue': self.clue,
'answer': self.answer,
'category': self.category,
'answered': self.answered,
}
def copy(self):
return Question(self.value, self.clue, self.answer, self.category, self.qid, self.cid, self.answered)
class Compaction:
def __init__(self):
self.ct = 0
self.board = None
self.roles = {}
self.identity = {}
self.contestants = {}
self.question = None
@property
def buzzing(self):
for _, c in self.contestants.items():
if c.buzzing:
return c.identity
return None
def feed(self, entry):
log.info("Compacting {}".format(entry))
if entry['event'] == 'new round':
self.board = entry['round'].copy()
log.info("LOG ENTRY, new round")
elif entry['event'] == 'role set':
who, what = entry['identity'], entry['what']
self.roles[who] = what
if what == 'CONTESTANT' and who not in self.contestants:
self.contestants[who] = Contestant(who, who[:8])
elif who in self.contestants:
del self.contestants[who]
log.info("LOG ENTRY, role set, {} is {}".format(who, what))
elif entry['event'] == 'pretty set':
self.identity[entry['pretty']] = entry['identity']
log.info("LOG ENTRY, pretty set, {} is {}".format(entry['pretty'], entry['identity']))
elif entry['event'] == 'question':
cid = entry['cid']
qid = entry['qid']
log.info("LOG ENTRY, question activated, {}/{}".format(cid, qid))
self.question = self.board.categories[cid].questions[qid].copy()
self.question.losers = set()
for _, c in self.contestants.items():
c.buzzing = False
elif entry['event'] == 'buzz':
if not self.question:
self.ct += 1
return
log.info("LOG ENTRY, buzz")
who = entry['identity']
if not self.buzzing and who not in self.question.losers:
self.contestants[who].buzzing = True
elif entry['event'] == 'answered':
if not self.question:
self.ct += 1
return
log.info("LOG ENTRY, answer")
ok = entry['ok']
value = self.question.value
who = self.buzzing
if not who:
self.ct += 1
return
self.contestants[who].buzzing = False
if who in self.question.losers:
self.ct += 1
return
if ok :
self.contestants[who].points += value
self.board.categories[self.question.cid].questions[self.question.qid].answered = True
self.question = None
self.ct += 1
return
self.contestants[who].points -= value
self.question.losers.add(who)
elif entry['event'] == 'give up':
if not self.question:
self.ct += 1
return
self.board.categories[self.question.cid].questions[self.question.qid].answered = True
self.question = None
elif entry['event'] == 'set name':
who = entry['identity']
name = entry['name']
self.contestants[who].name = name
else:
log.error("Invalid log entry type {}".format(entry['event']))
self.ct += 1
def serialize(self):
return {
'board': self.board.serialize() if self.board else None,
'roles': dict(self.roles),
'identity': dict(self.identity),
'contestants': [c.serialize() for (_, c) in self.contestants.items()],
'question': self.question.serialize() if self.question else None,
'losers': list(self.question.losers) if self.question else [],
}
class Contestant:
def __init__(self, identity, name):
self.identity = identity
self.name = name
self.points = 0
self.buzzing = False
def serialize(self):
return {
'name': self.name,
'points': self.points,
'buzzing': self.buzzing,
'identity': self.identity,
}
class Game:
def __init__(self):
self.state = 'IDLE'
self.log = []
self._compaction = None
def start_round(self, path):
with open(path) as f:
j = json.load(f)
r = Board.unserialize(j)
self.log.append({'event': 'new round', 'round': r})
self.save()
def activate_question(self, qid, cid):
if self.compacted.board.categories[cid].questions[qid].answered:
return
self.log.append({'event': 'question', 'qid': qid, 'cid': cid})
self.save()
def set_role(self, pretty, what):
self.log.append({'event': 'role set', 'identity': self.identity[pretty], 'what': what})
self.save()
def set_pretty(self, pretty, identity):
self.log.append({'event': 'pretty set', 'pretty': pretty, 'identity': identity})
self.save()
def answer(self, ok):
self.log.append({'event': 'answered', 'ok': ok})
self.save()
def give_up(self):
self.log.append({'event': 'give up'})
self.save()
def set_name(self, who, name):
self.log.append({'event': 'set name', 'identity': who, 'name': name})
@property
def roles(self):
return self.compacted.roles
@property
def identity(self):
return self.compacted.identity
@property
def compacted(self):
if self._compaction is None:
self._compaction = Compaction()
if self._compaction.ct == len(self.log):
return self._compaction
missing = self.log[self._compaction.ct:]
for m in missing:
self._compaction.feed(m)
return self._compaction
def save(self):
with open('jeopardy.dat.new', 'wb') as f:
for l in self.log:
if l['event'] == 'new round':
l = dict(l)
l['round'] = l['round'].serialize()
f.write(binascii.hexlify(json.dumps(l).encode()))
f.write(b'\n')
os.rename('jeopardy.dat.new', 'jeopardy.dat')
def load(self):
if not os.path.exists('jeopardy.dat'):
log.info("New game...")
return
log.info("Loading game...")
with open('jeopardy.dat', 'r') as f:
for line in f:
line = line.strip()
entry = json.loads(binascii.unhexlify(line))
print('RESTORING', entry)
if entry['event'] == 'new round':
entry['round'] = Board.unserialize(entry['round'])
self.log.append(entry)
self._compaction = None
def client_state(self, identity, admin):
role = self.roles.get(identity, 'SPECTATOR')
return {
'role': role,
'admin': admin,
'state': self.state,
'compacted': self.compacted_filter(self.compacted.serialize(), role, admin),
'ct': self.compacted.ct,
}
def compacted_filter(self, d, role, admin):
if admin:
return d
if role in ('SPECTATOR', 'CONTESTANT'):
del d['roles']
del d['identity']
if d['question']:
del d['question']['answer']
# filter out answers
if d['board'] is not None:
for i, c in enumerate(d['board']['categories']):
for j, q in enumerate(c['questions']):
del d['board']['categories'][i]['questions'][j]['answer']
return d
def buzz(self, identity):
if self.compacted.question is None:
return
for _, c in self.compacted.contestants.items():
if c.buzzing:
return
self.log.append({'event': 'buzz', 'identity': identity})
app.game = Game()
app.game.load()
#app.game.start_round('questions/final.js')
#app.game.save()
def emit_state():
identity = session['identity']
admin = session['admin']
emit('state', app.game.client_state(identity, admin))
@app.route('/')
def view_index():
return render_template("index.html")
@socketio.on('connect')
def sio_connect():
log.info("Client connected")
session['identity'] = None
session['admin'] = False
session['nonce'] = secrets.token_hex(64)
emit_state()
emit('nonce', session['nonce'])
@socketio.on('ping')
def sio_ping():
emit_state()
@socketio.on('proof')
def sio_proof(data):
log.info("Client attempting identity proof")
if data.get('nonce', '') != session['nonce']:
emit('identified', {'identified': False})
return
# ,___,
#jwk = data.get('jwk', {})
#k = ECKey(jwk, 'ES384')
#proof = binascii.unhexlify(data.get('proof'))
#if not k.verify(session['nonce'], proof):
# emit('identified', {'identified': False})
# return
identity = data.get('identity')
session['identity'] = identity
app.game.set_pretty(identity[:8], identity)
emit('identified', {'identified': True})
emit_state()
@socketio.on('admin')
def sio_admin(data):
log.info("Admin connecting...")
if data.get('password') != app.config['ADMIN_PASSWORD']:
emit('identified', {'identifier': True})
return
emit('identified', {'identified': True})
session['admin'] = True
if data.get('set_role') != None:
who = data['set_role'].get('who')
what = data['set_role'].get('what')
app.game.set_role(who, what)
emit_state()
@socketio.on('host-question')
def sio_host_question(data):
if app.game.roles[session['identity']] != 'HOST':
return
cid = data.get('category')
qid = data.get('question')
if cid not in (0, 1, 2, 3, 4):
return
if qid not in (0, 1, 2, 3, 4):
return
app.game.activate_question(qid, cid)
@socketio.on('host-set-role')
def sio_host_contestant(data):
if app.game.roles[session['identity']] != 'HOST':
return
if not data.get('who'):
log.info("Who?")
return
if data.get('what') not in ('SPECTATOR', 'CONTESTANT'):
log.info("What?")
return
app.game.set_role(data['who'], data['what'])
@socketio.on('buzz')
def sio_buz():
role = app.game.roles.get(session['identity'])
if role != 'CONTESTANT':
return
app.game.buzz(session['identity'])
@socketio.on('answer')
def sio_answer(data):
if app.game.roles[session['identity']] != 'HOST':
return
ok = data.get('ok', False)
app.game.answer(ok)
@socketio.on('give up')
def sio_give_up():
if app.game.roles[session['identity']] != 'HOST':
return
app.game.give_up()
@socketio.on('set name')
def sio_set_name(msg):
if app.game.roles[session['identity']] != 'HOST':
return
app.game.set_name(msg['identity'], msg['name'])
if __name__ == '__main__':
socketio.run(app, debug=True, host='0.0.0.0')