Compare commits

...

3 commits

Author SHA1 Message Date
1f51fbea85 *: blacken 2024-07-07 21:46:29 +02:00
f4c6620007 ci.sh: create 2024-07-07 21:45:55 +02:00
c6688ec8cb *: type annotate 2024-07-07 21:45:54 +02:00
24 changed files with 1129 additions and 651 deletions

8
ci.sh Executable file
View file

@ -0,0 +1,8 @@
#!/usr/bin/env bash
set -e -x
# Use shell.nix if possible.
MYPYPATH=$(pwd)/stubs mypy --strict webapp/__init__.py
black webapp
black stubs

35
shell.nix Normal file
View file

@ -0,0 +1,35 @@
with import <nixpkgs> {};
let
types-wtforms = ps: ps.buildPythonPackage rec {
pname = "types-wtforms";
version = "3.1.0.20240425";
format = "setuptools";
src = fetchPypi {
inherit version;
pname = "types-WTForms";
hash = "sha256-Sf/B/lV26gc1t2P/935wYN057MZhJ2y9C0cJmSGzpvI=";
};
doCheck = false;
};
in pkgs.mkShell {
nativeBuildInputs = [
poetry
(python311.withPackages (ps: with ps; [
black
ldap
flask
flask-wtf
pillow
kerberos
(types-wtforms ps)
]))
cyrus_sasl
openldap
libkrb5
mypy
];
}

7
stubs/flask_wtf.pyi Normal file
View file

@ -0,0 +1,7 @@
import wtforms
from typing import Protocol
class FlaskForm(wtforms.Form):
def validate_on_submit(self) -> bool: ...
def is_submitted(self) -> bool: ...

1
stubs/kerberos.pyi Normal file
View file

@ -0,0 +1 @@
def changePassword(principal: str, current: str, new: str) -> bool: ...

40
stubs/ldap.pyi Normal file
View file

@ -0,0 +1,40 @@
from enum import Enum
from typing import Protocol, List, Tuple, Dict, Optional
class Scope(Enum):
BASE = "base"
ONELEVEL = "onelevel"
SUBTREE = "subtree"
SCOPE_BASE = Scope.BASE
SCOPE_ONELEVEL = Scope.ONELEVEL
SCOPE_SUBTREE = Scope.SUBTREE
class Mod(Enum):
ADD = "add"
DELETE = "delete"
REPLACE = "replace"
MOD_ADD = Mod.ADD
MOD_DELETE = Mod.DELETE
MOD_REPLACE = Mod.REPLACE
ModEntry = Tuple[Mod, str, bytes | List[bytes]]
class LDAPError(Exception): ...
class NO_SUCH_OBJECT(LDAPError): ...
class ldapobject(Protocol):
def start_tls_s(self) -> None: ...
def simple_bind_s(self, dn: str, password: str) -> None: ...
def search_s(
self,
base: str,
scope: Scope,
filterstr: str = "(objectClass=*)",
attrlist: Optional[List[str]] = None,
attrsonly: int = 0,
) -> List[Tuple[str, Dict[str, List[bytes]]]]: ...
def modify_s(self, dn: str, modlist: List[ModEntry]) -> None: ...
def initialize(url: str) -> ldapobject: ...

View file

@ -1,62 +1,103 @@
import ldap
import flask import flask
import flask_wtf import flask_wtf
import wtforms import wtforms
from functools import reduce from functools import reduce
app = flask.Flask(__name__) from webapp import validation, pools, config, context
from webapp import validation, pools, config from typing import Dict, Any, Protocol, Optional
if hasattr(config, "secret_key"):
app.secret_key = config.secret_key
if hasattr(config, "debug"): class App(flask.Flask):
app.debug = config.debug connections: pools.LDAPConnectionPool
profiles: Dict[str, context.Profile]
def __init__(self, name: str):
super().__init__(name)
if hasattr(config, "secret_key"):
self.secret_key = config.secret_key
if hasattr(config, "debug"):
self.debug = config.debug
def get_dn(self) -> Optional[str]:
return flask.session.get("dn")
def get_connection(self, dn: Optional[str] = None) -> Optional[ldap.ldapobject]:
dn = dn or self.get_dn()
if dn is None:
return None
return self.connections[dn]
def get_admin_connection(self) -> ldap.ldapobject:
conn = self.connections[config.ldap_admin_dn]
if not conn:
conn = self.connections.bind(
config.ldap_admin_dn, config.ldap_admin_password
)
return conn
def get_profile(self, dn: Optional[str] = None) -> Optional[context.Profile]:
dn = dn or self.get_dn()
if dn is None:
return None
return self.profiles.get(dn)
def refresh_profile(
self, conn: ldap.ldapobject, dn: Optional[str] = None
) -> Optional[context.Profile]:
dn = dn or self.get_dn()
if dn is None:
return None
profile = context.Profile(conn, dn)
self.profiles[dn] = profile
return profile
app = App(__name__)
@app.context_processor @app.context_processor
def inject_readable(): def inject_readable() -> Dict[str, Any]:
return dict(readable_names=config.readable_names) return dict(readable_names=config.readable_names)
@app.context_processor @app.context_processor
def inject_hackerspace_name(): def inject_hackerspace_name() -> Dict[str, Any]:
return dict(hackerspace_name=config.hackerspace_name) return dict(hackerspace_name=config.hackerspace_name)
@app.template_filter('first')
def ldap_first(v): @app.template_filter("first")
def ldap_first(v: str) -> str:
return v and v[0] return v and v[0]
@app.template_filter('readable')
def readable_tf(n): @app.template_filter("readable")
def readable_tf(n: str) -> str:
return config.readable_names.get(n, n) return config.readable_names.get(n, n)
def initialize_forms():
forms = {}
for f in reduce(lambda a,b: a | b, config.can.values()):
cls, attrs = config.fields.get(f, config.default_field)
class AddForm(flask_wtf.FlaskForm):
value = cls(label=config.readable_names.get(f), **attrs)
AddForm.__name__ == 'Add' + f
forms[f] = AddForm
return forms
def start(): def start() -> None:
validation.sanitize_perms() validation.sanitize_perms()
validation.sanitize_readable() validation.sanitize_readable()
from webapp import views from webapp import views
from webapp import auth, admin, avatar, vcard, passwd from webapp import auth, admin, avatar, vcard, passwd
for module in (auth, admin, avatar, vcard, passwd): for module in (auth, admin, avatar, vcard, passwd):
app.register_blueprint(module.bp) app.register_blueprint(module.bp)
app.connections = pools.LDAPConnectionPool(config.ldap_url, timeout=300.0) app.connections = pools.LDAPConnectionPool(config.ldap_url, timeout=300.0)
def drop_profile(dn):
def drop_profile(dn: str) -> None:
if dn != config.ldap_admin_dn: if dn != config.ldap_admin_dn:
del app.profiles[dn] del app.profiles[dn]
app.connections.register_callback('drop', drop_profile)
app.connections.start()
app.connections.register_callback("drop", drop_profile)
app.connections.start()
app.profiles = {} app.profiles = {}
app.forms = initialize_forms()
start() start()

View file

