468 lines
14 KiB
Python
468 lines
14 KiB
Python
import secrets
|
|
import os
|
|
import logging
|
|
import json
|
|
import ecdsa
|
|
import binascii
|
|
|
|
from jose.jwk import ECKey
|
|
from flask import Flask, render_template, g, session
|
|
from flask_socketio import SocketIO, emit
|
|
|
|
|
|
app = Flask(__name__)

# Secrets may be supplied through the environment so that a deployment does
# not have to run with the well-known literals committed in this file.  The
# historical values stay as fallbacks so existing setups keep working.
_DEFAULT_SECRET_KEY = 'secret!'
_DEFAULT_ADMIN_PASSWORD = 'changeme'
app.config['SECRET_KEY'] = os.environ.get(
    'JEOPARDY_SECRET_KEY', _DEFAULT_SECRET_KEY)
app.config['ADMIN_PASSWORD'] = os.environ.get(
    'JEOPARDY_ADMIN_PASSWORD', _DEFAULT_ADMIN_PASSWORD)

socketio = SocketIO(app)

# The root logger is used directly: everything at DEBUG and above goes to a
# stream handler.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
log.addHandler(logging.StreamHandler())

log.info("starting up...")
if (app.config['SECRET_KEY'] == _DEFAULT_SECRET_KEY
        or app.config['ADMIN_PASSWORD'] == _DEFAULT_ADMIN_PASSWORD):
    # Make the insecure fallback visible instead of silently accepting it.
    log.warning(
        "Running with built-in default secrets; set JEOPARDY_SECRET_KEY "
        "and JEOPARDY_ADMIN_PASSWORD to override them")
|
|
|
|
|
|
class Board:
    """A round's board: an ordered list of categories."""

    def __init__(self, type):
        # ``type`` shadows the builtin, but it is the constructor's public
        # parameter name and is kept for compatibility.
        self.type = type
        self.categories = []

    def add_category(self, c):
        """Append category ``c`` to the board."""
        self.categories.append(c)

    def serialize(self):
        """Return the board as a JSON-compatible dict (``type`` is not included)."""
        return {'categories': [c.serialize() for c in self.categories]}

    def copy(self):
        """Return a new board holding copies of every category."""
        r = Board(self.type)
        for c in self.categories:
            r.add_category(c.copy())
        return r

    @classmethod
    def unserialize(cls, j):
        """Build a board from a decoded JSON document.

        ``j`` is expected to look like
        ``{'categories': [{'name': ..., 'questions': [{'clue': ..., 'answer': ...}, ...]}]}``.
        Question values are derived from position: ``(index + 1) * 100``.

        Returns the new board, or ``None`` when ``j`` has no ``categories``
        key (or is not a dict).

        Raises:
            ValueError: a category lacks ``name`` or ``questions``, or a
                question lacks ``clue`` or ``answer``.  Previously these
                cases were logged and then failed with a bare ``KeyError``
                on the very key that had just been reported missing.
        """
        # NOTE(review): the whole document is stored as the board's ``type``,
        # exactly as before; this looks unintended -- confirm with the author.
        r = cls(j)
        log.debug('UNSERIALIZING ROUND %s', j)

        if not isinstance(j, dict) or 'categories' not in j:
            log.error("No categories")
            return None

        for cid, c in enumerate(j['categories']):
            if 'name' not in c:
                log.error("No category name")
                raise ValueError("category {} has no name".format(cid))
            if 'questions' not in c:
                log.error("Invalid question count")
                raise ValueError("category {} has no questions".format(cid))
            if len(c['questions']) != 5:
                # Only reported, not rejected: the board is still built with
                # however many questions were supplied, as it always was.
                log.error("Invalid question count")

            cat = Category(c['name'])
            r.add_category(cat)

            for qid, q in enumerate(c['questions']):
                if 'clue' not in q:
                    log.error("No clue in question")
                    raise ValueError(
                        "question {}/{} has no clue".format(cid, qid))
                if 'answer' not in q:
                    log.error("No answer in question")
                    raise ValueError(
                        "question {}/{} has no answer".format(cid, qid))

                value = (qid + 1) * 100
                cat.add_question(
                    Question(value, q['clue'], q['answer'], c['name'], qid, cid))

        return r
