import json import sqlite3 from pathlib import Path from datetime import datetime, timezone from typing import NamedTuple, Iterable, Iterator, List from functools import wraps from flask import Flask, render_template, abort, g, \ redirect, request, url_for, make_response, send_file, \ Response from base64 import b64decode from spaceauth import SpaceAuth, login_required, current_user, cap_required # device infomation stored in database class DeviceInfo(NamedTuple): hwaddr: str name: str owner: str ignored: bool def v4addr(): if request.headers.getlist("X-Forwarded-For"): r_addr = request.headers.getlist("X-Forwarded-For")[-1] else: r_addr = request.remote_addr if r_addr.startswith('::ffff:'): r_addr = r_addr[7:] return r_addr def req_to_ctx(): return dict(iter(request.form.items())) def get_device_info(conn: sqlite3.Connection, hwaddr: str) -> DeviceInfo: return list(get_device_infos(conn, (hwaddr,)))[0] def get_device_infos(conn: sqlite3.Connection, hwaddrs: Iterable[str]) -> Iterator[DeviceInfo]: hwaddrs = list(hwaddrs) in_clause = '({})'.format(', '.join(['?'] * len(hwaddrs))) stmt = '''select hwaddr, name, ignored, owner from devices where devices.hwaddr in ''' + in_clause for row in conn.execute(stmt, hwaddrs): owner = row['owner'] or '' ignored = row['ignored'] yield DeviceInfo(row['hwaddr'], row['name'], owner, ignored) def app(instance_path, devices_api, config): app = Flask('at', instance_path=instance_path, instance_relative_config=True) app.config.update(config) app.jinja_env.add_extension('jinja2.ext.i18n') app.jinja_env.install_null_translations() app.updater = devices_api if app.config.get('PROXY_FIX'): from werkzeug.middleware.proxy_fix import ProxyFix app.wsgi_app = ProxyFix(app.wsgi_app) app.space_auth = SpaceAuth(app) def auth_get_user(): if config.get('DEBUG', False): if "User" in request.headers: return request.headers.get("User") if "Authorization" in request.headers: raw = b64decode(request.headers.get('Authorization').split(' ')[1]) app.logger.info(f'Raw authorization: {raw!s}') return raw.decode().split(':')[0] app.logger.info(request.headers) raise Exception('username not supplied') else: return current_user.id def auth_login_required(f): if config.get('DEBUG', False): @wraps(f) def wrapper(*args, **kwargs): try: auth_get_user() except Exception: app.logger.exception("auth get exception") response = make_response('', 401) response.headers['WWW-Authenticate'] = 'Basic realm="at.hackerspace.pl", charset="UTF-8"' return response return f(*args, **kwargs) return wrapper else: return login_required(f) def restrict_ip(prefixes : List[str] = [], exclude : List[str] = []): def decorator(f): @wraps(f) def func(*a, **kw): r_addr = v4addr() if r_addr in exclude: app.logger.info('got IP %s, rejecting', r_addr) return render_template('invalid_ip.html', ip_address=r_addr), 403 for prefix in prefixes: if r_addr.startswith(prefix): break else: app.logger.info('got IP %s, rejecting', r_addr) return render_template('invalid_ip.html', ip_address=r_addr), 403 return f(*a, **kw) return func return decorator @app.template_filter('strfts') def strfts(ts, format='%Y-%m-%d %H:%M'): return datetime.utcfromtimestamp(ts).strftime(format) @app.template_filter('utcisoformat') def utcisoformat(ts): return datetime.utcfromtimestamp(ts).replace( tzinfo=timezone.utc).isoformat() @app.template_filter('wikiurl') def wikiurl(user): return app.config['WIKI_URL'].replace("${login}", user) @app.before_request def make_connection(): conn = sqlite3.connect(app.config['DB']) conn.row_factory = sqlite3.Row conn.isolation_level = None # for autocommit mode g.db = conn @app.teardown_request def close_connection(exception): g.db.close() @app.route('/') def main_view(): return render_template('main.html', **now_at()) @app.route('/metrics') def metrics_view(): """Render count of different entities, per kind, in Prometheus format.""" now = now_at() lines = [ "# HELP entities present at the hackerspace according to checkinator / DHCP", "# TYPE people gauge", ] for kind, devices in now.items(): # The kind is being directly text-pasted into the metric below - # let's make sure a new kind with special characters doesn't mess # things up too much. if '"' in kind: continue # Not using formatting, as the Prometheus format contains '{' and # '}' characters which throw off Python's .format(). line = 'checkinator_now_present_entities{entity_kind="' + kind + '"} ' line += str(len(devices)) lines.append(line) return Response('\n'.join(lines), mimetype='text/plain') @app.route('/api') def list_all(): data = now_at() def prettify_user(xxx_todo_changeme): (user, atime) = xxx_todo_changeme if user == 'greenmaker': user = 'dreammaker' return { 'login': user, 'timestamp': atime, 'pretty_time': strfts(atime), } result = {} result['users'] = list(map(prettify_user, data.pop('users'))) result.update((k, len(v)) for k, v in list(data.items())) res = make_response(json.dumps(result), 200) res.headers['Access-Control-Allow-Origin'] = '*' return res def now_at(): result = dict() devices = app.updater.get_active_devices() macs = list(devices.keys()) identified_devices = list(get_device_infos(g.db, macs)) unknown = set(macs) - set(d.hwaddr for d in identified_devices) # das kektop sorting maschine # identify special devices for name, prefixes in app.config['SPECIAL_DEVICES'].items(): result[name] = set() prefixes = tuple(prefixes) # startswith accepts tuple as argument for hwaddr in list(unknown): if hwaddr.startswith(prefixes): result[name].add(hwaddr) unknown.discard(hwaddr) result['unknown'] = unknown users = {} for info in identified_devices: # append device to user last_seen = users.get(info.owner, 0) if not info.ignored: last_seen = max(last_seen, devices[info.hwaddr].atime) users[info.owner] = last_seen result['users'] = sorted(users.items(), key=lambda u_l: (u_l[1], u_l[0]), reverse=True) return result restrict_to_hs = restrict_ip(prefixes=app.config['CLAIMABLE_PREFIXES'], exclude=app.config['CLAIMABLE_EXCLUDE']) @app.route('/claim', methods=['GET']) @restrict_to_hs @auth_login_required def claim_form(): hwaddr, name = app.updater.get_device(v4addr()) return render_template('claim.html', hwaddr=hwaddr, name=name) @app.route('/my_ip', methods=['GET']) def get_my_ip(): ip = v4addr() hwaddr, name = app.updater.get_device(ip) return f'ip: {ip!r}\nhwaddr: {hwaddr!r}\nhostname: {name!r}\n' @app.route('/claim', methods=['POST']) @restrict_to_hs @auth_login_required def claim(): hwaddr, lease_name = app.updater.get_device(v4addr()) ctx = None if not hwaddr: ctx = dict(error='Invalid device.') else: login = auth_get_user() try: g.db.execute(''' insert into devices (hwaddr, name, owner, ignored) values (?, ?, ?, ?)''', [hwaddr, request.form['name'], login, False]) ctx = {} except sqlite3.Error: error = 'Could not add device! Perhaps someone claimed it?' ctx = dict(error=error) return render_template('post_claim.html', **ctx) def get_user_devices(conn, user): devs = conn.execute('select hwaddr, name, ignored from devices where\ owner = ?', [user]) device_info = [] for row in devs: di = DeviceInfo(row['hwaddr'], row['name'], user, row['ignored']) device_info.append(di) return device_info @app.route('/account', methods=['GET']) @auth_login_required def account(): devices = get_user_devices(g.db, auth_get_user()) return render_template('account.html', devices=devices) def set_ignored(conn, hwaddr, user, value): return conn.execute(''' update devices set ignored = ? where hwaddr = ? and owner = ?''', [value, hwaddr, user]) def delete_device(conn, hwaddr, user): return conn.execute(''' delete from devices where hwaddr = ? and owner = ?''', [hwaddr, user]) @app.route('/devices///') @auth_login_required def device(id, action): user = auth_get_user() if action == 'hide': set_ignored(g.db, id, user, True) if action == 'show': set_ignored(g.db, id, user, False) if action == 'delete': delete_device(g.db, id, user) return redirect(url_for('account')) @app.route('/admin') @cap_required('staff') def admin(): data = now_at() return render_template('admin.html', data=data) @app.route('/static/css/basic.css') def css(): return send_file(str(Path('./static/css/basic.css').absolute())) return app