@ -4,203 +4,186 @@ import re
import flask import flask
import flask_wtf import flask_wtf
import wtforms import wtforms
import werkzeug
import webapp import webapp
from webapp import app, context, config, ldaputils, email from webapp import app, context, config, ldaputils, email, vcard
bp = flask.Blueprint('admin', __name__) from typing import Callable, ParamSpec, List, Tuple, Optional, Dict, Protocol
def admin_required_impl(f): bp = flask.Blueprint("admin", __name__)
Entry = Tuple[str, List[Tuple[str, str]]]
P = ParamSpec("P")
def admin_required_impl(
f: Callable[P, werkzeug.Response]
) -> Callable[P, werkzeug.Response]:
@functools.wraps(f) @functools.wraps(f)
def func(*a, **kw): def func(*a: P.args, **kw: P.kwargs) -> werkzeug.Response:
# TODO: Actually check for admin perms # TODO: Actually check for admin perms
if not flask.session['is_admin']: if not flask.session["is_admin"]:
flask.abort(403) flask.abort(403)
return f(*a, **kw) return f(*a, **kw)
return func return func
def admin_required(f):
def admin_required(f: Callable[P, werkzeug.Response]) -> Callable[P, werkzeug.Response]:
return webapp.auth.login_required(admin_required_impl(f)) return webapp.auth.login_required(admin_required_impl(f))
def _get_user_list(conn, query='&'):
"""Returns List[Tuple[username, full name]] for query""" def _get_user_list(conn: ldap.ldapobject, query: str = "&") -> List[Tuple[str, str]]:
"""
Returns List[Tuple[username, full name]] for query.
"""
all_users = [] all_users = []
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, f'(&(uid=*)(cn=*){ldaputils.wrap(query)})', attrlist=['uid', 'cn']) results = conn.search_s(
config.ldap_people,
ldap.SCOPE_SUBTREE,
f"(&(uid=*)(cn=*){ldaputils.wrap(query)})",
attrlist=["uid", "cn"],
)
for user, attrs in results: for user, attrs in results:
user_uid = attrs['uid'][0].decode() user_uid = attrs["uid"][0].decode()
user_cn = attrs['cn'][0].decode() user_cn = attrs["cn"][0].decode()
all_users.append((user_uid, user_cn)) all_users.append((user_uid, user_cn))
all_users.sort(key=lambda user: user[0].lower()) all_users.sort(key=lambda user: user[0].lower())
return all_users return all_users
def _get_groupped_user_list(conn):
"""Returns all users (uid, full name), groupped by active groups""" def _get_groupped_user_list(
conn: ldap.ldapobject,
) -> List[Tuple[str, List[Tuple[str, str]]]]:
"""
Returns all users (uid, full name), groupped by active groups.
"""
groupped_users = [ groupped_users = [
(group.capitalize(), _get_user_list(conn, f'memberOf={ldaputils.group_dn(group)}')) (
group.capitalize(),
_get_user_list(conn, f"memberOf={ldaputils.group_dn(group)}"),
)
for group in config.ldap_active_groups for group in config.ldap_active_groups
] ]
inactive_filter = ldaputils._not( inactive_filter = ldaputils._not(ldaputils.member_of_any(config.ldap_active_groups))
ldaputils.member_of_any(config.ldap_active_groups)
)
groupped_users.append( groupped_users.append(("Inactive users", _get_user_list(conn, inactive_filter)))
('Inactive users', _get_user_list(conn, inactive_filter))
)
return groupped_users return groupped_users
@bp.route('/admin/')
@admin_required
def admin_view():
return flask.render_template('admin/index.html')
@bp.route('/admin/users/') def _get_groups_of(conn: ldap.ldapobject, dn: str) -> List[str]:
@admin_required filter = f"(&(objectClass=groupOfUniqueNames)(uniqueMember={dn}))"
def admin_users_view():
conn = context.get_connection()
groups = _get_groupped_user_list(conn)
return flask.render_template('admin/users.html', groups=groups)
def _get_profile(conn, uid):
results = conn.search_s(ldaputils.user_dn(uid), ldap.SCOPE_SUBTREE)
return ldaputils.normalized_entries(results)[0]
def _format_profile(profile):
rendered = []
dn, attrs = profile
for attr, value in attrs:
attr_sanitized = attr.lower()
attr_full_name = config.full_name.get(attr_sanitized, attr_sanitized)
attr_readable_name = config.readable_names.get(attr_full_name)
rendered.append((attr, attr_readable_name, value))
# Attributes with readable names first, then by name
rendered.sort(key=lambda profile: profile[0])
rendered.sort(key=lambda profile: profile[1] is None)
return rendered
def _get_groups_of(conn, uid):
filter = ldaputils.groups_of_user(uid)
groups = [ groups = [
attrs['cn'][0].decode() attrs["cn"][0].decode()
for group_dn, attrs in for group_dn, attrs in conn.search_s(
conn.search_s(config.ldap_base, ldap.SCOPE_SUBTREE, filter) config.ldap_base, ldap.SCOPE_SUBTREE, filter
)
] ]
return groups return groups
def _is_user_protected(conn, uid, groups):
def _is_user_protected(conn: ldap.ldapobject, groups: List[str]) -> bool:
return any(group in config.ldap_protected_groups for group in groups) return any(group in config.ldap_protected_groups for group in groups)
@bp.route('/admin/users/<uid>')
@bp.route("/admin/")
@admin_required @admin_required
def admin_user_view(uid): def admin_view() -> werkzeug.Response:
ldaputils.validate_name(uid) return flask.Response(flask.render_template("admin/index.html"))
conn = context.get_connection()
profile = _get_profile(conn, uid)
groups = _get_groups_of(conn, uid)
is_protected = _is_user_protected(conn, uid, groups)
return flask.render_template('admin/user.html', uid=uid, profile=_format_profile(profile), groups=groups, is_protected=is_protected) @bp.route("/admin/users/")
@admin_required
def admin_users_view() -> werkzeug.Response:
conn = app.get_connection()
assert conn is not None
groups = _get_groupped_user_list(conn)
# TODO: Deduplicate this modification logic with webapp/vcard.py return flask.Response(flask.render_template("admin/users.html", groups=groups))
class AddMifareIDHash(flask_wtf.FlaskForm):
value = wtforms.fields.StringField(label=config.readable_names.get('mifareidhash'))
class DelForm(flask_wtf.FlaskForm): @bp.route("/admin/users/<username>")
@admin_required
def admin_user_view(username: str) -> werkzeug.Response:
ldaputils.validate_name(username)
dn = ldaputils.user_dn(username)
conn = app.get_connection()
assert conn is not None
profile = context.Profile(conn, dn)
groups = _get_groups_of(conn, dn)
is_protected = _is_user_protected(conn, groups)
return flask.Response(
flask.render_template(
"admin/user.html", profile=profile, groups=groups, is_protected=is_protected
)
)
class AdminMixin:
def _allowed(self, subject_dn: str) -> Optional[str]:
conn = app.get_connection()
assert conn is not None
groups = _get_groups_of(conn, subject_dn)
if _is_user_protected(conn, groups):
return "User is protected"
return None
class AdminOperationAdd(AdminMixin, vcard.OperationAdd):
pass pass
@bp.route('/admin/users/<uid>/add_mifareidhash')
class AdminOperationModify(AdminMixin, vcard.OperationModify):
pass
class AdminOperationDelete(AdminMixin, vcard.OperationDelete):
pass
@bp.route("/admin/users/<username>/add_mifareidhash", methods=["GET", "POST"])
@admin_required @admin_required
def admin_user_view_add_mifareidhash(uid): def admin_user_view_add_mifareidhash(username: str) -> werkzeug.Response:
form = AddMifareIDHash() dn = ldaputils.user_dn(username)
return flask.render_template('admin/ops/add_mifareidhash.html', uid=uid, form=form) op = AdminOperationAdd(dn, "mifareidhash")
redirect = f"/admin/users/{username}"
return op.perform(
success_redirect=redirect,
fatal_redirect=redirect,
action=f"/admin/users/{username}/add_mifareidhash",
)
@bp.route('/admin/users/<uid>/del_mifareidhash')
@bp.route("/admin/users/<username>/del_mifareidhash/<uid>", methods=["GET", "POST"])
@admin_required @admin_required
def admin_user_view_del_mifareidhash(uid): def admin_user_view_del_mifareidhash(username: str, uid: str) -> werkzeug.Response:
form = DelForm() dn = ldaputils.user_dn(username)
value = flask.request.args.get('value') op = AdminOperationDelete(dn, uid)
return flask.render_template('admin/ops/del_mifareidhash.html', uid=uid, form=form, value=value) redirect = f"/admin/users/{username}"
return op.perform(
def _modify_mifareidhash(uid, form, modify_func): success_redirect=redirect,
ldaputils.validate_name(uid) fatal_redirect=redirect,
conn = context.get_connection() action=f"/admin/users/{username}/del_mifareidhash/{uid}",
)
groups = _get_groups_of(conn, uid)
is_protected = _is_user_protected(conn, uid, groups)
redirect_url = flask.url_for('admin.admin_user_view', uid=uid)
if is_protected:
flask.flash('Cannot modify protected user', 'danger')
return flask.redirect(redirect_url)
try:
if form.validate_on_submit():
dn = ldaputils.user_dn(uid)
modify_func(conn, dn)
context.refresh_profile(dn)
flask.flash('Added mifareidhash', category='info')
return flask.redirect(redirect_url)
for field, errors in form.errors.items():
for error in errors:
flask.flash("Error in the {} field - {}".format(
getattr(form, field).label.text,
error
), 'danger')
return flask.redirect(redirect_url)
except ldap.LDAPError as e:
print('LDAP error:', e)
flask.flash(f'Could not modify profile due to LDAP error: {e}', 'danger')
return flask.redirect(redirect_url)
@bp.route('/admin/users/<uid>/add_mifareidhash', methods=['POST']) @bp.route("/admin/groups/")
@admin_required @admin_required
def admin_user_add_mifareidhash(uid): def admin_groups_view() -> werkzeug.Response:
form = AddMifareIDHash() conn = app.get_connection()
def modify_func(conn, dn): assert conn is not None
new_value = form.value.data
email.send_papertrail(
f'Adding mifareIDHash for user {uid}',
f'New mifareIDHash: {new_value}'
)
conn.modify_s(dn, [(ldap.MOD_ADD, 'mifareidhash', new_value.encode('utf-8'))])
return _modify_mifareidhash(uid, form, modify_func)
@bp.route('/admin/users/<uid>/del_mifareidhash', methods=['POST'])
@admin_required
def admin_user_del_mifareidhash(uid):
form = DelForm()
def modify_func(conn, dn):
old_value = flask.request.args.get('value')
email.send_papertrail(
f'Deleting mifareIDHash for user {uid}',
f'Deleted mifareIDHash: {old_value}'
)
conn.modify_s(dn, [(ldap.MOD_DELETE, 'mifareidhash', old_value.encode('utf-8'))])
return _modify_mifareidhash(uid, form, modify_func)
@bp.route('/admin/groups/')
@admin_required
def admin_groups_view():
conn = context.get_connection()
# no obvious way to filter out groups that are just a per-user-group # no obvious way to filter out groups that are just a per-user-group
# (not super useful to look at them) # (not super useful to look at them)
@ -209,28 +192,34 @@ def admin_groups_view():
all_uids = set([uid for uid, cn in all_users]) all_uids = set([uid for uid, cn in all_users])
groups = [ groups = [
attrs['cn'][0].decode() attrs["cn"][0].decode()
for group_dn, attrs in for group_dn, attrs in conn.search_s(
conn.search_s(config.ldap_base, ldap.SCOPE_SUBTREE, 'objectClass=groupOfUniqueNames') config.ldap_base, ldap.SCOPE_SUBTREE, "objectClass=groupOfUniqueNames"
)
] ]
filter_groups = filter((lambda cn: cn not in all_uids), groups) filter_groups = filter((lambda cn: cn not in all_uids), groups)
return flask.render_template('admin/groups.html', groups=filter_groups) return flask.Response(
flask.render_template("admin/groups.html", groups=filter_groups)
)
def _get_group(conn, name):
results = conn.search_s(ldaputils.group_dn(name), ldap.SCOPE_SUBTREE)
return ldaputils.normalized_entries(results)[0]
@bp.route('/admin/groups/<name>') @bp.route("/admin/groups/<name>")
@admin_required @admin_required
def admin_group_view(name): def admin_group_view(name: str) -> werkzeug.Response:
ldaputils.validate_name(name) ldaputils.validate_name(name)
conn = context.get_connection() dn = ldaputils.group_dn(name)
conn = app.get_connection()
assert conn is not None
group_attrs = _get_group(conn, name) group = context.LDAPEntry(conn, dn)
members = _get_user_list(conn, f'memberOf={ldaputils.group_dn(name)}') members = _get_user_list(conn, f"memberOf={ldaputils.group_dn(name)}")
is_protected = name in config.ldap_protected_groups is_protected = name in config.ldap_protected_groups
return flask.render_template('admin/group.html', name=name, attributes=_format_profile(group_attrs), members=members, is_protected=is_protected) return flask.Response(
flask.render_template(
"admin/group.html", group=group, members=members, is_protected=is_protected
)
)

View file

