*: type annotate

This commit is contained in:
q3k 2024-07-07 16:44:39 +02:00
parent e84974ac74
commit c6688ec8cb
22 changed files with 743 additions and 483 deletions

9
stubs/flask_wtf.pyi Normal file
View file

@ -0,0 +1,9 @@
import wtforms
from typing import Protocol
class FlaskForm(wtforms.Form):
def validate_on_submit(self) -> bool:
...
def is_submitted(self) -> bool:
...

2
stubs/kerberos.pyi Normal file
View file

@ -0,0 +1,2 @@
def changePassword(principal: str, current: str, new: str) -> bool:
...

43
stubs/ldap.pyi Normal file
View file

@ -0,0 +1,43 @@
from enum import Enum
from typing import Protocol, List, Tuple, Dict, Optional
class Scope(Enum):
BASE = 'base'
ONELEVEL = 'onelevel'
SUBTREE = 'subtree'
SCOPE_BASE = Scope.BASE
SCOPE_ONELEVEL = Scope.ONELEVEL
SCOPE_SUBTREE = Scope.SUBTREE
class Mod(Enum):
ADD = 'add'
DELETE = 'delete'
REPLACE = 'replace'
MOD_ADD = Mod.ADD
MOD_DELETE = Mod.DELETE
MOD_REPLACE = Mod.REPLACE
ModEntry = Tuple[Mod, str, bytes | List[bytes]]
class LDAPError(Exception):
...
class NO_SUCH_OBJECT(LDAPError):
...
class ldapobject(Protocol):
def start_tls_s(self) -> None:
...
def simple_bind_s(self, dn: str, password: str) -> None:
...
def search_s(self, base: str, scope: Scope, filterstr: str = '(objectClass=*)', attrlist: Optional[List[str]] = None, attrsonly: int = 0) -> List[Tuple[str, Dict[str, List[bytes]]]]:
...
def modify_s(self, dn: str, modlist: List[ModEntry]) -> None:
...
def initialize(url: str) -> ldapobject:
...

View file

@ -1,46 +1,81 @@
import ldap
import flask
import flask_wtf
import wtforms
from functools import reduce
app = flask.Flask(__name__)
from webapp import validation, pools, config, context
from webapp import validation, pools, config
from typing import Dict, Any, Protocol, Optional
if hasattr(config, "secret_key"):
app.secret_key = config.secret_key
if hasattr(config, "debug"):
app.debug = config.debug
class App(flask.Flask):
connections: pools.LDAPConnectionPool
profiles: Dict[str, context.Profile]
def __init__(self, name: str):
super().__init__(name)
if hasattr(config, "secret_key"):
self.secret_key = config.secret_key
if hasattr(config, "debug"):
self.debug = config.debug
def get_dn(self) -> Optional[str]:
return flask.session.get('dn')
def get_connection(self, dn: Optional[str] = None) -> Optional[ldap.ldapobject]:
dn = dn or self.get_dn()
if dn is None:
return None
return self.connections[dn]
def get_admin_connection(self) -> ldap.ldapobject:
conn = self.connections[config.ldap_admin_dn]
if not conn:
conn = self.connections.bind(config.ldap_admin_dn, config.ldap_admin_password)
return conn
def get_profile(self, dn: Optional[str] = None) -> Optional[context.Profile]:
dn = dn or self.get_dn()
if dn is None:
return None
return self.profiles.get(dn)
def refresh_profile(self, conn: ldap.ldapobject, dn: Optional[str] = None) -> Optional[context.Profile]:
dn = dn or self.get_dn()
if dn is None:
return None
profile = context.Profile(conn, dn)
self.profiles[dn] = profile
return profile
app = App(__name__)
@app.context_processor
def inject_readable():
def inject_readable() -> Dict[str, Any]:
return dict(readable_names=config.readable_names)
@app.context_processor
def inject_hackerspace_name():
def inject_hackerspace_name() -> Dict[str, Any]:
return dict(hackerspace_name=config.hackerspace_name)
@app.template_filter('first')
def ldap_first(v):
def ldap_first(v: str) -> str:
return v and v[0]
@app.template_filter('readable')
def readable_tf(n):
def readable_tf(n: str) -> str:
return config.readable_names.get(n, n)
def initialize_forms():
forms = {}
for f in reduce(lambda a,b: a | b, config.can.values()):
cls, attrs = config.fields.get(f, config.default_field)
class AddForm(flask_wtf.FlaskForm):
value = cls(label=config.readable_names.get(f), **attrs)
AddForm.__name__ == 'Add' + f
forms[f] = AddForm
return forms
def start():
def start() -> None:
validation.sanitize_perms()
validation.sanitize_readable()
@ -50,13 +85,11 @@ def start():
app.register_blueprint(module.bp)
app.connections = pools.LDAPConnectionPool(config.ldap_url, timeout=300.0)
def drop_profile(dn):
def drop_profile(dn: str) -> None:
if dn != config.ldap_admin_dn:
del app.profiles[dn]
app.connections.register_callback('drop', drop_profile)
app.connections.start()
app.profiles = {}
app.forms = initialize_forms()
start()

View file