|
|
|
|
class Category:
    """A named board column together with its questions, in display order."""

    def __init__(self, name):
        self.name = name
        self.questions = []

    def add_question(self, q):
        """Append question ``q`` to this category."""
        self.questions.append(q)

    def serialize(self):
        """Return ``{'name': ..., 'questions': [...]}`` with every question serialized."""
        dumped = []
        for question in self.questions:
            dumped.append(question.serialize())
        return {'name': self.name, 'questions': dumped}

    def copy(self):
        """Return a new category with the same name and a copy of each question."""
        duplicate = Category(self.name)
        for question in self.questions:
            duplicate.add_question(question.copy())
        return duplicate
|
|
|
|
class Question:
    """A single clue on the board.

    ``qid`` and ``cid`` are the question's index inside its category and the
    category's index on the board; they are kept on the object but are not
    part of the serialized form.
    """

    # Attributes exported by serialize(), in output order.
    _SERIALIZED = ('value', 'clue', 'answer', 'category', 'answered')

    def __init__(self, value, clue, answer, category, qid, cid, answered=False):
        self.value = value
        self.clue = clue
        self.answer = answer
        self.category = category
        self.answered = answered
        self.qid = qid
        self.cid = cid

    def serialize(self):
        """Return value, clue, answer, category and answered flag as a dict."""
        return {field: getattr(self, field) for field in self._SERIALIZED}

    def copy(self):
        """Return a new ``Question`` carrying the same seven constructor values."""
        return Question(
            value=self.value,
            clue=self.clue,
            answer=self.answer,
            category=self.category,
            qid=self.qid,
            cid=self.cid,
            answered=self.answered,
        )
|
|
|
|
class Compaction:
    """Current game state, built by folding log entries one at a time.

    ``ct`` counts the entries consumed so far, so a caller can feed only the
    entries it has not fed yet.
    """

    def __init__(self):
        self.ct = 0
        self.board = None
        self.roles = {}         # identity -> role name
        self.identity = {}      # short ("pretty") id -> identity
        self.contestants = {}   # identity -> Contestant
        self.question = None    # copy of the active Question, or None

    @property
    def buzzing(self):
        """Identity of the contestant currently buzzing, or ``None``."""
        for c in self.contestants.values():
            if c.buzzing:
                return c.identity
        return None

    def feed(self, entry):
        """Apply one log entry and advance ``ct``.

        Unknown event types are logged and skipped.  ``ct`` advances for
        every entry whose handler returns; if a handler raises, ``ct`` is
        left untouched, as before.
        """
        log.info("Compacting {}".format(entry))
        handler = self._HANDLERS.get(entry['event'])
        if handler is None:
            log.error("Invalid log entry type {}".format(entry['event']))
        else:
            handler(self, entry)
        self.ct += 1

    def _on_new_round(self, entry):
        """Install a private copy of the round's board."""
        self.board = entry['round'].copy()
        log.info("LOG ENTRY, new round")

    def _on_role_set(self, entry):
        """Record a role; keep the contestant table in step with it."""
        who, what = entry['identity'], entry['what']
        self.roles[who] = what
        if what == 'CONTESTANT':
            # Re-assigning CONTESTANT to an existing contestant must keep
            # the entry (and its score).  The previous if/elif fell through
            # to the removal branch in exactly that case.
            if who not in self.contestants:
                self.contestants[who] = Contestant(who, who[:8])
        else:
            self.contestants.pop(who, None)
        log.info("LOG ENTRY, role set, {} is {}".format(who, what))

    def _on_pretty_set(self, entry):
        """Map a short id to its full identity."""
        self.identity[entry['pretty']] = entry['identity']
        log.info("LOG ENTRY, pretty set, {} is {}".format(
            entry['pretty'], entry['identity']))

    def _on_question(self, entry):
        """Activate a question and clear every contestant's buzz."""
        cid = entry['cid']
        qid = entry['qid']
        log.info("LOG ENTRY, question activated, {}/{}".format(cid, qid))
        self.question = self.board.categories[cid].questions[qid].copy()
        # Identities that already answered this question wrongly.
        self.question.losers = set()
        for c in self.contestants.values():
            c.buzzing = False

    def _on_buzz(self, entry):
        """Give the buzz to the first eligible contestant."""
        if not self.question:
            return
        log.info("LOG ENTRY, buzz")
        who = entry['identity']
        if who not in self.contestants:
            # Skip instead of raising KeyError: an exception here would leave
            # ``ct`` behind, so the same entry would be replayed and fail on
            # every later feed.
            log.error("Buzz from unknown contestant {}".format(who))
            return
        if not self.buzzing and who not in self.question.losers:
            self.contestants[who].buzzing = True

    def _on_answered(self, entry):
        """Score the buzzing contestant's answer."""
        if not self.question:
            return
        log.info("LOG ENTRY, answer")
        ok = entry['ok']
        value = self.question.value
        who = self.buzzing
        if not who:
            return
        self.contestants[who].buzzing = False

        if who in self.question.losers:
            return
        if ok:
            self.contestants[who].points += value
            self.board.categories[self.question.cid].questions[self.question.qid].answered = True
            self.question = None
            return

        # Wrong answer: deduct the value and lock this contestant out of the
        # current question.
        self.contestants[who].points -= value
        self.question.losers.add(who)

    def _on_give_up(self, entry):
        """Close the active question without awarding points."""
        if not self.question:
            return
        self.board.categories[self.question.cid].questions[self.question.qid].answered = True
        self.question = None

    def _on_set_name(self, entry):
        """Rename a contestant; unknown identities are logged and ignored."""
        who = entry['identity']
        name = entry['name']
        contestant = self.contestants.get(who)
        if contestant is None:
            # Same reasoning as in _on_buzz: never let one bad entry wedge
            # the replay.
            log.error("Cannot name unknown contestant {}".format(who))
            return
        contestant.name = name

    # Event name -> handler.  Built in the class body, so the values are
    # plain functions and are called as ``handler(self, entry)``.
    _HANDLERS = {
        'new round': _on_new_round,
        'role set': _on_role_set,
        'pretty set': _on_pretty_set,
        'question': _on_question,
        'buzz': _on_buzz,
        'answered': _on_answered,
        'give up': _on_give_up,
        'set name': _on_set_name,
    }

    def serialize(self):
        """Return the whole state as a JSON-compatible dict."""
        return {
            'board': self.board.serialize() if self.board else None,
            'roles': dict(self.roles),
            'identity': dict(self.identity),
            'contestants': [c.serialize() for c in self.contestants.values()],
            'question': self.question.serialize() if self.question else None,
            'losers': list(self.question.losers) if self.question else [],
        }