@ -2,38 +2,51 @@ import functools
import ldap import ldap
import flask import flask
import urllib import urllib
import werkzeug
from webapp import app, context, config, ldaputils from webapp import app, avatar, config, ldaputils
bp = flask.Blueprint('auth', __name__) from typing import TypeVar, Callable, ParamSpec, Dict, Any, Optional
def login_required(f): bp = flask.Blueprint("auth", __name__)
P = ParamSpec("P")
def login_required(f: Callable[P, werkzeug.Response]) -> Callable[P, werkzeug.Response]:
@functools.wraps(f) @functools.wraps(f)
def func(*a, **kw): def func(*a: P.args, **kw: P.kwargs) -> werkzeug.Response:
conn = context.get_connection() conn = app.get_connection()
if not conn: if not conn:
flask.session.clear() flask.session.clear()
flask.flash('You must log in to continue', category='warning') flask.flash("You must log in to continue", category="warning")
return flask.redirect('/login?' + urllib.parse.urlencode({'goto': flask.request.path})) return flask.redirect(
"/login?" + urllib.parse.urlencode({"goto": flask.request.path})
)
return f(*a, **kw) return f(*a, **kw)
return func return func
def req_to_ctx():
def req_to_ctx() -> Dict[str, Any]:
return dict(flask.request.form.items()) return dict(flask.request.form.items())
@bp.route('/login', methods=["GET"])
def login_form():
return flask.render_template('login.html', **req_to_ctx())
def _connect_to_ldap(dn, password): @bp.route("/login", methods=["GET"])
def login_form() -> werkzeug.Response:
return flask.Response(flask.render_template("login.html", **req_to_ctx()))
def _connect_to_ldap(dn: str, password: str) -> Optional[ldap.ldapobject]:
try: try:
return app.connections.bind(dn, password) return app.connections.bind(dn, password)
except ldap.LDAPError as error_message: except ldap.LDAPError as error_message:
print("Could not connect to server:", error_message) print("Could not connect to server:", error_message)
return None return None
@bp.route('/login', methods=["POST"])
def login_action(): @bp.route("/login", methods=["POST"])
def login_action() -> werkzeug.Response:
# LDAP usernames/DNs are case-insensitive, so we normalize them just in # LDAP usernames/DNs are case-insensitive, so we normalize them just in
# case, # case,
username = flask.request.form.get("username", "").lower() username = flask.request.form.get("username", "").lower()
@ -46,25 +59,34 @@ def login_action():
# Now that we have logged in, we can retrieve the 'real' username (which # Now that we have logged in, we can retrieve the 'real' username (which
# might be cased differently from the login name). # might be cased differently from the login name).
res = conn.search_s(dn, ldap.SCOPE_SUBTREE) res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
for (k, vs) in res[0][1].items(): for k, vs in res[0][1].items():
if k == 'uid': if k == "uid":
username = vs[0].decode() username = vs[0].decode()
# Check if user belongs to admin group # Check if user belongs to admin group
is_admin = bool(conn.search_s(dn, ldap.SCOPE_SUBTREE, ldaputils.member_of_any(config.ldap_admin_groups))) is_admin = bool(
conn.search_s(
dn,
ldap.SCOPE_SUBTREE,
ldaputils.member_of_any(config.ldap_admin_groups),
)
)
flask.session["username"] = username flask.session["username"] = username
flask.session['dn'] = dn flask.session["dn"] = dn
flask.session['is_admin'] = is_admin flask.session["is_admin"] = is_admin
context.refresh_profile() app.refresh_profile(conn)
avatar.cache.reset_user(username)
avatar.hash_cache.reset()
return flask.redirect(goto) return flask.redirect(goto)
else: else:
flask.flash("Invalid credentials.", category='danger') flask.flash("Invalid credentials.", category="danger")
return login_form() return login_form()
@bp.route('/logout')
@bp.route("/logout")
@login_required @login_required
def logout_action(): def logout_action() -> werkzeug.Response:
app.connections.unbind(flask.session['dn']) app.connections.unbind(flask.session["dn"])
flask.session.clear() flask.session.clear()
return flask.redirect('/') return flask.redirect("/")

View file