@ -4,15 +4,23 @@ import re
import flask
import flask_wtf
import wtforms
import werkzeug
import webapp
from webapp import app, context, config, ldaputils, email
from webapp import app, context, config, ldaputils, email, vcard
from typing import Callable, ParamSpec, List, Tuple, Optional, Dict, Protocol
bp = flask.Blueprint('admin', __name__)
def admin_required_impl(f):
Entry = Tuple[str, List[Tuple[str, str]]]
P = ParamSpec('P')
def admin_required_impl(f: Callable[P, werkzeug.Response]) -> Callable[P, werkzeug.Response]:
@functools.wraps(f)
def func(*a, **kw):
def func(*a: P.args, **kw: P.kwargs) -> werkzeug.Response:
# TODO: Actually check for admin perms
if not flask.session['is_admin']:
flask.abort(403)
@ -20,11 +28,13 @@ def admin_required_impl(f):
return f(*a, **kw)
return func
def admin_required(f):
def admin_required(f: Callable[P, werkzeug.Response]) -> Callable[P, werkzeug.Response]:
return webapp.auth.login_required(admin_required_impl(f))
def _get_user_list(conn, query='&'):
"""Returns List[Tuple[username, full name]] for query"""
def _get_user_list(conn: ldap.ldapobject, query: str = '&') -> List[Tuple[str, str]]:
"""
Returns List[Tuple[username, full name]] for query.
"""
all_users = []
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, f'(&(uid=*)(cn=*){ldaputils.wrap(query)})', attrlist=['uid', 'cn'])
@ -36,8 +46,10 @@ def _get_user_list(conn, query='&'):
all_users.sort(key=lambda user: user[0].lower())
return all_users
def _get_groupped_user_list(conn):
"""Returns all users (uid, full name), groupped by active groups"""
def _get_groupped_user_list(conn: ldap.ldapobject) -> List[Tuple[str, List[Tuple[str, str]]]]:
"""
Returns all users (uid, full name), groupped by active groups.
"""
groupped_users = [
(group.capitalize(), _get_user_list(conn, f'memberOf={ldaputils.group_dn(group)}'))
for group in config.ldap_active_groups
@ -53,39 +65,8 @@ def _get_groupped_user_list(conn):
return groupped_users
@bp.route('/admin/')
@admin_required
def admin_view():
return flask.render_template('admin/index.html')
@bp.route('/admin/users/')
@admin_required
def admin_users_view():
conn = context.get_connection()
groups = _get_groupped_user_list(conn)
return flask.render_template('admin/users.html', groups=groups)
def _get_profile(conn, uid):
results = conn.search_s(ldaputils.user_dn(uid), ldap.SCOPE_SUBTREE)
return ldaputils.normalized_entries(results)[0]
def _format_profile(profile):
rendered = []
dn, attrs = profile
for attr, value in attrs:
attr_sanitized = attr.lower()
attr_full_name = config.full_name.get(attr_sanitized, attr_sanitized)
attr_readable_name = config.readable_names.get(attr_full_name)
rendered.append((attr, attr_readable_name, value))
# Attributes with readable names first, then by name
rendered.sort(key=lambda profile: profile[0])
rendered.sort(key=lambda profile: profile[1] is None)
return rendered
def _get_groups_of(conn, uid):
filter = ldaputils.groups_of_user(uid)
def _get_groups_of(conn: ldap.ldapobject, dn: str) -> List[str]:
filter =f'(&(objectClass=groupOfUniqueNames)(uniqueMember={dn}))'
groups = [
attrs['cn'][0].decode()
for group_dn, attrs in
@ -94,113 +75,82 @@ def _get_groups_of(conn, uid):
return groups
def _is_user_protected(conn, uid, groups):
def _is_user_protected(conn: ldap.ldapobject, groups: List[str]) -> bool:
return any(group in config.ldap_protected_groups for group in groups)
@bp.route('/admin/users/<uid>')
@bp.route('/admin/')
@admin_required
def admin_user_view(uid):
ldaputils.validate_name(uid)
conn = context.get_connection()
def admin_view() -> werkzeug.Response:
return flask.Response(flask.render_template('admin/index.html'))
profile = _get_profile(conn, uid)
groups = _get_groups_of(conn, uid)
is_protected = _is_user_protected(conn, uid, groups)
@bp.route('/admin/users/')
@admin_required
def admin_users_view() -> werkzeug.Response:
conn = app.get_connection()
assert conn is not None
groups = _get_groupped_user_list(conn)
return flask.render_template('admin/user.html', uid=uid, profile=_format_profile(profile), groups=groups, is_protected=is_protected)
return flask.Response(flask.render_template('admin/users.html', groups=groups))
# TODO: Deduplicate this modification logic with webapp/vcard.py
@bp.route('/admin/users/<username>')
@admin_required
def admin_user_view(username: str) -> werkzeug.Response:
ldaputils.validate_name(username)
dn = ldaputils.user_dn(username)
class AddMifareIDHash(flask_wtf.FlaskForm):
value = wtforms.fields.StringField(label=config.readable_names.get('mifareidhash'))
conn = app.get_connection()
assert conn is not None
class DelForm(flask_wtf.FlaskForm):
profile = context.Profile(conn, dn)
groups = _get_groups_of(conn, dn)
is_protected = _is_user_protected(conn, groups)
return flask.Response(flask.render_template('admin/user.html', profile=profile, groups=groups, is_protected=is_protected))
class AdminMixin:
def _allowed(self, subject_dn: str) -> Optional[str]:
conn = app.get_connection()
assert conn is not None
groups = _get_groups_of(conn, subject_dn)
if _is_user_protected(conn, groups):
return "User is protected"
return None
class AdminOperationAdd(AdminMixin, vcard.OperationAdd):
pass
@bp.route('/admin/users/<uid>/add_mifareidhash')
class AdminOperationModify(AdminMixin, vcard.OperationModify):
pass
class AdminOperationDelete(AdminMixin, vcard.OperationDelete):
pass
@bp.route('/admin/users/<username>/add_mifareidhash', methods=["GET", "POST"])
@admin_required
def admin_user_view_add_mifareidhash(uid):
form = AddMifareIDHash()
return flask.render_template('admin/ops/add_mifareidhash.html', uid=uid, form=form)
def admin_user_view_add_mifareidhash(username: str) -> werkzeug.Response:
dn = ldaputils.user_dn(username)
op = AdminOperationAdd(dn, "mifareidhash")
redirect = f'/admin/users/{username}'
return op.perform(success_redirect=redirect, fatal_redirect=redirect, action=f'/admin/users/{username}/add_mifareidhash')
@bp.route('/admin/users/<uid>/del_mifareidhash')
@bp.route('/admin/users/<username>/del_mifareidhash/<uid>', methods=["GET", "POST"])
@admin_required
def admin_user_view_del_mifareidhash(uid):
form = DelForm()
value = flask.request.args.get('value')
return flask.render_template('admin/ops/del_mifareidhash.html', uid=uid, form=form, value=value)
def admin_user_view_del_mifareidhash(username: str, uid: str) -> werkzeug.Response:
dn = ldaputils.user_dn(username)
op = AdminOperationDelete(dn, uid)
redirect = f'/admin/users/{username}'
return op.perform(success_redirect=redirect, fatal_redirect=redirect, action=f'/admin/users/{username}/del_mifareidhash/{uid}')
def _modify_mifareidhash(uid, form, modify_func):
ldaputils.validate_name(uid)
conn = context.get_connection()
groups = _get_groups_of(conn, uid)
is_protected = _is_user_protected(conn, uid, groups)
redirect_url = flask.url_for('admin.admin_user_view', uid=uid)
if is_protected:
flask.flash('Cannot modify protected user', 'danger')
return flask.redirect(redirect_url)
try:
if form.validate_on_submit():
dn = ldaputils.user_dn(uid)
modify_func(conn, dn)
context.refresh_profile(dn)
flask.flash('Added mifareidhash', category='info')
return flask.redirect(redirect_url)
for field, errors in form.errors.items():
for error in errors:
flask.flash("Error in the {} field - {}".format(
getattr(form, field).label.text,
error
), 'danger')
return flask.redirect(redirect_url)
except ldap.LDAPError as e:
print('LDAP error:', e)
flask.flash(f'Could not modify profile due to LDAP error: {e}', 'danger')
return flask.redirect(redirect_url)
@bp.route('/admin/users/<uid>/add_mifareidhash', methods=['POST'])
@admin_required
def admin_user_add_mifareidhash(uid):
form = AddMifareIDHash()
def modify_func(conn, dn):
new_value = form.value.data
email.send_papertrail(
f'Adding mifareIDHash for user {uid}',
f'New mifareIDHash: {new_value}'
)
conn.modify_s(dn, [(ldap.MOD_ADD, 'mifareidhash', new_value.encode('utf-8'))])
return _modify_mifareidhash(uid, form, modify_func)
@bp.route('/admin/users/<uid>/del_mifareidhash', methods=['POST'])
@admin_required
def admin_user_del_mifareidhash(uid):
form = DelForm()
def modify_func(conn, dn):
old_value = flask.request.args.get('value')
email.send_papertrail(
f'Deleting mifareIDHash for user {uid}',
f'Deleted mifareIDHash: {old_value}'
)
conn.modify_s(dn, [(ldap.MOD_DELETE, 'mifareidhash', old_value.encode('utf-8'))])
return _modify_mifareidhash(uid, form, modify_func)
@bp.route('/admin/groups/')
@admin_required
def admin_groups_view():
conn = context.get_connection()
def admin_groups_view() -> werkzeug.Response:
conn = app.get_connection()
assert conn is not None
# no obvious way to filter out groups that are just a per-user-group
# (not super useful to look at them)
@ -216,21 +166,19 @@ def admin_groups_view():
filter_groups = filter((lambda cn: cn not in all_uids), groups)
return flask.render_template('admin/groups.html', groups=filter_groups)
def _get_group(conn, name):
results = conn.search_s(ldaputils.group_dn(name), ldap.SCOPE_SUBTREE)
return ldaputils.normalized_entries(results)[0]
return flask.Response(flask.render_template('admin/groups.html', groups=filter_groups))
@bp.route('/admin/groups/<name>')
@admin_required
def admin_group_view(name):
def admin_group_view(name: str) -> werkzeug.Response:
ldaputils.validate_name(name)
conn = context.get_connection()
dn = ldaputils.group_dn(name)
conn = app.get_connection()
assert conn is not None
group_attrs = _get_group(conn, name)
group = context.LDAPEntry(conn, dn)
members = _get_user_list(conn, f'memberOf={ldaputils.group_dn(name)}')
is_protected = name in config.ldap_protected_groups
return flask.render_template('admin/group.html', name=name, attributes=_format_profile(group_attrs), members=members, is_protected=is_protected)
return flask.Response(flask.render_template('admin/group.html', group=group, members=members, is_protected=is_protected))

View file

@ -2,15 +2,20 @@ import functools
import ldap
import flask
import urllib
import werkzeug
from webapp import app, context, config, ldaputils
from webapp import app, avatar, config, ldaputils
from typing import TypeVar, Callable, ParamSpec, Dict, Any, Optional
bp = flask.Blueprint('auth', __name__)
def login_required(f):
P = ParamSpec('P')
def login_required(f: Callable[P, werkzeug.Response]) -> Callable[P, werkzeug.Response]:
@functools.wraps(f)
def func(*a, **kw):
conn = context.get_connection()
def func(*a: P.args, **kw: P.kwargs) -> werkzeug.Response:
conn = app.get_connection()
if not conn:
flask.session.clear()
flask.flash('You must log in to continue', category='warning')
@ -18,14 +23,14 @@ def login_required(f):
return f(*a, **kw)
return func
def req_to_ctx():
def req_to_ctx() -> Dict[str, Any]:
return dict(flask.request.form.items())
@bp.route('/login', methods=["GET"])
def login_form():
return flask.render_template('login.html', **req_to_ctx())
def login_form() -> werkzeug.Response:
return flask.Response(flask.render_template('login.html', **req_to_ctx()))
def _connect_to_ldap(dn, password):
def _connect_to_ldap(dn: str, password: str) -> Optional[ldap.ldapobject]:
try:
return app.connections.bind(dn, password)
except ldap.LDAPError as error_message:
@ -33,7 +38,7 @@ def _connect_to_ldap(dn, password):
return None
@bp.route('/login', methods=["POST"])
def login_action():
def login_action() -> werkzeug.Response:
# LDAP usernames/DNs are case-insensitive, so we normalize them just in
# case,
username = flask.request.form.get("username", "").lower()
@ -56,7 +61,9 @@ def login_action():
flask.session["username"] = username
flask.session['dn'] = dn
flask.session['is_admin'] = is_admin
context.refresh_profile()
app.refresh_profile(conn)
avatar.cache.reset_user(username)
avatar.hash_cache.reset()
return flask.redirect(goto)
else:
flask.flash("Invalid credentials.", category='danger')
@ -64,7 +71,7 @@ def login_action():
@bp.route('/logout')
@login_required
def logout_action():
def logout_action() -> werkzeug.Response:
app.connections.unbind(flask.session['dn'])
flask.session.clear()
return flask.redirect('/')

View file

@ -17,14 +17,16 @@ from PIL import Image, ImageDraw
import flask
import ldap
from webapp import context, ldaputils, config
from webapp import app, ldaputils, config
from typing import List
bp = flask.Blueprint('avatar', __name__)
log = logging.getLogger('ldap-web.avatar')
# Stolen from https://stackoverflow.com/questions/43512615/reshaping-rectangular-image-to-square
def resize_image(image: Image, length: int) -> Image:
def resize_image(image: Image.Image, length: int) -> Image.Image:
"""
Resize an image to a square. Can make an image bigger to make it fit or smaller if it doesn't fit. It also crops
part of the image.
@ -51,7 +53,7 @@ def resize_image(image: Image, length: int) -> Image:
# Crop the height of the image so as to keep the center part.
resized_image = resized_image.crop(
box=(0, required_loss / 2, length, resized_image.size[1] - required_loss / 2))
box=(0, required_loss // 2, length, resized_image.size[1] - required_loss // 2))
# We now have a length*length pixels image.
return resized_image
@ -66,11 +68,12 @@ def resize_image(image: Image, length: int) -> Image:
# Crop the width of the image so as to keep 1080 pixels of the center part.
resized_image = resized_image.crop(
box=(required_loss / 2, 0, resized_image.size[0] - required_loss / 2, length))
box=(required_loss // 2, 0, resized_image.size[0] - required_loss // 2, length))
# We now have a length*length pixels image.
return resized_image
def process_upload(data: bytes) -> bytes:
img = Image.open(io.BytesIO(data))
img = resize_image(img, 256)
@ -78,10 +81,11 @@ def process_upload(data: bytes) -> bytes:
img.save(res, 'PNG')
return base64.b64encode(res.getvalue())
syrenka = Image.open("syrenka.png")
def default_avatar(uid: str) -> Image:
def default_avatar(uid: str) -> bytes:
"""
Create little generative avatar for people who don't have a custom one
configured.
@ -108,17 +112,17 @@ def default_avatar(uid: str) -> Image:
overlay = overlay.crop(box=(0, 0, w, w))
# Give it a little nudge.
overlay = overlay.rotate((rng.random() - 0.5) * 100)
overlay = overlay.rotate((rng.random() - 0.5) * 100) # type: ignore
# Colorize with full neon color.
r, g, b = [int(256*i) for i in colorsys.hls_to_rgb(n_h,n_l,n_s)]
pixels = overlay.load()
pixels = overlay.load() # type: ignore
for x in range(img.size[0]):
for y in range(img.size[1]):
alpha = pixels[x, y][3]
pixels[x, y] = (r, g, b, alpha)
img.alpha_composite(overlay)
img.alpha_composite(overlay) # type: ignore
res = io.BytesIO()
img.save(res, 'PNG')
@ -135,13 +139,13 @@ class AvatarCacheEntry:
# Cached converted bytes
_converted: bytes
def __init__(self, uid: str, data: bytes):
def __init__(self, uid: str, data: bytes) -> None:
self.uid = uid
self.deadline = time.time() + config.avatar_cache_timeout
self.data = data
self._converted = b""
def serve(self):
def serve(self) -> flask.Response:
"""
Serve sanitized image. Always re-encode to PNG 256x256.
"""
@ -166,17 +170,17 @@ class AvatarCache:
# keyed by uid
entries: dict[str, AvatarCacheEntry]
def __init__(self):
def __init__(self) -> None:
self.entries = {}
def reset(self):
def reset(self) -> None:
self.entries = {}
def reset_user(self, uid: str):
def reset_user(self, uid: str) -> None:
if uid in self.entries:
del self.entries[uid]
def get(self, uid: str, bust: bool = False) -> AvatarCacheEntry:
def get(self, uid: str, bust: bool = False) -> flask.Response:
"""
Get avatar, either from cache or from LDAP on cache miss. If 'bust' is
set to True, the cache will not be consulted and the newest result from
@ -193,7 +197,9 @@ class AvatarCache:
del self.entries[uid]
# Otherwise, retrieve from LDAP.
conn = context.get_admin_connection()
conn = app.get_connection()
if conn is None:
conn = app.get_admin_connection()
try:
dn = ldaputils.user_dn(uid)
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
@ -235,14 +241,16 @@ class AvatarCache:
cache = AvatarCache()
def hash_for_uid(uid):
def hash_for_uid(uid: str) -> str:
# NOTE: Gravatar documentation says to use SHA256, but everyone passes MD5 instead
email = f'{uid}@hackerspace.pl'.strip().lower()
hasher = hashlib.md5()
hasher.update(email.encode())
return hasher.hexdigest()
def get_all_user_uids(conn):
def get_all_user_uids(conn: ldap.ldapobject) -> List[str]:
all_uids = []
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, 'uid=*', attrlist=['uid'])
@ -252,6 +260,7 @@ def get_all_user_uids(conn):
return all_uids
class HashCache:
# email hash -> uid mapping
entries: dict[str, str] = {}
@ -262,25 +271,27 @@ class HashCache:
self.rebuild_if_needed()
return self.entries.get(email_hash, 'default')
def reset(self):
def reset(self) -> None:
self.entries = {}
self.deadline = 0
def rebuild_if_needed(self):
def rebuild_if_needed(self) -> None:
now = time.time()
if now > self.deadline:
self.rebuild()
def rebuild(self):
def rebuild(self) -> None:
log.info("Rebuilding email hash cache")
conn = context.get_admin_connection()
conn = app.get_admin_connection()
users = get_all_user_uids(conn)
self.deadline = time.time() + config.avatar_cache_timeout
self.entries = { hash_for_uid(uid): uid for uid in users }
hash_cache = HashCache()
def sanitize_email_hash(hash: str):
def sanitize_email_hash(hash: str) -> str:
"""
lowercases, removes file extension (probably)
"""
@ -289,14 +300,16 @@ def sanitize_email_hash(hash: str):
hash = hash[:-4]
return hash
@bp.route('/avatar/<email_hash>', methods=['GET'])
def gravatar_serve(email_hash):
def gravatar_serve(email_hash: str) -> flask.Response:
"""
Serves avatar in a Gravatar-compatible(ish) way, i.e. by email hash, not user name.
"""
uid = hash_cache.get(sanitize_email_hash(email_hash))
return cache.get(uid)
@bp.route('/avatar/user/<uid>', methods=['GET'])
def avatar_serve(uid):
def avatar_serve(uid: str) -> flask.Response:
return cache.get(uid)

View file

@ -3,50 +3,53 @@ import wtforms
import secrets
import os
hackerspace_name = 'Warsaw Hackerspace'
secret_key = secrets.token_hex(32)
from typing import Dict, Set, List, Tuple, Any, TypeVar
hackerspace_name: str = 'Warsaw Hackerspace'
secret_key: str = secrets.token_hex(32)
# Kerberos configuration
kadmin_principal_map = "{}@HACKERSPACE.PL"
kadmin_principal_map: str = "{}@HACKERSPACE.PL"
# LDAP configuration
ldap_url = 'ldap://ldap.hackerspace.pl'
ldap_base = 'dc=hackerspace,dc=pl'
ldap_people = 'ou=people,dc=hackerspace,dc=pl'
ldap_user_dn_format = 'uid={},ou=people,dc=hackerspace,dc=pl'
ldap_group_dn_format = 'cn={},ou=group,dc=hackerspace,dc=pl'
ldap_url: str = 'ldap://ldap.hackerspace.pl'
ldap_base: str = 'dc=hackerspace,dc=pl'
ldap_people: str = 'ou=people,dc=hackerspace,dc=pl'
ldap_user_dn_format: str = 'uid={},ou=people,dc=hackerspace,dc=pl'
ldap_group_dn_format: str = 'cn={},ou=group,dc=hackerspace,dc=pl'
# LDAP user groups allowed to see /admin
ldap_admin_groups = os.getenv('LDAPWEB_ADMIN_GROUPS', 'ldap-admin,staff,zarzad').split(',')
ldap_admin_groups: List[str] = os.getenv('LDAPWEB_ADMIN_GROUPS', 'ldap-admin,staff,zarzad').split(',')
# LDAP user groups indicating that a user is active
ldap_active_groups = os.getenv('LDAPWEB_ACTIVE_GROUPS', 'fatty,starving,potato').split(',')
ldap_active_groups: List[str] = os.getenv('LDAPWEB_ACTIVE_GROUPS', 'fatty,starving,potato').split(',')
# LDAP service user with admin privileges (for admin listings, creating new users)
ldap_admin_dn = os.getenv('LDAPWEB_ADMIN_DN', 'cn=ldapweb,ou=services,dc=hackerspace,dc=pl')
ldap_admin_password = os.getenv('LDAPWEB_ADMIN_PASSWORD', 'unused')
ldap_admin_dn: str = os.getenv('LDAPWEB_ADMIN_DN', 'cn=ldapweb,ou=services,dc=hackerspace,dc=pl')
ldap_admin_password: str = os.getenv('LDAPWEB_ADMIN_PASSWORD', 'unused')
# Protected LDAP user groups
# These groups (and their members) cannot be modified by admin UI
ldap_protected_groups = (
ldap_protected_groups: List[str] = (
'staff,zarzad,ldap-admin'.split(',') +
os.getenv('LDAPWEB_PROTECTED_GROUPS', '').split(',')
)
# Email notification (paper trail) configuration
smtp_server = 'mail.hackerspace.pl'
smtp_format = '{}@hackerspace.pl'
smtp_user = os.getenv('LDAPWEB_SMTP_USER', 'ldapweb')
smtp_password = os.getenv('LDAPWEB_SMTP_PASSWORD', 'unused')
smtp_server: str = 'mail.hackerspace.pl'
smtp_format: str = '{}@hackerspace.pl'
smtp_user: str = os.getenv('LDAPWEB_SMTP_USER', 'ldapweb')
smtp_password: str = os.getenv('LDAPWEB_SMTP_PASSWORD', 'unused')
papertrail_recipients = os.getenv('LDAPWEB_PAPERTRAIL_RECIPIENTS', 'zarzad@hackerspace.pl')
papertrail_recipients: str = os.getenv('LDAPWEB_PAPERTRAIL_RECIPIENTS', 'zarzad@hackerspace.pl')
# Avatar server
avatar_cache_timeout = int(os.getenv('LDAPWEB_AVATAR_CACHE_TIMEOUT', '1800'))
avatar_cache_timeout: int = int(os.getenv('LDAPWEB_AVATAR_CACHE_TIMEOUT', '1800'))
# LDAP attribute configuration
readable_names = {
readable_names: Dict[str, str] = {
'jpegphoto': 'Avatar',
'commonname': 'Common Name',
'givenname': 'Given Name',
@ -61,7 +64,7 @@ readable_names = {
'mailroutingaddress': 'Email Adress (external)',
}
full_name = {
full_name: Dict[str, str] = {
'cn': 'commonname',
'gecos': 'gecos',
'sn': 'surname',
@ -69,25 +72,26 @@ full_name = {
'l': 'locality',
}
can_add = set([
can_add: Set[str] = {
'jpegphoto',
'telephonenumber',
'mobiletelephonenumber',
'sshpublickey',
])
can_delete = can_add
can_modify = can_add | set([
}
can_delete: Set[str] = can_add
can_modify: Set[str] = can_add | {
'jpegphoto',
'givenname',
'surname',
'commonname',
'gecos',
])
can = { 'add': can_add, 'mod': can_modify, 'del': can_delete }
admin_required = set()
}
can: Dict[str, Set[str]] = { 'add': can_add, 'mod': can_modify, 'del': can_delete, 'admin': {'mifareidhash'} }
default_field = (wtforms.fields.StringField, {})
fields = {
FormField = Tuple[type[wtforms.Field], Dict[str, Any]]
default_field: FormField = (wtforms.fields.StringField, {})
fields: Dict[str, FormField] = {
'jpegphoto': (wtforms.fields.FileField, {'validators': []}),
'mobiletelephonenumber': (wtforms.fields.StringField, {'validators': [wtforms.validators.Regexp(r'[+0-9 ]+')]}),
'telephonenumber': (wtforms.fields.StringField, {'validators': [wtforms.validators.Regexp(r'[+0-9 ]+')]}),

View file

@ -1,3 +1,4 @@
from dataclasses import dataclass
import random
import string
import hashlib
@ -5,53 +6,84 @@ import hashlib
import flask
import ldap
from webapp import app, config, validation, avatar
from webapp import config, validation
class Attr(object):
def __init__(self, name, value):
from typing import Optional, Dict, List
class Attr:
"""
A concrete attribute (with value) on a Profile.
"""
name: str
readable_name: Optional[str]
value: bytes
# Hash of name + value, used to uniquely identify an attribute across HTTP
# calls.
uid: str
def __init__(self, name: str, value: bytes) -> None:
name = validation.sanitize_ldap(name)
self.name = name
self.readable_name = config.readable_names.get(name, name)
self.readable_name = config.readable_names.get(name)
self.value = value
self.uid = hashlib.sha1(name.encode('utf-8') + value).hexdigest()
def __str__(self):
def __str__(self) -> str:
return self.value.decode('utf-8')
def get_dn():
return flask.session.get('dn')
def get_connection(dn = None):
dn = dn or get_dn()
return app.connections[dn]
@dataclass
class LDAPEntry:
"""
An LDAP entry, eg. a user profile or a group.
"""
# Map from uid/hash to attr.
fields: Dict[str, Attr]
# DN of this entry
dn: str
def get_admin_connection():
conn = app.connections[config.ldap_admin_dn]
if not conn:
conn = app.connections.bind(config.ldap_admin_dn, config.ldap_admin_password)
return conn
def __init__(self, conn: ldap.ldapobject, dn: str):
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
assert(len(res) == 1)
self.dn = dn
self.fields = {}
for attr, vs in res[0][1].items():
for v in vs:
a = Attr(attr, v)
self.fields[a.uid] = a
def get_profile():
return app.profiles[get_dn()]
def refresh_profile(dn=None):
dn = dn or get_dn()
conn = get_connection(dn)
if not conn:
return # no session, nothing to refresh i guess
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
assert(len(res) == 1)
profile = {}
for attr, vs in res[0][1].items():
for v in vs:
a = Attr(attr, v)
profile[a.uid] = a
if attr == 'uid':
user_uid = v.decode('utf-8')
app.profiles[dn] = profile
def get_attr(self, attr: str) -> Optional[Attr]:
for v in self.fields.values():
if v.name == attr:
return v
return None
# bust avatar cache
if user_uid:
avatar.cache.reset_user(user_uid)
avatar.hash_cache.reset()
@property
def fields_sorted(self) -> List[Attr]:
fields = list(self.fields.values())
fields.sort(key=lambda attr: attr.name)
fields.sort(key=lambda attr: attr.readable_name is None)
return fields
return profile
@property
def name(self) -> str:
res = self.get_attr('commonname')
assert res is not None
return res.value.decode()
class Profile(LDAPEntry):
"""
A user profile.
"""
# Map from uid/hash to attr.
fields: Dict[str, Attr]
# DN of this profile
dn: str
@property
def username(self) -> str:
res = self.get_attr('uid')
assert res is not None
return res.value.decode()

View file

@ -2,32 +2,33 @@ import smtplib
from email.message import EmailMessage
import flask
import datetime
from typing import Optional
from webapp import config, context
cached_connection = None
cached_connection: Optional[smtplib.SMTP] = None
def test_connection_open(conn):
def test_connection_open(conn: smtplib.SMTP) -> bool:
try:
status = conn.noop()[0]
except:
status = -1
return True if status == 250 else False
def create_connection():
def create_connection() -> smtplib.SMTP:
conn = smtplib.SMTP_SSL(config.smtp_server)
conn.login(config.smtp_user, config.smtp_password)
return conn
def get_connection():
def get_connection() -> smtplib.SMTP:
global cached_connection
if test_connection_open(cached_connection):
if cached_connection is not None and test_connection_open(cached_connection):
return cached_connection
cached_connection = create_connection()
return cached_connection
def send_email(conn, subject, body, recipient_emails):
def send_email(conn: smtplib.SMTP, subject: str, body: str, recipient_emails: str) -> None:
msg = EmailMessage()
msg.set_content(body)
msg['Subject'] = subject
@ -38,7 +39,7 @@ def send_email(conn, subject, body, recipient_emails):
conn.send_message(msg)
def send_papertrail(title, description):
def send_papertrail(title: str, description: str) -> None:
username = flask.session.get('username')
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M")

View file

@ -1,66 +1,49 @@
import re
import ldap
from webapp import config
def is_valid_name(name):
from typing import List, Tuple, Any, Dict
def is_valid_name(name: str) -> bool:
"""`true` if `name` is a safe ldap uid/cn"""
return re.match(r'^[a-zA-Z_][a-zA-Z0-9-_\.]*\Z', name) is not None
def validate_name(name):
def validate_name(name: str) -> None:
"""Raises `RuntimeError` if `name` is not a safe ldap uid/cn"""
if not is_valid_name(name):
raise RuntimeError('Invalid name')
def user_dn(uid):
def user_dn(uid: str) -> str:
validate_name(uid)
return config.ldap_user_dn_format.format(uid)
def group_dn(cn):
def group_dn(cn: str) -> str:
validate_name(cn)
return config.ldap_group_dn_format.format(cn)
def wrap(filter):
def wrap(filter: str) -> str:
if len(filter) and filter[0] == '(' and filter[-1] == ')':
return filter
else:
return f'({filter})'
def _or(*filters):
def _or(*filters: str) -> str:
wrapped = ''.join(wrap(f) for f in filters)
return f'(|{wrapped})'
def _and(*filters):
def _and(*filters: str) -> str:
wrapped = ''.join(wrap(f) for f in filters)
return f'(&{wrapped})'
def _not(filter):
def _not(filter: str) -> str:
wrapped = wrap(filter)
return f'(!{wrapped})'
def member_of_any(groups):
def member_of_any(groups: List[str]) -> str:
"""Returns a filter that matches users that are a member of any of the given group names"""
return _or(*(f'memberOf={group_dn(group)}' for group in groups))
def groups_of_user(uid):
def groups_of_user(uid: str) -> str:
"""Returns a filter that matches groups that have the given user as a member"""
return f'(&(objectClass=groupOfUniqueNames)(uniqueMember={user_dn(uid)}))'
def normalized_entries(entries):
"""
Converts ldap entries from python-ldap format into a more convenient
List[Tuple[
dn,
List[tuple[attr_name, attr_value]]
]]
"""
normalized = []
for dn, attrs in entries:
normalized_attrs = []
for attr, values in attrs.items():
for value in values:
normalized_attrs.append((attr, value.decode()))
normalized.append((dn, normalized_attrs))
return normalized

View file

@ -4,67 +4,89 @@ import sys
import threading
import time
from dataclasses import dataclass
from typing import Dict, Generic, TypeVar, List, Callable, Optional
log = logging.getLogger('ldap-web.lru')
def locked(f):
@functools.wraps(f)
def func(self, *a, **kw):
self.lock.acquire()
try:
return f(self, *a, **kw)
finally:
self.lock.release()
return func
K = TypeVar('K')
V = TypeVar('V')
Cb = Callable[[K], None]
class LRUPool(threading.Thread):
@dataclass
class _Entry(Generic[V]):
c: V
atime: float
class LRUPool(threading.Thread, Generic[K, V]):
"""A key-value pool to store objects with a timeout.
Consumers of objects can register callbacks which will be called on
object expiry. Expiry is least-recently-used - any access reset the
time counter."""
def __init__(self, timeout=60.0, **kw):
threading.Thread.__init__(self, **kw)
def __init__(self, timeout: float = 60.0) -> None:
super().__init__()
self.setDaemon(True)
self.pool = {}
self.pool: Dict[K, _Entry[V]] = {}
self.timeout = timeout
self.lock = threading.Lock()
self.callbacks = {}
def run(self):
self.callbacks: Dict[str, List[Cb[K]]] = {}
def run(self) -> None:
log.info("Pool {} starting.", self)
while True:
time.sleep(self.timeout / 2)
self.lock.acquire()
now = time.time()
drop = set()
for k, [c, atime] in self.pool.items():
if now - atime > self.timeout:
for k, entry in self.pool.items():
if now - entry.atime > self.timeout:
log.info("Pool {} dropping {}.", self, k)
drop.add(k)
for k in list(drop):
self._drop(k)
self.lock.release()
def register_callback(self, action, cb):
def register_callback(self, action: str, cb: Cb[K]) -> None:
self.callbacks.setdefault(action, []).append(cb)
@locked
def __getitem__(self, key):
item = self.pool.get(key)
if not item:
return
item[1] = time.time()
return item[0]
def _insert(self, key, item):
self.pool[key] = [item, time.time()]
def __getitem__(self, key: K) -> Optional[V]:
self.lock.acquire()
try:
item = self.pool.get(key)
if not item:
return None
item.atime = time.time()
return item.c
finally:
self.lock.release()
def _insert(self, key: K, item: V) -> V:
self.pool[key] = _Entry(item, time.time())
return item
@locked
def insert(self, key, item):
return self._insert(key, item)
def _drop(self, key):
for f in self.callbacks.get('drop',[]):
def insert(self, key: K, item: V) -> V:
self.lock.acquire()
try:
return self._insert(key, item)
finally:
self.lock.release()
def _drop(self, key: K) -> Optional[V]:
for f in self.callbacks.get('drop', []):
f(key)
return self.pool.pop(key, None)
@locked
def drop(self, key):
return self._drop(key)
res = self.pool.pop(key, None)
if res is None:
return None
return res.c
def drop(self, key: K) -> Optional[V]:
self.lock.acquire()
try:
return self._drop(key)
finally:
self.lock.release()

View file

@ -1,7 +1,9 @@
import ldap
import logging
import kerberos
import flask
import flask_wtf
import werkzeug
from webapp import app, context, config
from webapp.auth import login_required
@ -10,10 +12,11 @@ bp = flask.Blueprint('passwd', __name__)
@bp.route('/passwd', methods=["GET"])
@login_required
def passwd_form():
return flask.render_template('passwd.html')
def passwd_form() -> werkzeug.Response:
return flask.Response(flask.render_template('passwd.html'))
def _passwd_kadmin(current, new):
def _passwd_kadmin(current: str, new: str) -> bool:
username = flask.session.get('username')
try:
principal_name = config.kadmin_principal_map.format(username)
@ -23,16 +26,17 @@ def _passwd_kadmin(current, new):
logging.exception('kpasswd failed')
return False
@bp.route('/passwd', methods=["POST"])
@login_required
def passwd_action():
def passwd_action() -> werkzeug.Response:
current, new, confirm = (flask.request.form[n] for n in ('current', 'new', 'confirm'))
if new != confirm:
flask.flash("New passwords don't match", category='danger')
return flask.render_template('passwd.html')
return flask.Response(flask.render_template('passwd.html'))
if _passwd_kadmin(current, new):
flask.flash('Password changed', category='info')
else:
flask.flash('Wrong password', category='danger')
return flask.render_template('passwd.html')
return flask.Response(flask.render_template('passwd.html'))

View file

@ -2,20 +2,28 @@ import ldap
from webapp import lru, config
class LDAPConnectionPool(lru.LRUPool):
def __init__(self, url, use_tls=True, **kw):
lru.LRUPool.__init__(self, **kw)
from typing import Optional, Any
Pool = lru.LRUPool[str, ldap.ldapobject]
class LDAPConnectionPool(Pool):
def __init__(self, url: str, use_tls: bool = True, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.use_tls = use_tls
self.url = url
@lru.locked
def bind(self, dn, password):
conn = ldap.initialize(self.url)
if(self.use_tls):
conn.start_tls_s()
conn.simple_bind_s(dn, password)
return self._insert(dn, conn)
def unbind(self, dn):
def bind(self, dn: str, password: str) -> ldap.ldapobject:
self.lock.acquire()
try:
conn = ldap.initialize(self.url)
if(self.use_tls):
conn.start_tls_s()
conn.simple_bind_s(dn, password)
return self._insert(dn, conn)
finally:
self.lock.release()
def unbind(self, dn: str) -> Optional[ldap.ldapobject]:
return self.drop(dn)

View file

@ -1,6 +1,6 @@
{% extends 'basic.html' %}
{% block content %}
<h1>Group: {{ name }}</h1>
<h1>Group: {{ group.name }}</h1>
<div style="margin-bottom: 10px">
<a class="btn btn-default" href="/admin/groups" role="button">Back</a>
@ -26,11 +26,11 @@
<th scope="col">Attribute</th>
<th scope="col" class="profile-table-value">Value</th>
</tr>
{% for attr, attr_readable, value in attributes %}
{% for attr in group.fields_sorted %}
<tr>
<td>{{ attr }}</td>
<td>{{ attr_readable if attr_readable else '' }}</td>
<td class="profile-table-value">{{ value }}</td>
<td>{{ attr.name }}</td>
<td>{{ attr.readable_name if attr.readable_name else '' }}</td>
<td class="profile-table-value">{{ attr.value.decode() }}</td>
</tr>
{% endfor %}
</table>

View file

@ -1,14 +1,14 @@
{% extends 'basic.html' %}
{% block content %}
<img src="/avatar/user/{{ uid }}" class="profile-avatar-lmao" />
<img src="/avatar/user/{{ profile.username }}" class="profile-avatar-lmao" />
<h1>User: {{ uid }}</h1>
<h1>User: {{ profile.username }}</h1>
<div style="margin-bottom: 10px">
<a class="btn btn-default" href="/admin/users" role="button">Back</a>
<a class="btn btn-default" href="https://kasownik.hackerspace.pl/admin/member/{{ uid }}" role="button" target="_blank">View user in Kasownik</a>
<a class="btn btn-default" href="https://kasownik.hackerspace.pl/admin/member/{{ profile.username }}" role="button" target="_blank">View user in Kasownik</a>
{% if not is_protected %}
<a class="btn btn-default modalLink" href="/admin/users/{{ uid }}/add_mifareidhash" role="button">Add mifareIDHash</a>
<a class="btn btn-default modalLink" href="/admin/users/{{ profile.username }}/add_mifareidhash" role="button">Add mifareIDHash</a>
{% endif %}
</div>
@ -33,14 +33,14 @@
<th scope="col" class="profile-table-value">Value</th>
<th scope="col" class="profile-table-options">Options</th>
</tr>
{% for attr, attr_readable, value in profile %}
{% for attr in profile.fields_sorted %}
<tr>
<td>{{ attr }}</td>
<td>{{ attr_readable if attr_readable else '' }}</td>
<td class="profile-table-value">{{ '(...omitted...)' if attr == 'jpegPhoto' else value }}</td>
<td>{{ attr.name }}</td>
<td>{{ attr.readable_name if attr.readable_name else '' }}</td>
<td class="profile-table-value">{{ '(...omitted...)' if attr.name == 'jpegphoto' else attr.value.decode() }}</td>
<td class="profile-table-options">
{% if not is_protected and attr == 'mifareIDHash' %}
<a class="modalLink" href="/admin/users/{{ uid }}/del_mifareidhash?value={{ value | urlencode }}"><span class="glyphicon glyphicon-minus-sign" aria-hidden="true"></span> Remove</a>
{% if not is_protected and attr.name == 'mifareidhash' %}
<a class="modalLink" href="/admin/users/{{ profile.username }}/del_mifareidhash/{{ attr.uid }}"><span class="glyphicon glyphicon-minus-sign" aria-hidden="true"></span> Remove</a>
{% endif %}
</td>
</tr>

View file

@ -1,5 +1,5 @@
<div class="modal fade" tabindex="-1" role="dialog">
<form action="/vcard/add/{{ attr_name }}" method="POST" class="form-signin"
<form action="{{ action }}" method="POST" class="form-signin"
{% if attr_name == 'jpegphoto' %}
enctype="multipart/form-data"
{% endif %}

View file

@ -1,5 +1,5 @@
<div class="modal fade" tabindex="-1" role="dialog">
<form action="/vcard/delete/{{ uid }}" method="POST" class="form-signin">
<form action="{{ action }}" method="POST" class="form-signin">
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">

View file

@ -1,5 +1,5 @@
<div class="modal fade" tabindex="-1" role="dialog">
<form action="/vcard/modify/{{ uid }}" method="POST" class="form-signin">
<form action="{{ action }}" method="POST" class="form-signin">
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">

View file

@ -4,26 +4,13 @@ import flask
from webapp import config
def validate(predicate, arg, error='Error!', redirect='/'):
def decorator(f):
@functools.wraps(f)
def func(**kw):
v = kw.get(arg)
if predicate(v):
return f(**kw)
else:
flask.flash(error, category='error')
return flask.redirect(redirect)
return func
return decorator
def sanitize_perms():
def sanitize_perms() -> None:
config.can = { k: set(map(sanitize_ldap, v)) for k,v in config.can.items() }
def sanitize_readable():
def sanitize_readable() -> None:
config.readable_names = { sanitize_ldap(k): v for k, v in config.readable_names.items() }
def sanitize_ldap(k):
def sanitize_ldap(k: str) -> str:
k = k.lower()
return (k in config.full_name and config.full_name[k]) or k

View file

@ -1,117 +1,280 @@
from dataclasses import dataclass
from enum import Enum
from functools import reduce
from typing import Optional, cast, Dict, List, TypedDict, Literal, Union, TypeVar, Generic
import ldap
import flask
import flask_wtf
import wtforms
import werkzeug
from webapp import app, context, config, validation, avatar
from webapp.auth import login_required
bp = flask.Blueprint('vcard', __name__)
perm_errors = {
'add': 'You cannot add this attribute!',
'mod': 'You cannot change this attribute!',
'del': 'You cannot delete this attribute!',
}
templates = {
'add': 'ops/add.html',
'mod': 'ops/mod.html',
'del': 'ops/del.html',
}
# NOTE: this code is quite hairy. Simplifying it further is very possible, but
# would require actually designing this from first principles, instead of
# continuing to hack on a _very_ old codebase.
#
# This started out as a very Lispy codebase with stringly-typed operation
# kinds. It has now be rewritten to be extremely class/type-heavy. There's
# probably a better compromise to be made, but I've been writing too much Rust
# recently. ~q3k
def attr_op(op, attrName, uid = None, success_redirect='/vcard',
fatal_redirect='/vcard'):
try:
attr, old_value = None, None
if uid:
attr = context.get_profile()[uid]
attrName = attr.name
old_value = str(attr)
form = DelForm() if op == 'del' else app.forms[attrName](value=old_value)
form.attr_data = attr
if attrName not in config.can[op]:
flask.flash(perm_errors[op], 'danger')
return flask.redirect(fatal_redirect)
if form.validate_on_submit():
if op in ['add', 'mod']:
new_value = form.value.data
admin = attrName in config.admin_required
conn = context.get_admin_connection() if admin else context.get_connection()
dn = context.get_dn()
# Most fields should be modified by remove/add.
if op == 'mod' and attrName not in ['commonname']:
op = 'modreadd'
if attrName == 'jpegphoto':
# Temporary workaround: deleting jpegPhoto doesn't work, set to empty instead
if op == 'del':
op = 'mod'
new_value = ''
else:
try:
op = 'mod'
processed_avatar = avatar.process_upload(new_value.read())
new_value = processed_avatar
print(f'Uplading avatar (size: {len(processed_avatar)}) for {dn}')
except Exception as e:
flask.flash('Could not process avatar: {}'.format(e), 'danger')
return flask.redirect(fatal_redirect)
def to_ldap(s):
if isinstance(s, bytes):
return s
return s.encode('utf-8')
if op in ['del', 'modreadd']:
conn.modify_s(dn, [(ldap.MOD_DELETE, attrName, to_ldap(old_value))])
if op in ['add', 'modreadd']:
conn.modify_s(dn, [(ldap.MOD_ADD, attrName, to_ldap(new_value))])
if op in ['mod']:
conn.modify_s(dn, [(ldap.MOD_REPLACE, attrName, to_ldap(new_value))])
context.refresh_profile()
return flask.redirect(success_redirect)
if form.is_submitted():
for field, errors in form.errors.items():
for error in errors:
flask.flash("Error in the {} field - {}".format(
getattr(form, field).label.text,
error
), 'danger')
return flask.redirect(success_redirect)
status = 400 if form.is_submitted() else 200
return flask.make_response(flask.render_template(templates[op], fatal_redirect=fatal_redirect,
attr_op=op, form=form, attr_name=attrName, uid=uid), status)
except ldap.LDAPError as e:
print('LDAP error:', e)
flask.flash('Could not modify profile', 'danger')
return flask.redirect(fatal_redirect)
class DelForm(flask_wtf.FlaskForm):
pass
"""
Form used to delete an attribute from a profile.
Used as is.
"""
attr_data: context.Attr
class AddModifyForm(flask_wtf.FlaskForm):
"""
Form used to add or modify an attribute in a profile.
Base class subclassed by dynamically generated classes in initialize_forms.
"""
attr_data: Optional[context.Attr]
value: wtforms.Field
def initialize_forms() -> Dict[str, type[AddModifyForm]]:
"""
Create AddModifyForm subclasses keyed by attribute name, based on config.
"""
forms: Dict[str, type[AddModifyForm]] = {}
for f in reduce(lambda a,b: a | b, config.can.values()):
cls, attrs = config.fields.get(f, config.default_field)
class AddForm(AddModifyForm):
value = cls(label=config.readable_names.get(f), **attrs)
AddForm.__name__ == 'Add' + f
forms[f] = AddForm
return forms
add_modify_forms = initialize_forms()
F = TypeVar('F', bound=flask_wtf.FlaskForm)
class Operation(Generic[F]):
"""
Base class for all LDAP operations.
Subclassed by operation class tree: first by whether the operation referes
to a plain attribute name, or to an already existing profile attribute by
uid.
Then further subclassed by operation kind: add, delete or modify.
"""
kind: str
perm_error: str
dn: str
attr_name: str
@property
def _template_path(self) -> str:
return f'ops/{self.kind}.html'
def _make_form(self) -> F:
"""
Return a FlaskForm to be displayed to the user.
"""
raise NotImplementedError
def _render_form(self, form: F, action: str) -> werkzeug.Response:
"""
Render a FlaskForm with the data specific to the form type.
"""
raise NotImplementedError
def _on_form_submit(self, dn: str, conn: ldap.ldapobject, form: F) -> None:
"""
Act on a submitted and validated form, performing actual LDAP
mutations.
"""
raise NotImplementedError
def _allowed(self, subject_dn: str) -> Optional[str]:
if self.attr_name not in config.can[self.kind]:
return self.perm_error
return None
def perform(self, success_redirect: str = '/vcard', fatal_redirect: str = '/vcard', action: str = '/vcard') -> werkzeug.Response:
"""
Primary entrypoint to operation. To be called from a view.
"""
conn = app.get_connection()
assert conn is not None
# Check permissions per config.
perm_err = self._allowed(self.dn)
if perm_err is not None:
flask.flash(perm_err, 'danger')
return flask.redirect(fatal_redirect)
form = self._make_form()
if form.validate_on_submit():
# If form data is valid, perform LDAP mutation.
try:
self._on_form_submit(self.dn, conn, form)
except ldap.LDAPError as e:
print('LDAP error:', e)
flask.flash('Could not modify profile', 'danger')
return flask.redirect(fatal_redirect)
profile = app.refresh_profile(conn)
assert profile is not None
avatar.cache.reset_user(profile.username)
avatar.hash_cache.reset()
return flask.redirect(success_redirect)
else:
# Otherwise, collect validation errors (if appropriate) and render
# the form.
if form.is_submitted():
for field, errors in form.errors.items():
assert field is not None
for error in errors:
flask.flash("Error in the {} field - {}".format(
getattr(form, field).label.text,
error
), 'danger')
return self._render_form(form, action)
class OperationWithAttrName(Operation[F]):
"""
Operations acting on an attribute name, without a corresponding existing
attribute. This is used when adding profile fields/attributes.
"""
def __init__(self, dn: str, attr_name: str) -> None:
self.dn = dn
self.attr_name = attr_name
def _render_form(self, form: F, action: str) -> werkzeug.Response:
return flask.Response(flask.render_template(self._template_path, form=form, attr_name=self.attr_name, action=action))
class OperationWithUid(Operation[F]):
"""
Operations acting on an existing attribute name and value, ie. an Attr with
'uid'. This is used when modifying existing fields/attributes or removing
them.
"""
attr: context.Attr
def __init__(self, dn: str, uid: str):
self.dn = dn
profile = app.get_profile(dn)
assert profile is not None
assert uid in profile.fields
self.attr = profile.fields[uid]
self.attr_name = self.attr.name
def _render_form(self, form: F, action: str) -> werkzeug.Response:
return flask.Response(flask.render_template(self._template_path, form=form, attr_name=self.attr.name, uid=self.attr.uid, action=action))
class OperationAdd(OperationWithAttrName[AddModifyForm]):
kind = 'add'
perm_error = 'You cannot add this attribute!'
def _make_form(self) -> AddModifyForm:
form = add_modify_forms[self.attr_name]()
return form
def _on_form_submit(self, dn: str, conn: ldap.ldapobject, form: AddModifyForm) -> None:
# Special case for jpegphoto
value = form.value.data
if self.attr_name == 'jpegphoto':
value = avatar.process_upload(form.value.data.read())
print(f'Uploading avatar (size: {len(value)}) for {dn}')
# jpegPhoto should always be REPLACED.
conn.modify_s(dn, [(ldap.MOD_REPLACE, self.attr_name, value)])
return
assert value is not None
conn.modify_s(dn, [(ldap.MOD_ADD, self.attr_name, value.encode())])
class OperationModify(OperationWithUid[AddModifyForm]):
kind = 'mod'
perm_error = 'You cannot modify this attribute!'
def _make_form(self) -> AddModifyForm:
form = add_modify_forms[self.attr_name](value=str(self.attr))
return form
def _on_form_submit(self, dn: str, conn: ldap.ldapobject, form: AddModifyForm) -> None:
# Special case for jpegphoto
value = form.value.data
if self.attr_name == 'jpegphoto':
value = avatar.process_upload(form.value.data.read())
print(f'Uploading avatar (size: {len(value)}) for {dn}')
assert value is not None
if self.attr_name in ['commonname']:
# Modify directly
conn.modify_s(dn, [(ldap.MOD_REPLACE, self.attr_name, value.encode())])
else:
# Remove and add again.
conn.modify_s(dn, [(ldap.MOD_DELETE, self.attr_name, self.attr.value)])
conn.modify_s(dn, [(ldap.MOD_ADD, self.attr_name, value.encode())])
class OperationDelete(OperationWithUid[DelForm]):
kind = 'del'
perm_error = 'You cannot delete this attribute!'
def _make_form(self) -> DelForm:
res = DelForm()
res.attr_data = self.attr
return res
def _on_form_submit(self, dn: str, conn: ldap.ldapobject, form: flask_wtf.FlaskForm) -> None:
if self.attr_name == 'jpegphoto':
# We apparently can't remove these, so just set it empty.
conn.modify_s(dn, [(ldap.MOD_REPLACE, self.attr_name, b'')])
return
conn.modify_s(dn, [(ldap.MOD_DELETE, self.attr_name, self.attr.value)])
@bp.route('/vcard', methods=['GET'])
@login_required
def vcard():
data = {}
for v in context.get_profile().values():
def vcard() -> werkzeug.Response:
data: Dict[str, List[context.Attr]] = {}
profile = app.get_profile()
assert profile is not None
for v in profile.fields.values():
data.setdefault(v.name, []).append(v)
return flask.render_template('vcard.html', can_add=config.can['add'],
can_modify=config.can['mod'], can_delete=config.can['del'], profile=data)
return flask.Response(flask.render_template('vcard.html', can_add=config.can['add'],
can_modify=config.can['mod'], can_delete=config.can['del'], profile=data))
@bp.route('/vcard/add/<attrName>', methods=['GET', 'POST'])
@bp.route('/vcard/add/<attr_name>', methods=['GET', 'POST'])
@login_required
def add_attr(attrName):
return attr_op('add', attrName)
def add_attr(attr_name: str) -> werkzeug.Response:
dn = app.get_dn()
assert dn is not None
op = OperationAdd(dn, attr_name)
return op.perform(action=f'/vcard/add/{attr_name}')
@bp.route('/vcard/delete/<uid>', methods=['GET', 'POST'])
@login_required
def del_attr(uid):
return attr_op('del', None, uid)
def del_attr(uid: str) -> werkzeug.Response:
dn = app.get_dn()
assert dn is not None
op = OperationDelete(dn, uid)
return op.perform(action=f'/vcard/delete/{uid}')
@bp.route('/vcard/modify/<uid>', methods=['GET', 'POST'])
@login_required
def mod_attr(uid):
return attr_op('mod', None, uid)
def mod_attr(uid: str) -> werkzeug.Response:
dn = app.get_dn()
assert dn is not None
op = OperationModify(dn, uid)
return op.perform(action=f'/vcard/modify/{uid}')

View file

@ -1,10 +1,11 @@
import flask
import werkzeug
from webapp import app, context, config
from webapp.auth import login_required
@app.route("/")
@login_required
def root():
return flask.render_template('root.html', **flask.session)
def root() -> werkzeug.Response:
return flask.Response(flask.render_template('root.html', **flask.session))