|
|
|
|
|
|
class Contestant:
    """A player: identity, display name, score and buzz flag."""

    def __init__(self, identity, name):
        self.identity = identity
        self.name = name
        # Every contestant starts at zero points and not buzzing.
        self.points = 0
        self.buzzing = False

    def serialize(self):
        """Return name, points, buzzing flag and identity as a dict."""
        return dict(
            name=self.name,
            points=self.points,
            buzzing=self.buzzing,
            identity=self.identity,
        )
|
|
|
|
|
|
class Game:
    """Event-sourced game: an append-only log plus a lazily updated Compaction.

    Every mutating method appends one entry to ``self.log`` and persists the
    whole log.  The current state is obtained through ``compacted``, which
    feeds any entries that have not been folded in yet.
    """

    def __init__(self, path='jeopardy.dat'):
        """Create an empty game persisted at ``path`` (default unchanged)."""
        self.state = 'IDLE'
        self.log = []
        self._compaction = None
        self.path = path

    def start_round(self, path):
        """Load a round from the JSON file at ``path`` and log it."""
        with open(path) as f:
            j = json.load(f)
        r = Board.unserialize(j)
        if r is None:
            # unserialize() already logged why; appending a None board would
            # only fail later, in save() or during replay.
            log.error("Round file {} rejected".format(path))
            return
        self.log.append({'event': 'new round', 'round': r})
        self.save()

    def activate_question(self, qid, cid):
        """Activate question ``qid`` of category ``cid`` unless already answered.

        Does nothing (besides logging) when no board is loaded or the
        indices do not address a question.
        """
        board = self.compacted.board
        if board is None:
            log.error("No board loaded, cannot activate a question")
            return
        try:
            question = board.categories[cid].questions[qid]
        except (IndexError, TypeError):
            log.error("No such question {}/{}".format(cid, qid))
            return
        if question.answered:
            return
        self.log.append({'event': 'question', 'qid': qid, 'cid': cid})
        self.save()

    def set_role(self, pretty, what):
        """Assign role ``what`` to the identity known by short id ``pretty``.

        Unknown (or unhashable) short ids are logged and ignored instead of
        raising ``KeyError``/``TypeError`` into the caller.
        """
        try:
            identity = self.identity[pretty]
        except (KeyError, TypeError):
            log.error("Unknown identity {}".format(pretty))
            return
        self.log.append({'event': 'role set', 'identity': identity, 'what': what})
        self.save()

    def set_pretty(self, pretty, identity):
        """Record that short id ``pretty`` refers to ``identity``."""
        self.log.append({'event': 'pretty set', 'pretty': pretty, 'identity': identity})
        self.save()

    def answer(self, ok):
        """Log the verdict on the buzzing contestant's answer."""
        self.log.append({'event': 'answered', 'ok': ok})
        self.save()

    def give_up(self):
        """Log that the active question is abandoned."""
        self.log.append({'event': 'give up'})
        self.save()

    def set_name(self, who, name):
        """Log a display-name change for identity ``who``."""
        self.log.append({'event': 'set name', 'identity': who, 'name': name})
        # Persist like every other mutator; previously the rename only reached
        # disk when some later event happened to trigger a save.
        self.save()

    @property
    def roles(self):
        """identity -> role mapping of the current state."""
        return self.compacted.roles

    @property
    def identity(self):
        """short id -> identity mapping of the current state."""
        return self.compacted.identity

    @property
    def compacted(self):
        """The Compaction, brought up to date with the log."""
        if self._compaction is None:
            self._compaction = Compaction()

        # Feed only the entries the compaction has not consumed yet.
        for m in self.log[self._compaction.ct:]:
            self._compaction.feed(m)

        return self._compaction

    def save(self):
        """Write the log to ``self.path``: one hex-encoded JSON entry per line.

        The data goes to ``<path>.new`` first and is then moved over the
        real file.  ``os.replace`` is used because ``os.rename`` fails on
        Windows when the destination already exists.
        """
        tmp = self.path + '.new'
        with open(tmp, 'wb') as f:
            for l in self.log:
                if l['event'] == 'new round':
                    # Serialize a shallow copy so the in-memory entry keeps
                    # its Board object.
                    l = dict(l)
                    l['round'] = l['round'].serialize()
                f.write(binascii.hexlify(json.dumps(l).encode()))
                f.write(b'\n')
        os.replace(tmp, self.path)

    def load(self):
        """Append the entries stored at ``self.path`` to the log, if the file exists."""
        if not os.path.exists(self.path):
            log.info("New game...")
            return
        log.info("Loading game...")
        with open(self.path, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    # Tolerate blank lines instead of failing in json.loads.
                    continue
                entry = json.loads(binascii.unhexlify(line))
                log.debug('RESTORING %s', entry)
                if entry['event'] == 'new round':
                    entry['round'] = Board.unserialize(entry['round'])
                self.log.append(entry)

        # Force a full replay on next access.
        self._compaction = None

    def client_state(self, identity, admin):
        """Return the state payload for one client.

        Identities without a role are treated as ``'SPECTATOR'``.
        """
        role = self.roles.get(identity, 'SPECTATOR')
        compaction = self.compacted
        return {
            'role': role,
            'admin': admin,
            'state': self.state,
            'compacted': self.compacted_filter(compaction.serialize(), role, admin),
            'ct': compaction.ct,
        }

    def compacted_filter(self, d, role, admin):
        """Strip from serialized state ``d`` what this client must not see.

        Admins get ``d`` untouched.  Spectators and contestants lose the
        ``roles`` and ``identity`` maps and every ``answer`` field.  Any
        other role gets ``d`` untouched.  ``d`` is modified in place and
        returned.
        """
        if admin:
            return d
        if role in ('SPECTATOR', 'CONTESTANT'):
            del d['roles']
            del d['identity']
            if d['question']:
                del d['question']['answer']
            # filter out answers
            # NOTE(review): indentation was lost in the reviewed copy; the
            # board filtering is assumed to belong to this branch -- confirm.
            if d['board'] is not None:
                for c in d['board']['categories']:
                    for q in c['questions']:
                        del q['answer']

        return d

    def buzz(self, identity):
        """Log a buzz from ``identity`` if it could still win the buzz.

        Nothing is logged when no question is active, when somebody is
        already buzzing, or when ``identity`` already answered the active
        question wrongly (the compaction would ignore that entry anyway, so
        logging it only grows the log).
        """
        compaction = self.compacted
        question = compaction.question
        if question is None:
            return
        if any(c.buzzing for c in compaction.contestants.values()):
            return
        if identity in getattr(question, 'losers', ()):
            return
        self.log.append({'event': 'buzz', 'identity': identity})
        # Persist like the other mutators.
        self.save()
|
|
|
|
|
|
# One game instance is shared by all handlers; it is attached to the Flask
# app object and restored from its data file as soon as the module is loaded.
app.game = Game()
app.game.load()

# Manual bootstrap of a round, kept for reference (disabled):
#app.game.start_round('questions/final.js')
#app.game.save()
|
|
|
|
|
|
def emit_state():
    """Emit a 'state' event with the game state as seen by the current session.

    The session's ``identity`` and ``admin`` values select what
    ``Game.client_state`` returns.
    """
    payload = app.game.client_state(session['identity'], session['admin'])
    emit('state', payload)
|
|
|
|
@app.route('/')
def view_index():
    """Serve the page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
|
|
|
|
|
|
@socketio.on('connect')
def sio_connect():
    """Reset the session for a new connection and hand out a fresh nonce.

    The client starts without identity and without admin rights, then
    receives its state followed by the nonce.
    """
    log.info("Client connected")
    session['identity'] = None
    session['admin'] = False
    nonce = secrets.token_hex(64)
    session['nonce'] = nonce
    emit_state()
    emit('nonce', nonce)
|
|
|
|
@socketio.on('ping')
def sio_ping():
    """Answer a 'ping' event by sending the client its current state."""
    emit_state()
|
|
|
|
@socketio.on('proof')
def sio_proof(data):
    """Bind an identity to this session.

    The client must echo the nonce issued on connect and supply a non-empty
    string ``identity``.  On success the identity is stored in the session,
    its first eight characters are registered as its short id, and the
    client receives ``identified: True`` followed by its state.  On any
    failure the client receives ``identified: False``.
    """
    log.info("Client attempting identity proof")
    expected = session.get('nonce')
    if (not isinstance(data, dict)
            or expected is None
            or data.get('nonce', '') != expected):
        emit('identified', {'identified': False})
        return

    # SECURITY(review): the signature check below is disabled, so the claimed
    # identity is NOT proven -- any client can present any identity string,
    # including the host's.  Re-enable it once the key/identity scheme is
    # confirmed; it is kept here as a reference.
    #jwk = data.get('jwk', {})
    #k = ECKey(jwk, 'ES384')
    #proof = binascii.unhexlify(data.get('proof'))
    #if not k.verify(session['nonce'], proof):
    #    emit('identified', {'identified': False})
    #    return

    identity = data.get('identity')
    if not isinstance(identity, str) or not identity:
        # A missing identity used to crash on ``identity[:8]``; a non-string
        # one could end up in the event log as an unusable key.
        emit('identified', {'identified': False})
        return

    session['identity'] = identity
    pretty = identity[:8]
    if app.game.identity.get(pretty) != identity:
        # Only log the mapping when it is new or different, so reconnects
        # do not grow the event log.
        app.game.set_pretty(pretty, identity)
    emit('identified', {'identified': True})

    emit_state()
|
|
|
|
@socketio.on('admin')
def sio_admin(data):
    """Authenticate as admin and optionally assign a role.

    ``data['password']`` must match the configured admin password.  On
    success the session is flagged as admin; if ``data['set_role']`` is
    present, its ``who`` / ``what`` are forwarded to ``Game.set_role``.
    The client receives ``identified: True`` or ``identified: False``.
    """
    log.info("Admin connecting...")
    supplied = data.get('password') if isinstance(data, dict) else None
    expected = app.config['ADMIN_PASSWORD']
    # Compare as bytes in constant time; compare_digest() rejects non-ASCII
    # str arguments, hence the explicit encoding.
    authenticated = isinstance(supplied, str) and secrets.compare_digest(
        supplied.encode('utf-8'), str(expected).encode('utf-8'))
    if not authenticated:
        # Was {'identifier': True}: a misspelt key carrying the wrong value.
        emit('identified', {'identified': False})
        return

    emit('identified', {'identified': True})
    session['admin'] = True

    role_request = data.get('set_role')
    if role_request is not None:
        if isinstance(role_request, dict):
            who = role_request.get('who')
            what = role_request.get('what')
            app.game.set_role(who, what)
        else:
            log.error("Malformed set_role request")

    emit_state()
|
|
|
|
@socketio.on('host-question')
def sio_host_question(data):
    """Let the host activate a question.

    ``data['category']`` and ``data['question']`` must each be an integer
    from 0 to 4; anything else is ignored.  Clients that are not the host
    are ignored.
    """
    # .get(): a client without any role must be ignored, not crash the
    # handler with KeyError.
    if app.game.roles.get(session['identity']) != 'HOST':
        return
    if not isinstance(data, dict):
        return

    cid = data.get('category')
    qid = data.get('question')
    for index in (cid, qid):
        # bool is an int subclass and would pass the membership test alone.
        if isinstance(index, bool) or not isinstance(index, int):
            return
        if index not in (0, 1, 2, 3, 4):
            return
    app.game.activate_question(qid, cid)
|
|
|
|
@socketio.on('host-set-role')
def sio_host_contestant(data):
    """Let the host switch a client between SPECTATOR and CONTESTANT.

    ``data['who']`` is the client's short id and ``data['what']`` the new
    role.  Clients that are not the host, unknown short ids and other roles
    are ignored.
    """
    # .get(): a client without any role must be ignored, not crash the
    # handler with KeyError.
    if app.game.roles.get(session['identity']) != 'HOST':
        return
    if not isinstance(data, dict):
        return
    who = data.get('who')
    if not who or not isinstance(who, str) or who not in app.game.identity:
        # Unknown short ids would otherwise fail inside Game.set_role.
        log.info("Who?")
        return
    if data.get('what') not in ('SPECTATOR', 'CONTESTANT'):
        log.info("What?")
        return
    app.game.set_role(who, data['what'])
|
|
|
|
@socketio.on('buzz')
def sio_buz():
    """Forward a buzz to the game when the sender is a contestant."""
    is_contestant = app.game.roles.get(session['identity']) == 'CONTESTANT'
    if is_contestant:
        app.game.buzz(session['identity'])
|
|
|
|
@socketio.on('answer')
def sio_answer(data):
    """Let the host judge the buzzing contestant's answer.

    ``data['ok']`` is interpreted by truthiness; a missing value counts as
    wrong.  Clients that are not the host are ignored.
    """
    # .get(): a client without any role must be ignored, not crash the
    # handler with KeyError.
    if app.game.roles.get(session['identity']) != 'HOST':
        return
    if not isinstance(data, dict):
        return

    # Store a real boolean in the event log rather than whatever was sent.
    ok = bool(data.get('ok', False))
    app.game.answer(ok)
|
|
|
|
@socketio.on('give up')
def sio_give_up():
    """Let the host abandon the active question."""
    # .get(): a client without any role must be ignored, not crash the
    # handler with KeyError.
    if app.game.roles.get(session['identity']) != 'HOST':
        return
    app.game.give_up()
|
|
|
|
@socketio.on('set name')
def sio_set_name(msg):
    """Let the host rename a contestant.

    ``msg['identity']`` must be a current contestant's identity and
    ``msg['name']`` a string; anything else is logged and ignored.
    """
    # .get(): a client without any role must be ignored, not crash the
    # handler with KeyError.
    if app.game.roles.get(session['identity']) != 'HOST':
        return
    if not isinstance(msg, dict):
        return
    who = msg.get('identity')
    name = msg.get('name')
    if not isinstance(who, str) or not isinstance(name, str):
        log.info("Malformed set name request")
        return
    if who not in app.game.compacted.contestants:
        # Never log a rename for a non-contestant: replaying such an entry
        # cannot find the contestant to rename.
        log.info("Cannot name unknown contestant")
        return
    app.game.set_name(who, name)
|
|
|
|
if __name__ == '__main__':
    # The server listens on all interfaces, so debug mode (which enables the
    # interactive debugger) must not be on by default.  Opt in explicitly
    # with JEOPARDY_DEBUG=1 for local development.
    debug = os.environ.get('JEOPARDY_DEBUG', '').lower() in ('1', 'true', 'yes')
    socketio.run(app, debug=debug, host='0.0.0.0')
|