@ -17,14 +17,16 @@ from PIL import Image, ImageDraw
import flask import flask
import ldap import ldap
from webapp import context, ldaputils, config from webapp import app, ldaputils, config
bp = flask.Blueprint('avatar', __name__) from typing import List
log = logging.getLogger('ldap-web.avatar')
bp = flask.Blueprint("avatar", __name__)
log = logging.getLogger("ldap-web.avatar")
# Stolen from https://stackoverflow.com/questions/43512615/reshaping-rectangular-image-to-square # Stolen from https://stackoverflow.com/questions/43512615/reshaping-rectangular-image-to-square
def resize_image(image: Image, length: int) -> Image: def resize_image(image: Image.Image, length: int) -> Image.Image:
""" """
Resize an image to a square. Can make an image bigger to make it fit or smaller if it doesn't fit. It also crops Resize an image to a square. Can make an image bigger to make it fit or smaller if it doesn't fit. It also crops
part of the image. part of the image.
@ -44,14 +46,22 @@ def resize_image(image: Image, length: int) -> Image:
# The image is in portrait mode. Height is bigger than width. # The image is in portrait mode. Height is bigger than width.
# This makes the width fit the LENGTH in pixels while conserving the ration. # This makes the width fit the LENGTH in pixels while conserving the ration.
resized_image = image.resize((length, int(image.size[1] * (length / image.size[0])))) resized_image = image.resize(
(length, int(image.size[1] * (length / image.size[0])))
)
# Amount of pixel to lose in total on the height of the image. # Amount of pixel to lose in total on the height of the image.
required_loss = (resized_image.size[1] - length) required_loss = resized_image.size[1] - length
# Crop the height of the image so as to keep the center part. # Crop the height of the image so as to keep the center part.
resized_image = resized_image.crop( resized_image = resized_image.crop(
box=(0, required_loss / 2, length, resized_image.size[1] - required_loss / 2)) box=(
0,
required_loss // 2,
length,
resized_image.size[1] - required_loss // 2,
)
)
# We now have a length*length pixels image. # We now have a length*length pixels image.
return resized_image return resized_image
@ -59,45 +69,55 @@ def resize_image(image: Image, length: int) -> Image:
# This image is in landscape mode or already squared. The width is bigger than the heihgt. # This image is in landscape mode or already squared. The width is bigger than the heihgt.
# This makes the height fit the LENGTH in pixels while conserving the ration. # This makes the height fit the LENGTH in pixels while conserving the ration.
resized_image = image.resize((int(image.size[0] * (length / image.size[1])), length)) resized_image = image.resize(
(int(image.size[0] * (length / image.size[1])), length)
)
# Amount of pixel to lose in total on the width of the image. # Amount of pixel to lose in total on the width of the image.
required_loss = resized_image.size[0] - length required_loss = resized_image.size[0] - length
# Crop the width of the image so as to keep 1080 pixels of the center part. # Crop the width of the image so as to keep 1080 pixels of the center part.
resized_image = resized_image.crop( resized_image = resized_image.crop(
box=(required_loss / 2, 0, resized_image.size[0] - required_loss / 2, length)) box=(
required_loss // 2,
0,
resized_image.size[0] - required_loss // 2,
length,
)
)
# We now have a length*length pixels image. # We now have a length*length pixels image.
return resized_image return resized_image
def process_upload(data: bytes) -> bytes: def process_upload(data: bytes) -> bytes:
img = Image.open(io.BytesIO(data)) img = Image.open(io.BytesIO(data))
img = resize_image(img, 256) img = resize_image(img, 256)
res = io.BytesIO() res = io.BytesIO()
img.save(res, 'PNG') img.save(res, "PNG")
return base64.b64encode(res.getvalue()) return base64.b64encode(res.getvalue())
syrenka = Image.open("syrenka.png") syrenka = Image.open("syrenka.png")
def default_avatar(uid: str) -> Image: def default_avatar(uid: str) -> bytes:
""" """
Create little generative avatar for people who don't have a custom one Create little generative avatar for people who don't have a custom one
configured. configured.
""" """
img = Image.new('RGBA', (256, 256), (255, 255, 255, 0)) img = Image.new("RGBA", (256, 256), (255, 255, 255, 0))
draw = ImageDraw.Draw(img) draw = ImageDraw.Draw(img)
# Deterministic rng for stable output. # Deterministic rng for stable output.
rng = random.Random(uid) rng = random.Random(uid)
# Pick a nice random neon color. # Pick a nice random neon color.
n_h, n_s, n_l = rng.random(), 0.5 + rng.random()/2.0, 0.4 + rng.random()/5.0 n_h, n_s, n_l = rng.random(), 0.5 + rng.random() / 2.0, 0.4 + rng.random() / 5.0
# Use muted version for background. # Use muted version for background.
r, g, b = [int(256*i) for i in colorsys.hls_to_rgb(n_h, n_l+0.3, n_s-0.1)] r, g, b = [int(256 * i) for i in colorsys.hls_to_rgb(n_h, n_l + 0.3, n_s - 0.1)]
draw.rectangle([(0, 0), (256, 256)], fill=(r,g,b,255)) draw.rectangle([(0, 0), (256, 256)], fill=(r, g, b, 255))
# Scale logo by randomized factor. # Scale logo by randomized factor.
factor = 0.7 + 0.1 * rng.random() factor = 0.7 + 0.1 * rng.random()
@ -108,20 +128,20 @@ def default_avatar(uid: str) -> Image:
overlay = overlay.crop(box=(0, 0, w, w)) overlay = overlay.crop(box=(0, 0, w, w))
# Give it a little nudge. # Give it a little nudge.
overlay = overlay.rotate((rng.random() - 0.5) * 100) overlay = overlay.rotate((rng.random() - 0.5) * 100) # type: ignore
# Colorize with full neon color. # Colorize with full neon color.
r, g, b = [int(256*i) for i in colorsys.hls_to_rgb(n_h,n_l,n_s)] r, g, b = [int(256 * i) for i in colorsys.hls_to_rgb(n_h, n_l, n_s)]
pixels = overlay.load() pixels = overlay.load() # type: ignore
for x in range(img.size[0]): for x in range(img.size[0]):
for y in range(img.size[1]): for y in range(img.size[1]):
alpha = pixels[x, y][3] alpha = pixels[x, y][3]
pixels[x, y] = (r, g, b, alpha) pixels[x, y] = (r, g, b, alpha)
img.alpha_composite(overlay) img.alpha_composite(overlay) # type: ignore
res = io.BytesIO() res = io.BytesIO()
img.save(res, 'PNG') img.save(res, "PNG")
return res.getvalue() return res.getvalue()
@ -135,13 +155,13 @@ class AvatarCacheEntry:
# Cached converted bytes # Cached converted bytes
_converted: bytes _converted: bytes
def __init__(self, uid: str, data: bytes): def __init__(self, uid: str, data: bytes) -> None:
self.uid = uid self.uid = uid
self.deadline = time.time() + config.avatar_cache_timeout self.deadline = time.time() + config.avatar_cache_timeout
self.data = data self.data = data
self._converted = b"" self._converted = b""
def serve(self): def serve(self) -> flask.Response:
""" """
Serve sanitized image. Always re-encode to PNG 256x256. Serve sanitized image. Always re-encode to PNG 256x256.
""" """
@ -156,27 +176,27 @@ class AvatarCacheEntry:
res = io.BytesIO() res = io.BytesIO()
img = resize_image(img, 256) img = resize_image(img, 256)
img.save(res, 'PNG') img.save(res, "PNG")
self._converted = res.getvalue() self._converted = res.getvalue()
return flask.Response(self._converted, mimetype='image/png') return flask.Response(self._converted, mimetype="image/png")
class AvatarCache: class AvatarCache:
# keyed by uid # keyed by uid
entries: dict[str, AvatarCacheEntry] entries: dict[str, AvatarCacheEntry]
def __init__(self): def __init__(self) -> None:
self.entries = {} self.entries = {}
def reset(self): def reset(self) -> None:
self.entries = {} self.entries = {}
def reset_user(self, uid: str): def reset_user(self, uid: str) -> None:
if uid in self.entries: if uid in self.entries:
del self.entries[uid] del self.entries[uid]
def get(self, uid: str, bust: bool = False) -> AvatarCacheEntry: def get(self, uid: str, bust: bool = False) -> flask.Response:
""" """
Get avatar, either from cache or from LDAP on cache miss. If 'bust' is Get avatar, either from cache or from LDAP on cache miss. If 'bust' is
set to True, the cache will not be consulted and the newest result from set to True, the cache will not be consulted and the newest result from
@ -193,7 +213,9 @@ class AvatarCache:
del self.entries[uid] del self.entries[uid]
# Otherwise, retrieve from LDAP. # Otherwise, retrieve from LDAP.
conn = context.get_admin_connection() conn = app.get_connection()
if conn is None:
conn = app.get_admin_connection()
try: try:
dn = ldaputils.user_dn(uid) dn = ldaputils.user_dn(uid)
res = conn.search_s(dn, ldap.SCOPE_SUBTREE) res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
@ -204,10 +226,10 @@ class AvatarCache:
is_user_found = len(res) == 1 is_user_found = len(res) == 1
if is_user_found: if is_user_found:
for attr, vs in res[0][1].items(): for attr, vs in res[0][1].items():
if attr == 'jpegPhoto': if attr == "jpegPhoto":
for v in vs: for v in vs:
# Temporary workaround: treat empty jpegPhoto as no avatar. # Temporary workaround: treat empty jpegPhoto as no avatar.
if v == b'': if v == b"":
avatar = None avatar = None
break break
@ -224,7 +246,7 @@ class AvatarCache:
if avatar is None: if avatar is None:
# don't generate avatars for non-users to reduce DoS potential # don't generate avatars for non-users to reduce DoS potential
# (note: capacifier already leaks existence of users, so whatever) # (note: capacifier already leaks existence of users, so whatever)
avatar = default_avatar(uid if is_user_found else 'default') avatar = default_avatar(uid if is_user_found else "default")
# Save avatar in cache. # Save avatar in cache.
entry = AvatarCacheEntry(uid, avatar) entry = AvatarCacheEntry(uid, avatar)
@ -233,25 +255,31 @@ class AvatarCache:
# And serve the entry. # And serve the entry.
return entry.serve() return entry.serve()
cache = AvatarCache() cache = AvatarCache()
def hash_for_uid(uid):
def hash_for_uid(uid: str) -> str:
# NOTE: Gravatar documentation says to use SHA256, but everyone passes MD5 instead # NOTE: Gravatar documentation says to use SHA256, but everyone passes MD5 instead
email = f'{uid}@hackerspace.pl'.strip().lower() email = f"{uid}@hackerspace.pl".strip().lower()
hasher = hashlib.md5() hasher = hashlib.md5()
hasher.update(email.encode()) hasher.update(email.encode())
return hasher.hexdigest() return hasher.hexdigest()
def get_all_user_uids(conn):
def get_all_user_uids(conn: ldap.ldapobject) -> List[str]:
all_uids = [] all_uids = []
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, 'uid=*', attrlist=['uid']) results = conn.search_s(
config.ldap_people, ldap.SCOPE_SUBTREE, "uid=*", attrlist=["uid"]
)
for user, attrs in results: for user, attrs in results:
uid = attrs['uid'][0].decode() uid = attrs["uid"][0].decode()
all_uids.append(uid) all_uids.append(uid)
return all_uids return all_uids
class HashCache: class HashCache:
# email hash -> uid mapping # email hash -> uid mapping
entries: dict[str, str] = {} entries: dict[str, str] = {}
@ -260,43 +288,47 @@ class HashCache:
def get(self, email_hash: str) -> str: def get(self, email_hash: str) -> str:
self.rebuild_if_needed() self.rebuild_if_needed()
return self.entries.get(email_hash, 'default') return self.entries.get(email_hash, "default")
def reset(self): def reset(self) -> None:
self.entries = {} self.entries = {}
self.deadline = 0 self.deadline = 0
def rebuild_if_needed(self): def rebuild_if_needed(self) -> None:
now = time.time() now = time.time()
if now > self.deadline: if now > self.deadline:
self.rebuild() self.rebuild()
def rebuild(self): def rebuild(self) -> None:
log.info("Rebuilding email hash cache") log.info("Rebuilding email hash cache")
conn = context.get_admin_connection() conn = app.get_admin_connection()
users = get_all_user_uids(conn) users = get_all_user_uids(conn)
self.deadline = time.time() + config.avatar_cache_timeout self.deadline = time.time() + config.avatar_cache_timeout
self.entries = { hash_for_uid(uid): uid for uid in users } self.entries = {hash_for_uid(uid): uid for uid in users}
hash_cache = HashCache() hash_cache = HashCache()
def sanitize_email_hash(hash: str):
def sanitize_email_hash(hash: str) -> str:
""" """
lowercases, removes file extension (probably) lowercases, removes file extension (probably)
""" """
hash = hash.lower() hash = hash.lower()
if hash.endswith('.png') or hash.endswith('.jpg'): if hash.endswith(".png") or hash.endswith(".jpg"):
hash = hash[:-4] hash = hash[:-4]
return hash return hash
@bp.route('/avatar/<email_hash>', methods=['GET'])
def gravatar_serve(email_hash): @bp.route("/avatar/<email_hash>", methods=["GET"])
def gravatar_serve(email_hash: str) -> flask.Response:
""" """
Serves avatar in a Gravatar-compatible(ish) way, i.e. by email hash, not user name. Serves avatar in a Gravatar-compatible(ish) way, i.e. by email hash, not user name.
""" """
uid = hash_cache.get(sanitize_email_hash(email_hash)) uid = hash_cache.get(sanitize_email_hash(email_hash))
return cache.get(uid) return cache.get(uid)
@bp.route('/avatar/user/<uid>', methods=['GET'])
def avatar_serve(uid): @bp.route("/avatar/user/<uid>", methods=["GET"])
def avatar_serve(uid: str) -> flask.Response:
return cache.get(uid) return cache.get(uid)

View file

@ -3,92 +3,114 @@ import wtforms
import secrets import secrets
import os import os
hackerspace_name = 'Warsaw Hackerspace' from typing import Dict, Set, List, Tuple, Any, TypeVar
secret_key = secrets.token_hex(32)
hackerspace_name: str = "Warsaw Hackerspace"
secret_key: str = secrets.token_hex(32)
# Kerberos configuration # Kerberos configuration
kadmin_principal_map = "{}@HACKERSPACE.PL" kadmin_principal_map: str = "{}@HACKERSPACE.PL"
# LDAP configuration # LDAP configuration
ldap_url = 'ldap://ldap.hackerspace.pl'
ldap_base = 'dc=hackerspace,dc=pl' ldap_url: str = "ldap://ldap.hackerspace.pl"
ldap_people = 'ou=people,dc=hackerspace,dc=pl' ldap_base: str = "dc=hackerspace,dc=pl"
ldap_user_dn_format = 'uid={},ou=people,dc=hackerspace,dc=pl' ldap_people: str = "ou=people,dc=hackerspace,dc=pl"
ldap_group_dn_format = 'cn={},ou=group,dc=hackerspace,dc=pl' ldap_user_dn_format: str = "uid={},ou=people,dc=hackerspace,dc=pl"
ldap_group_dn_format: str = "cn={},ou=group,dc=hackerspace,dc=pl"
# LDAP user groups allowed to see /admin # LDAP user groups allowed to see /admin
ldap_admin_groups = os.getenv('LDAPWEB_ADMIN_GROUPS', 'ldap-admin,staff,zarzad').split(',') ldap_admin_groups: List[str] = os.getenv(
"LDAPWEB_ADMIN_GROUPS", "ldap-admin,staff,zarzad"
).split(",")
# LDAP user groups indicating that a user is active # LDAP user groups indicating that a user is active
ldap_active_groups = os.getenv('LDAPWEB_ACTIVE_GROUPS', 'fatty,starving,potato').split(',') ldap_active_groups: List[str] = os.getenv(
"LDAPWEB_ACTIVE_GROUPS", "fatty,starving,potato"
).split(",")
# LDAP service user with admin privileges (for admin listings, creating new users) # LDAP service user with admin privileges (for admin listings, creating new users)
ldap_admin_dn = os.getenv('LDAPWEB_ADMIN_DN', 'cn=ldapweb,ou=services,dc=hackerspace,dc=pl') ldap_admin_dn: str = os.getenv(
ldap_admin_password = os.getenv('LDAPWEB_ADMIN_PASSWORD', 'unused') "LDAPWEB_ADMIN_DN", "cn=ldapweb,ou=services,dc=hackerspace,dc=pl"
)
ldap_admin_password: str = os.getenv("LDAPWEB_ADMIN_PASSWORD", "unused")
# Protected LDAP user groups # Protected LDAP user groups
# These groups (and their members) cannot be modified by admin UI # These groups (and their members) cannot be modified by admin UI
ldap_protected_groups = ( ldap_protected_groups: List[str] = "staff,zarzad,ldap-admin".split(",") + os.getenv(
'staff,zarzad,ldap-admin'.split(',') + "LDAPWEB_PROTECTED_GROUPS", ""
os.getenv('LDAPWEB_PROTECTED_GROUPS', '').split(',') ).split(",")
)
# Email notification (paper trail) configuration # Email notification (paper trail) configuration
smtp_server = 'mail.hackerspace.pl' smtp_server: str = "mail.hackerspace.pl"
smtp_format = '{}@hackerspace.pl' smtp_format: str = "{}@hackerspace.pl"
smtp_user = os.getenv('LDAPWEB_SMTP_USER', 'ldapweb') smtp_user: str = os.getenv("LDAPWEB_SMTP_USER", "ldapweb")
smtp_password = os.getenv('LDAPWEB_SMTP_PASSWORD', 'unused') smtp_password: str = os.getenv("LDAPWEB_SMTP_PASSWORD", "unused")
papertrail_recipients = os.getenv('LDAPWEB_PAPERTRAIL_RECIPIENTS', 'zarzad@hackerspace.pl') papertrail_recipients: str = os.getenv(
"LDAPWEB_PAPERTRAIL_RECIPIENTS", "zarzad@hackerspace.pl"
)
# Avatar server # Avatar server
avatar_cache_timeout = int(os.getenv('LDAPWEB_AVATAR_CACHE_TIMEOUT', '1800')) avatar_cache_timeout: int = int(os.getenv("LDAPWEB_AVATAR_CACHE_TIMEOUT", "1800"))
# LDAP attribute configuration # LDAP attribute configuration
readable_names = { readable_names: Dict[str, str] = {
'jpegphoto': 'Avatar', "jpegphoto": "Avatar",
'commonname': 'Common Name', "commonname": "Common Name",
'givenname': 'Given Name', "givenname": "Given Name",
'gecos': 'GECOS (public name)', "gecos": "GECOS (public name)",
'surname': 'Surname', "surname": "Surname",
'loginshell': 'Shell', "loginshell": "Shell",
'telephonenumber': 'Phone Number', "telephonenumber": "Phone Number",
'mobiletelephonenumber': 'Mobile Number', "mobiletelephonenumber": "Mobile Number",
'sshpublickey': 'SSH Public Key', "sshpublickey": "SSH Public Key",
'mifareidhash': 'MIFARE ID Hash', "mifareidhash": "MIFARE ID Hash",
'mail': 'Email Adress', "mail": "Email Adress",
'mailroutingaddress': 'Email Adress (external)', "mailroutingaddress": "Email Adress (external)",
} }
full_name = { full_name: Dict[str, str] = {
'cn': 'commonname', "cn": "commonname",
'gecos': 'gecos', "gecos": "gecos",
'sn': 'surname', "sn": "surname",
'mobile': 'mobiletelephonenumber', "mobile": "mobiletelephonenumber",
'l': 'locality', "l": "locality",
} }
can_add = set([ can_add: Set[str] = {
'jpegphoto', "jpegphoto",
'telephonenumber', "telephonenumber",
'mobiletelephonenumber', "mobiletelephonenumber",
'sshpublickey', "sshpublickey",
]) }
can_delete = can_add can_delete: Set[str] = can_add
can_modify = can_add | set([ can_modify: Set[str] = can_add | {
'jpegphoto', "jpegphoto",
'givenname', "givenname",
'surname', "surname",
'commonname', "commonname",
'gecos', "gecos",
]) }
can = { 'add': can_add, 'mod': can_modify, 'del': can_delete } can: Dict[str, Set[str]] = {
admin_required = set() "add": can_add,
"mod": can_modify,
default_field = (wtforms.fields.StringField, {}) "del": can_delete,
fields = { "admin": {"mifareidhash"},
'jpegphoto': (wtforms.fields.FileField, {'validators': []}), }
'mobiletelephonenumber': (wtforms.fields.StringField, {'validators': [wtforms.validators.Regexp(r'[+0-9 ]+')]}),
'telephonenumber': (wtforms.fields.StringField, {'validators': [wtforms.validators.Regexp(r'[+0-9 ]+')]}), FormField = Tuple[type[wtforms.Field], Dict[str, Any]]
default_field: FormField = (wtforms.fields.StringField, {})
fields: Dict[str, FormField] = {
"jpegphoto": (wtforms.fields.FileField, {"validators": []}),
"mobiletelephonenumber": (
wtforms.fields.StringField,
{"validators": [wtforms.validators.Regexp(r"[+0-9 ]+")]},
),
"telephonenumber": (
wtforms.fields.StringField,
{"validators": [wtforms.validators.Regexp(r"[+0-9 ]+")]},
),
} }

View file

@ -1,3 +1,4 @@
from dataclasses import dataclass
import random import random
import string import string
import hashlib import hashlib
@ -5,53 +6,87 @@ import hashlib
import flask import flask
import ldap import ldap
from webapp import app, config, validation, avatar from webapp import config, validation
class Attr(object): from typing import Optional, Dict, List
def __init__(self, name, value):
class Attr:
"""
A concrete attribute (with value) on a Profile.
"""
name: str
readable_name: Optional[str]
value: bytes
# Hash of name + value, used to uniquely identify an attribute across HTTP
# calls.
uid: str
def __init__(self, name: str, value: bytes) -> None:
name = validation.sanitize_ldap(name) name = validation.sanitize_ldap(name)
self.name = name self.name = name
self.readable_name = config.readable_names.get(name, name) self.readable_name = config.readable_names.get(name)
self.value = value self.value = value
self.uid = hashlib.sha1(name.encode('utf-8') + value).hexdigest() self.uid = hashlib.sha1(name.encode("utf-8") + value).hexdigest()
def __str__(self):
return self.value.decode('utf-8')
def get_dn(): def __str__(self) -> str:
return flask.session.get('dn') return self.value.decode("utf-8")
def get_connection(dn = None):
dn = dn or get_dn()
return app.connections[dn]
def get_admin_connection(): @dataclass
conn = app.connections[config.ldap_admin_dn] class LDAPEntry:
if not conn: """
conn = app.connections.bind(config.ldap_admin_dn, config.ldap_admin_password) An LDAP entry, eg. a user profile or a group.
return conn """
def get_profile(): # Map from uid/hash to attr.
return app.profiles[get_dn()] fields: Dict[str, Attr]
# DN of this entry
dn: str
def refresh_profile(dn=None): def __init__(self, conn: ldap.ldapobject, dn: str):
dn = dn or get_dn() res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
conn = get_connection(dn) assert len(res) == 1
if not conn: self.dn = dn
return # no session, nothing to refresh i guess self.fields = {}
res = conn.search_s(dn, ldap.SCOPE_SUBTREE) for attr, vs in res[0][1].items():
assert(len(res) == 1) for v in vs:
profile = {} a = Attr(attr, v)
for attr, vs in res[0][1].items(): self.fields[a.uid] = a
for v in vs:
a = Attr(attr, v)
profile[a.uid] = a
if attr == 'uid':
user_uid = v.decode('utf-8')
app.profiles[dn] = profile
# bust avatar cache def get_attr(self, attr: str) -> Optional[Attr]:
if user_uid: for v in self.fields.values():
avatar.cache.reset_user(user_uid) if v.name == attr:
avatar.hash_cache.reset() return v
return None
return profile @property
def fields_sorted(self) -> List[Attr]:
fields = list(self.fields.values())
fields.sort(key=lambda attr: attr.name)
fields.sort(key=lambda attr: attr.readable_name is None)
return fields
@property
def name(self) -> str:
res = self.get_attr("commonname")
assert res is not None
return res.value.decode()
class Profile(LDAPEntry):
"""
A user profile.
"""
# Map from uid/hash to attr.
fields: Dict[str, Attr]
# DN of this profile
dn: str
@property
def username(self) -> str:
res = self.get_attr("uid")
assert res is not None
return res.value.decode()

View file

@ -2,49 +2,57 @@ import smtplib
from email.message import EmailMessage from email.message import EmailMessage
import flask import flask
import datetime import datetime
from typing import Optional
from webapp import config, context from webapp import config, context
cached_connection = None cached_connection: Optional[smtplib.SMTP] = None
def test_connection_open(conn):
def test_connection_open(conn: smtplib.SMTP) -> bool:
try: try:
status = conn.noop()[0] status = conn.noop()[0]
except: except:
status = -1 status = -1
return True if status == 250 else False return True if status == 250 else False
def create_connection():
conn = smtplib.SMTP_SSL(config.smtp_server)
conn.login(config.smtp_user, config.smtp_password)
return conn
def get_connection(): def create_connection() -> smtplib.SMTP:
global cached_connection conn = smtplib.SMTP_SSL(config.smtp_server)
if test_connection_open(cached_connection): conn.login(config.smtp_user, config.smtp_password)
return conn
def get_connection() -> smtplib.SMTP:
global cached_connection
if cached_connection is not None and test_connection_open(cached_connection):
return cached_connection
cached_connection = create_connection()
return cached_connection return cached_connection
cached_connection = create_connection()
return cached_connection
def send_email(conn, subject, body, recipient_emails): def send_email(
msg = EmailMessage() conn: smtplib.SMTP, subject: str, body: str, recipient_emails: str
msg.set_content(body) ) -> None:
msg['Subject'] = subject msg = EmailMessage()
msg.set_content(body)
msg["Subject"] = subject
sender_email = config.smtp_format.format(config.smtp_user) sender_email = config.smtp_format.format(config.smtp_user)
msg['From'] = f'LDAPWeb <{sender_email}>' msg["From"] = f"LDAPWeb <{sender_email}>"
msg['To'] = recipient_emails msg["To"] = recipient_emails
conn.send_message(msg) conn.send_message(msg)
def send_papertrail(title, description):
username = flask.session.get('username')
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
subject = f'[LDAPWeb] {title}' def send_papertrail(title: str, description: str) -> None:
body = f"Changed by {username} at {current_time}:\n\n{description or title}" username = flask.session.get("username")
recipients = config.papertrail_recipients current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")
conn = get_connection() subject = f"[LDAPWeb] {title}"
send_email(conn, subject, body, recipients) body = f"Changed by {username} at {current_time}:\n\n{description or title}"
recipients = config.papertrail_recipients
conn = get_connection()
send_email(conn, subject, body, recipients)

View file

@ -1,66 +1,59 @@
import re import re
import ldap import ldap
from webapp import config from webapp import config
def is_valid_name(name): from typing import List, Tuple, Any, Dict
"""`true` if `name` is a safe ldap uid/cn"""
return re.match(r'^[a-zA-Z_][a-zA-Z0-9-_\.]*\Z', name) is not None
def validate_name(name):
"""Raises `RuntimeError` if `name` is not a safe ldap uid/cn"""
if not is_valid_name(name):
raise RuntimeError('Invalid name')
def user_dn(uid):
validate_name(uid)
return config.ldap_user_dn_format.format(uid)
def group_dn(cn):
validate_name(cn)
return config.ldap_group_dn_format.format(cn)
def wrap(filter):
if len(filter) and filter[0] == '(' and filter[-1] == ')':
return filter
else:
return f'({filter})'
def _or(*filters):
wrapped = ''.join(wrap(f) for f in filters)
return f'(|{wrapped})'
def _and(*filters):
wrapped = ''.join(wrap(f) for f in filters)
return f'(&{wrapped})'
def _not(filter):
wrapped = wrap(filter)
return f'(!{wrapped})'
def member_of_any(groups):
"""Returns a filter that matches users that are a member of any of the given group names"""
return _or(*(f'memberOf={group_dn(group)}' for group in groups))
def groups_of_user(uid):
"""Returns a filter that matches groups that have the given user as a member"""
return f'(&(objectClass=groupOfUniqueNames)(uniqueMember={user_dn(uid)}))'
def normalized_entries(entries):
"""
Converts ldap entries from python-ldap format into a more convenient
List[Tuple[
dn,
List[tuple[attr_name, attr_value]]
]]
"""
normalized = []
for dn, attrs in entries:
normalized_attrs = []
for attr, values in attrs.items():
for value in values:
normalized_attrs.append((attr, value.decode()))
normalized.append((dn, normalized_attrs))
return normalized
def is_valid_name(name: str) -> bool:
"""`true` if `name` is a safe ldap uid/cn"""
return re.match(r"^[a-zA-Z_][a-zA-Z0-9-_\.]*\Z", name) is not None
def validate_name(name: str) -> None:
"""Raises `RuntimeError` if `name` is not a safe ldap uid/cn"""
if not is_valid_name(name):
raise RuntimeError("Invalid name")
def user_dn(uid: str) -> str:
validate_name(uid)
return config.ldap_user_dn_format.format(uid)
def group_dn(cn: str) -> str:
validate_name(cn)
return config.ldap_group_dn_format.format(cn)
def wrap(filter: str) -> str:
if len(filter) and filter[0] == "(" and filter[-1] == ")":
return filter
else:
return f"({filter})"
def _or(*filters: str) -> str:
wrapped = "".join(wrap(f) for f in filters)
return f"(|{wrapped})"
def _and(*filters: str) -> str:
wrapped = "".join(wrap(f) for f in filters)
return f"(&{wrapped})"
def _not(filter: str) -> str:
wrapped = wrap(filter)
return f"(!{wrapped})"
def member_of_any(groups: List[str]) -> str:
"""Returns a filter that matches users that are a member of any of the given group names"""
return _or(*(f"memberOf={group_dn(group)}" for group in groups))
def groups_of_user(uid: str) -> str:
"""Returns a filter that matches groups that have the given user as a member"""
return f"(&(objectClass=groupOfUniqueNames)(uniqueMember={user_dn(uid)}))"

View file

@ -4,67 +4,89 @@ import sys
import threading import threading
import time import time
log = logging.getLogger('ldap-web.lru') from dataclasses import dataclass
from typing import Dict, Generic, TypeVar, List, Callable, Optional
def locked(f): log = logging.getLogger("ldap-web.lru")
@functools.wraps(f)
def func(self, *a, **kw):
self.lock.acquire()
try:
return f(self, *a, **kw)
finally:
self.lock.release()
return func
class LRUPool(threading.Thread): K = TypeVar("K")
V = TypeVar("V")
Cb = Callable[[K], None]
@dataclass
class _Entry(Generic[V]):
c: V
atime: float
class LRUPool(threading.Thread, Generic[K, V]):
"""A key-value pool to store objects with a timeout. """A key-value pool to store objects with a timeout.
Consumers of objects can register callbacks which will be called on Consumers of objects can register callbacks which will be called on
object expiry. Expiry is least-recently-used - any access reset the object expiry. Expiry is least-recently-used - any access reset the
time counter.""" time counter."""
def __init__(self, timeout=60.0, **kw):
threading.Thread.__init__(self, **kw) def __init__(self, timeout: float = 60.0) -> None:
super().__init__()
self.setDaemon(True) self.setDaemon(True)
self.pool = {} self.pool: Dict[K, _Entry[V]] = {}
self.timeout = timeout self.timeout = timeout
self.lock = threading.Lock() self.lock = threading.Lock()
self.callbacks = {} self.callbacks: Dict[str, List[Cb[K]]] = {}
def run(self):
def run(self) -> None:
log.info("Pool {} starting.", self) log.info("Pool {} starting.", self)
while True: while True:
time.sleep(self.timeout / 2) time.sleep(self.timeout / 2)
self.lock.acquire() self.lock.acquire()
now = time.time() now = time.time()
drop = set() drop = set()
for k, [c, atime] in self.pool.items(): for k, entry in self.pool.items():
if now - atime > self.timeout: if now - entry.atime > self.timeout:
log.info("Pool {} dropping {}.", self, k) log.info("Pool {} dropping {}.", self, k)
drop.add(k) drop.add(k)
for k in list(drop): for k in list(drop):
self._drop(k) self._drop(k)
self.lock.release() self.lock.release()
def register_callback(self, action, cb):
def register_callback(self, action: str, cb: Cb[K]) -> None:
self.callbacks.setdefault(action, []).append(cb) self.callbacks.setdefault(action, []).append(cb)
@locked
def __getitem__(self, key): def __getitem__(self, key: K) -> Optional[V]:
item = self.pool.get(key) self.lock.acquire()
if not item: try:
return item = self.pool.get(key)
item[1] = time.time() if not item:
return item[0] return None
def _insert(self, key, item): item.atime = time.time()
self.pool[key] = [item, time.time()] return item.c
finally:
self.lock.release()
def _insert(self, key: K, item: V) -> V:
self.pool[key] = _Entry(item, time.time())
return item return item
@locked
def insert(self, key, item): def insert(self, key: K, item: V) -> V:
return self._insert(key, item) self.lock.acquire()
def _drop(self, key): try:
for f in self.callbacks.get('drop',[]): return self._insert(key, item)
finally:
self.lock.release()
def _drop(self, key: K) -> Optional[V]:
for f in self.callbacks.get("drop", []):
f(key) f(key)
return self.pool.pop(key, None) res = self.pool.pop(key, None)
@locked if res is None:
def drop(self, key): return None
return self._drop(key) return res.c
def drop(self, key: K) -> Optional[V]:
self.lock.acquire()
try:
return self._drop(key)
finally:
self.lock.release()

View file

@ -1,38 +1,45 @@
import ldap import ldap
import logging
import kerberos import kerberos
import flask import flask
import flask_wtf import flask_wtf
import werkzeug
from webapp import app, context, config from webapp import app, context, config
from webapp.auth import login_required from webapp.auth import login_required
bp = flask.Blueprint('passwd', __name__) bp = flask.Blueprint("passwd", __name__)
@bp.route('/passwd', methods=["GET"])
@bp.route("/passwd", methods=["GET"])
@login_required @login_required
def passwd_form(): def passwd_form() -> werkzeug.Response:
return flask.render_template('passwd.html') return flask.Response(flask.render_template("passwd.html"))
def _passwd_kadmin(current, new):
username = flask.session.get('username') def _passwd_kadmin(current: str, new: str) -> bool:
username = flask.session.get("username")
try: try:
principal_name = config.kadmin_principal_map.format(username) principal_name = config.kadmin_principal_map.format(username)
return kerberos.changePassword(principal_name, current, new) return kerberos.changePassword(principal_name, current, new)
except Exception as e: except Exception as e:
print('Kerberos error:', e) print("Kerberos error:", e)
logging.exception('kpasswd failed') logging.exception("kpasswd failed")
return False return False
@bp.route('/passwd', methods=["POST"])
@bp.route("/passwd", methods=["POST"])
@login_required @login_required
def passwd_action(): def passwd_action() -> werkzeug.Response:
current, new, confirm = (flask.request.form[n] for n in ('current', 'new', 'confirm')) current, new, confirm = (
flask.request.form[n] for n in ("current", "new", "confirm")
)
if new != confirm: if new != confirm:
flask.flash("New passwords don't match", category='danger') flask.flash("New passwords don't match", category="danger")
return flask.render_template('passwd.html') return flask.Response(flask.render_template("passwd.html"))
if _passwd_kadmin(current, new): if _passwd_kadmin(current, new):
flask.flash('Password changed', category='info') flask.flash("Password changed", category="info")
else: else:
flask.flash('Wrong password', category='danger') flask.flash("Wrong password", category="danger")
return flask.render_template('passwd.html') return flask.Response(flask.render_template("passwd.html"))

View file

@ -2,20 +2,27 @@ import ldap
from webapp import lru, config from webapp import lru, config
class LDAPConnectionPool(lru.LRUPool): from typing import Optional, Any
def __init__(self, url, use_tls=True, **kw):
lru.LRUPool.__init__(self, **kw) Pool = lru.LRUPool[str, ldap.ldapobject]
class LDAPConnectionPool(Pool):
def __init__(self, url: str, use_tls: bool = True, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.use_tls = use_tls self.use_tls = use_tls
self.url = url self.url = url
@lru.locked def bind(self, dn: str, password: str) -> ldap.ldapobject:
def bind(self, dn, password): self.lock.acquire()
conn = ldap.initialize(self.url) try:
if(self.use_tls): conn = ldap.initialize(self.url)
conn.start_tls_s() if self.use_tls:
conn.simple_bind_s(dn, password) conn.start_tls_s()
return self._insert(dn, conn) conn.simple_bind_s(dn, password)
def unbind(self, dn): return self._insert(dn, conn)
finally:
self.lock.release()
def unbind(self, dn: str) -> Optional[ldap.ldapobject]:
return self.drop(dn) return self.drop(dn)

View file

@ -1,6 +1,6 @@
{% extends 'basic.html' %} {% extends 'basic.html' %}
{% block content %} {% block content %}
<h1>Group: {{ name }}</h1> <h1>Group: {{ group.name }}</h1>
<div style="margin-bottom: 10px"> <div style="margin-bottom: 10px">
<a class="btn btn-default" href="/admin/groups" role="button">Back</a> <a class="btn btn-default" href="/admin/groups" role="button">Back</a>
@ -26,11 +26,11 @@
<th scope="col">Attribute</th> <th scope="col">Attribute</th>
<th scope="col" class="profile-table-value">Value</th> <th scope="col" class="profile-table-value">Value</th>
</tr> </tr>
{% for attr, attr_readable, value in attributes %} {% for attr in group.fields_sorted %}
<tr> <tr>
<td>{{ attr }}</td> <td>{{ attr.name }}</td>
<td>{{ attr_readable if attr_readable else '' }}</td> <td>{{ attr.readable_name if attr.readable_name else '' }}</td>
<td class="profile-table-value">{{ value }}</td> <td class="profile-table-value">{{ attr.value.decode() }}</td>
</tr> </tr>
{% endfor %} {% endfor %}
</table> </table>

View file

@ -1,14 +1,14 @@
{% extends 'basic.html' %} {% extends 'basic.html' %}
{% block content %} {% block content %}
<img src="/avatar/user/{{ uid }}" class="profile-avatar-lmao" /> <img src="/avatar/user/{{ profile.username }}" class="profile-avatar-lmao" />
<h1>User: {{ uid }}</h1> <h1>User: {{ profile.username }}</h1>
<div style="margin-bottom: 10px"> <div style="margin-bottom: 10px">
<a class="btn btn-default" href="/admin/users" role="button">Back</a> <a class="btn btn-default" href="/admin/users" role="button">Back</a>
<a class="btn btn-default" href="https://kasownik.hackerspace.pl/admin/member/{{ uid }}" role="button" target="_blank">View user in Kasownik</a> <a class="btn btn-default" href="https://kasownik.hackerspace.pl/admin/member/{{ profile.username }}" role="button" target="_blank">View user in Kasownik</a>
{% if not is_protected %} {% if not is_protected %}
<a class="btn btn-default modalLink" href="/admin/users/{{ uid }}/add_mifareidhash" role="button">Add mifareIDHash</a> <a class="btn btn-default modalLink" href="/admin/users/{{ profile.username }}/add_mifareidhash" role="button">Add mifareIDHash</a>
{% endif %} {% endif %}
</div> </div>
@ -33,14 +33,14 @@
<th scope="col" class="profile-table-value">Value</th> <th scope="col" class="profile-table-value">Value</th>
<th scope="col" class="profile-table-options">Options</th> <th scope="col" class="profile-table-options">Options</th>
</tr> </tr>
{% for attr, attr_readable, value in profile %} {% for attr in profile.fields_sorted %}
<tr> <tr>
<td>{{ attr }}</td> <td>{{ attr.name }}</td>
<td>{{ attr_readable if attr_readable else '' }}</td> <td>{{ attr.readable_name if attr.readable_name else '' }}</td>
<td class="profile-table-value">{{ '(...omitted...)' if attr == 'jpegPhoto' else value }}</td> <td class="profile-table-value">{{ '(...omitted...)' if attr.name == 'jpegphoto' else attr.value.decode() }}</td>
<td class="profile-table-options"> <td class="profile-table-options">
{% if not is_protected and attr == 'mifareIDHash' %} {% if not is_protected and attr.name == 'mifareidhash' %}
<a class="modalLink" href="/admin/users/{{ uid }}/del_mifareidhash?value={{ value | urlencode }}"><span class="glyphicon glyphicon-minus-sign" aria-hidden="true"></span> Remove</a> <a class="modalLink" href="/admin/users/{{ profile.username }}/del_mifareidhash/{{ attr.uid }}"><span class="glyphicon glyphicon-minus-sign" aria-hidden="true"></span> Remove</a>
{% endif %} {% endif %}
</td> </td>
</tr> </tr>

View file

@ -1,5 +1,5 @@
<div class="modal fade" tabindex="-1" role="dialog"> <div class="modal fade" tabindex="-1" role="dialog">
<form action="/vcard/add/{{ attr_name }}" method="POST" class="form-signin" <form action="{{ action }}" method="POST" class="form-signin"
{% if attr_name == 'jpegphoto' %} {% if attr_name == 'jpegphoto' %}
enctype="multipart/form-data" enctype="multipart/form-data"
{% endif %} {% endif %}

View file

@ -1,5 +1,5 @@
<div class="modal fade" tabindex="-1" role="dialog"> <div class="modal fade" tabindex="-1" role="dialog">
<form action="/vcard/delete/{{ uid }}" method="POST" class="form-signin"> <form action="{{ action }}" method="POST" class="form-signin">
<div class="modal-dialog" role="document"> <div class="modal-dialog" role="document">
<div class="modal-content"> <div class="modal-content">
<div class="modal-header"> <div class="modal-header">

View file

@ -1,5 +1,5 @@
<div class="modal fade" tabindex="-1" role="dialog"> <div class="modal fade" tabindex="-1" role="dialog">
<form action="/vcard/modify/{{ uid }}" method="POST" class="form-signin"> <form action="{{ action }}" method="POST" class="form-signin">
<div class="modal-dialog" role="document"> <div class="modal-dialog" role="document">
<div class="modal-content"> <div class="modal-content">
<div class="modal-header"> <div class="modal-header">

View file

@ -4,26 +4,17 @@ import flask
from webapp import config from webapp import config
def validate(predicate, arg, error='Error!', redirect='/'):
def decorator(f):
@functools.wraps(f)
def func(**kw):
v = kw.get(arg)
if predicate(v):
return f(**kw)
else:
flask.flash(error, category='error')
return flask.redirect(redirect)
return func
return decorator
def sanitize_perms(): def sanitize_perms() -> None:
config.can = { k: set(map(sanitize_ldap, v)) for k,v in config.can.items() } config.can = {k: set(map(sanitize_ldap, v)) for k, v in config.can.items()}
def sanitize_readable():
config.readable_names = { sanitize_ldap(k): v for k, v in config.readable_names.items() }
def sanitize_ldap(k): def sanitize_readable() -> None:
config.readable_names = {
sanitize_ldap(k): v for k, v in config.readable_names.items()
}
def sanitize_ldap(k: str) -> str:
k = k.lower() k = k.lower()
return (k in config.full_name and config.full_name[k]) or k return (k in config.full_name and config.full_name[k]) or k

View file

@ -1,117 +1,334 @@
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from typing import (
Optional,
cast,
Dict,
List,
TypedDict,
Literal,
Union,
TypeVar,
Generic,
)
import ldap import ldap
import flask import flask
import flask_wtf import flask_wtf
import wtforms
import werkzeug
from webapp import app, context, config, validation, avatar from webapp import app, context, config, validation, avatar
from webapp.auth import login_required from webapp.auth import login_required
bp = flask.Blueprint('vcard', __name__) bp = flask.Blueprint("vcard", __name__)
perm_errors = {
'add': 'You cannot add this attribute!',
'mod': 'You cannot change this attribute!',
'del': 'You cannot delete this attribute!',
}
templates = { # NOTE: this code is quite hairy. Simplifying it further is very possible, but
'add': 'ops/add.html', # would require actually designing this from first principles, instead of
'mod': 'ops/mod.html', # continuing to hack on a _very_ old codebase.
'del': 'ops/del.html', #
} # This started out as a very Lispy codebase with stringly-typed operation
# kinds. It has now be rewritten to be extremely class/type-heavy. There's
# probably a better compromise to be made, but I've been writing too much Rust
# recently. ~q3k
def attr_op(op, attrName, uid = None, success_redirect='/vcard',
fatal_redirect='/vcard'):
try:
attr, old_value = None, None
if uid:
attr = context.get_profile()[uid]
attrName = attr.name
old_value = str(attr)
form = DelForm() if op == 'del' else app.forms[attrName](value=old_value)
form.attr_data = attr
if attrName not in config.can[op]:
flask.flash(perm_errors[op], 'danger')
return flask.redirect(fatal_redirect)
if form.validate_on_submit():
if op in ['add', 'mod']:
new_value = form.value.data
admin = attrName in config.admin_required
conn = context.get_admin_connection() if admin else context.get_connection()
dn = context.get_dn()
# Most fields should be modified by remove/add.
if op == 'mod' and attrName not in ['commonname']:
op = 'modreadd'
if attrName == 'jpegphoto':
# Temporary workaround: deleting jpegPhoto doesn't work, set to empty instead
if op == 'del':
op = 'mod'
new_value = ''
else:
try:
op = 'mod'
processed_avatar = avatar.process_upload(new_value.read())
new_value = processed_avatar
print(f'Uplading avatar (size: {len(processed_avatar)}) for {dn}')
except Exception as e:
flask.flash('Could not process avatar: {}'.format(e), 'danger')
return flask.redirect(fatal_redirect)
def to_ldap(s):
if isinstance(s, bytes):
return s
return s.encode('utf-8')
if op in ['del', 'modreadd']:
conn.modify_s(dn, [(ldap.MOD_DELETE, attrName, to_ldap(old_value))])
if op in ['add', 'modreadd']:
conn.modify_s(dn, [(ldap.MOD_ADD, attrName, to_ldap(new_value))])
if op in ['mod']:
conn.modify_s(dn, [(ldap.MOD_REPLACE, attrName, to_ldap(new_value))])
context.refresh_profile()
return flask.redirect(success_redirect)
if form.is_submitted():
for field, errors in form.errors.items():
for error in errors:
flask.flash("Error in the {} field - {}".format(
getattr(form, field).label.text,
error
), 'danger')
return flask.redirect(success_redirect)
status = 400 if form.is_submitted() else 200
return flask.make_response(flask.render_template(templates[op], fatal_redirect=fatal_redirect,
attr_op=op, form=form, attr_name=attrName, uid=uid), status)
except ldap.LDAPError as e:
print('LDAP error:', e)
flask.flash('Could not modify profile', 'danger')
return flask.redirect(fatal_redirect)
class DelForm(flask_wtf.FlaskForm): class DelForm(flask_wtf.FlaskForm):
pass """
Form used to delete an attribute from a profile.
@bp.route('/vcard', methods=['GET']) Used as is.
"""
attr_data: context.Attr
class AddModifyForm(flask_wtf.FlaskForm):
"""
Form used to add or modify an attribute in a profile.
Base class subclassed by dynamically generated classes in initialize_forms.
"""
attr_data: Optional[context.Attr]
value: wtforms.Field
def initialize_forms() -> Dict[str, type[AddModifyForm]]:
"""
Create AddModifyForm subclasses keyed by attribute name, based on config.
"""
forms: Dict[str, type[AddModifyForm]] = {}
for f in reduce(lambda a, b: a | b, config.can.values()):
cls, attrs = config.fields.get(f, config.default_field)
class AddForm(AddModifyForm):
value = cls(label=config.readable_names.get(f), **attrs)
AddForm.__name__ == "Add" + f
forms[f] = AddForm
return forms
add_modify_forms = initialize_forms()
F = TypeVar("F", bound=flask_wtf.FlaskForm)
class Operation(Generic[F]):
"""
Base class for all LDAP operations.
Subclassed by operation class tree: first by whether the operation referes
to a plain attribute name, or to an already existing profile attribute by
uid.
Then further subclassed by operation kind: add, delete or modify.
"""
kind: str
perm_error: str
dn: str
attr_name: str
@property
def _template_path(self) -> str:
return f"ops/{self.kind}.html"
def _make_form(self) -> F:
"""
Return a FlaskForm to be displayed to the user.
"""
raise NotImplementedError
def _render_form(self, form: F, action: str) -> werkzeug.Response:
"""
Render a FlaskForm with the data specific to the form type.
"""
raise NotImplementedError
def _on_form_submit(self, dn: str, conn: ldap.ldapobject, form: F) -> None:
"""
Act on a submitted and validated form, performing actual LDAP
mutations.
"""
raise NotImplementedError
def _allowed(self, subject_dn: str) -> Optional[str]:
if self.attr_name not in config.can[self.kind]:
return self.perm_error
return None
def perform(
self,
success_redirect: str = "/vcard",
fatal_redirect: str = "/vcard",
action: str = "/vcard",
) -> werkzeug.Response:
"""
Primary entrypoint to operation. To be called from a view.
"""
conn = app.get_connection()
assert conn is not None
# Check permissions per config.
perm_err = self._allowed(self.dn)
if perm_err is not None:
flask.flash(perm_err, "danger")
return flask.redirect(fatal_redirect)
form = self._make_form()
if form.validate_on_submit():
# If form data is valid, perform LDAP mutation.
try:
self._on_form_submit(self.dn, conn, form)
except ldap.LDAPError as e:
print("LDAP error:", e)
flask.flash("Could not modify profile", "danger")
return flask.redirect(fatal_redirect)
profile = app.refresh_profile(conn)
assert profile is not None
avatar.cache.reset_user(profile.username)
avatar.hash_cache.reset()
return flask.redirect(success_redirect)
else:
# Otherwise, collect validation errors (if appropriate) and render
# the form.
if form.is_submitted():
for field, errors in form.errors.items():
assert field is not None
for error in errors:
flask.flash(
"Error in the {} field - {}".format(
getattr(form, field).label.text, error
),
"danger",
)
return self._render_form(form, action)
class OperationWithAttrName(Operation[F]):
"""
Operations acting on an attribute name, without a corresponding existing
attribute. This is used when adding profile fields/attributes.
"""
def __init__(self, dn: str, attr_name: str) -> None:
self.dn = dn
self.attr_name = attr_name
def _render_form(self, form: F, action: str) -> werkzeug.Response:
return flask.Response(
flask.render_template(
self._template_path, form=form, attr_name=self.attr_name, action=action
)
)
class OperationWithUid(Operation[F]):
"""
Operations acting on an existing attribute name and value, ie. an Attr with
'uid'. This is used when modifying existing fields/attributes or removing
them.
"""
attr: context.Attr
def __init__(self, dn: str, uid: str):
self.dn = dn
profile = app.get_profile(dn)
assert profile is not None
assert uid in profile.fields
self.attr = profile.fields[uid]
self.attr_name = self.attr.name
def _render_form(self, form: F, action: str) -> werkzeug.Response:
return flask.Response(
flask.render_template(
self._template_path,
form=form,
attr_name=self.attr.name,
uid=self.attr.uid,
action=action,
)
)
class OperationAdd(OperationWithAttrName[AddModifyForm]):
kind = "add"
perm_error = "You cannot add this attribute!"
def _make_form(self) -> AddModifyForm:
form = add_modify_forms[self.attr_name]()
return form
def _on_form_submit(
self, dn: str, conn: ldap.ldapobject, form: AddModifyForm
) -> None:
# Special case for jpegphoto
value = form.value.data
if self.attr_name == "jpegphoto":
value = avatar.process_upload(form.value.data.read())
print(f"Uploading avatar (size: {len(value)}) for {dn}")
# jpegPhoto should always be REPLACED.
conn.modify_s(dn, [(ldap.MOD_REPLACE, self.attr_name, value)])
return
assert value is not None
conn.modify_s(dn, [(ldap.MOD_ADD, self.attr_name, value.encode())])
class OperationModify(OperationWithUid[AddModifyForm]):
kind = "mod"
perm_error = "You cannot modify this attribute!"
def _make_form(self) -> AddModifyForm:
form = add_modify_forms[self.attr_name](value=str(self.attr))
return form
def _on_form_submit(
self, dn: str, conn: ldap.ldapobject, form: AddModifyForm
) -> None:
# Special case for jpegphoto
value = form.value.data
if self.attr_name == "jpegphoto":
value = avatar.process_upload(form.value.data.read())
print(f"Uploading avatar (size: {len(value)}) for {dn}")
assert value is not None
if self.attr_name in ["commonname"]:
# Modify directly
conn.modify_s(dn, [(ldap.MOD_REPLACE, self.attr_name, value.encode())])
else:
# Remove and add again.
conn.modify_s(dn, [(ldap.MOD_DELETE, self.attr_name, self.attr.value)])
conn.modify_s(dn, [(ldap.MOD_ADD, self.attr_name, value.encode())])
class OperationDelete(OperationWithUid[DelForm]):
kind = "del"
perm_error = "You cannot delete this attribute!"
def _make_form(self) -> DelForm:
res = DelForm()
res.attr_data = self.attr
return res
def _on_form_submit(
self, dn: str, conn: ldap.ldapobject, form: flask_wtf.FlaskForm
) -> None:
if self.attr_name == "jpegphoto":
# We apparently can't remove these, so just set it empty.
conn.modify_s(dn, [(ldap.MOD_REPLACE, self.attr_name, b"")])
return
conn.modify_s(dn, [(ldap.MOD_DELETE, self.attr_name, self.attr.value)])
@bp.route("/vcard", methods=["GET"])
@login_required @login_required
def vcard(): def vcard() -> werkzeug.Response:
data = {} data: Dict[str, List[context.Attr]] = {}
for v in context.get_profile().values(): profile = app.get_profile()
assert profile is not None
for v in profile.fields.values():
data.setdefault(v.name, []).append(v) data.setdefault(v.name, []).append(v)
return flask.render_template('vcard.html', can_add=config.can['add'], return flask.Response(
can_modify=config.can['mod'], can_delete=config.can['del'], profile=data) flask.render_template(
"vcard.html",
can_add=config.can["add"],
can_modify=config.can["mod"],
can_delete=config.can["del"],
profile=data,
)
)
@bp.route('/vcard/add/<attrName>', methods=['GET', 'POST'])
@login_required
def add_attr(attrName):
return attr_op('add', attrName)
@bp.route('/vcard/delete/<uid>', methods=['GET', 'POST']) @bp.route("/vcard/add/<attr_name>", methods=["GET", "POST"])
@login_required @login_required
def del_attr(uid): def add_attr(attr_name: str) -> werkzeug.Response:
return attr_op('del', None, uid) dn = app.get_dn()
assert dn is not None
op = OperationAdd(dn, attr_name)
return op.perform(action=f"/vcard/add/{attr_name}")
@bp.route('/vcard/modify/<uid>', methods=['GET', 'POST'])
@bp.route("/vcard/delete/<uid>", methods=["GET", "POST"])
@login_required @login_required
def mod_attr(uid): def del_attr(uid: str) -> werkzeug.Response:
return attr_op('mod', None, uid) dn = app.get_dn()
assert dn is not None
op = OperationDelete(dn, uid)
return op.perform(action=f"/vcard/delete/{uid}")
@bp.route("/vcard/modify/<uid>", methods=["GET", "POST"])
@login_required
def mod_attr(uid: str) -> werkzeug.Response:
dn = app.get_dn()
assert dn is not None
op = OperationModify(dn, uid)
return op.perform(action=f"/vcard/modify/{uid}")

View file

@ -1,10 +1,11 @@
import flask import flask
import werkzeug
from webapp import app, context, config from webapp import app, context, config
from webapp.auth import login_required from webapp.auth import login_required
@app.route("/") @app.route("/")
@login_required @login_required
def root(): def root() -> werkzeug.Response:
return flask.render_template('root.html', **flask.session) return flask.Response(flask.render_template("root.html", **flask.session))