Merge pull request 'Dust off the codebase.' (#8) from q3k/dustoff into master
Reviewed-on: #8
This commit is contained in:
commit
05cbc7163a
24 changed files with 1129 additions and 651 deletions
8
ci.sh
Executable file
8
ci.sh
Executable file
|
@ -0,0 +1,8 @@
|
|||
#!/usr/bin/env bash
|
||||
set -e -x
|
||||
|
||||
# Use shell.nix if possible.
|
||||
|
||||
MYPYPATH=$(pwd)/stubs mypy --strict webapp/__init__.py
|
||||
black webapp
|
||||
black stubs
|
35
shell.nix
Normal file
35
shell.nix
Normal file
|
@ -0,0 +1,35 @@
|
|||
with import <nixpkgs> {};
|
||||
|
||||
let
|
||||
types-wtforms = ps: ps.buildPythonPackage rec {
|
||||
pname = "types-wtforms";
|
||||
version = "3.1.0.20240425";
|
||||
format = "setuptools";
|
||||
|
||||
src = fetchPypi {
|
||||
inherit version;
|
||||
pname = "types-WTForms";
|
||||
hash = "sha256-Sf/B/lV26gc1t2P/935wYN057MZhJ2y9C0cJmSGzpvI=";
|
||||
};
|
||||
|
||||
doCheck = false;
|
||||
};
|
||||
|
||||
in pkgs.mkShell {
|
||||
nativeBuildInputs = [
|
||||
poetry
|
||||
(python311.withPackages (ps: with ps; [
|
||||
black
|
||||
ldap
|
||||
flask
|
||||
flask-wtf
|
||||
pillow
|
||||
kerberos
|
||||
(types-wtforms ps)
|
||||
]))
|
||||
cyrus_sasl
|
||||
openldap
|
||||
libkrb5
|
||||
mypy
|
||||
];
|
||||
}
|
7
stubs/flask_wtf.pyi
Normal file
7
stubs/flask_wtf.pyi
Normal file
|
@ -0,0 +1,7 @@
|
|||
import wtforms
|
||||
|
||||
from typing import Protocol
|
||||
|
||||
class FlaskForm(wtforms.Form):
|
||||
def validate_on_submit(self) -> bool: ...
|
||||
def is_submitted(self) -> bool: ...
|
1
stubs/kerberos.pyi
Normal file
1
stubs/kerberos.pyi
Normal file
|
@ -0,0 +1 @@
|
|||
def changePassword(principal: str, current: str, new: str) -> bool: ...
|
40
stubs/ldap.pyi
Normal file
40
stubs/ldap.pyi
Normal file
|
@ -0,0 +1,40 @@
|
|||
from enum import Enum
|
||||
from typing import Protocol, List, Tuple, Dict, Optional
|
||||
|
||||
class Scope(Enum):
|
||||
BASE = "base"
|
||||
ONELEVEL = "onelevel"
|
||||
SUBTREE = "subtree"
|
||||
|
||||
SCOPE_BASE = Scope.BASE
|
||||
SCOPE_ONELEVEL = Scope.ONELEVEL
|
||||
SCOPE_SUBTREE = Scope.SUBTREE
|
||||
|
||||
class Mod(Enum):
|
||||
ADD = "add"
|
||||
DELETE = "delete"
|
||||
REPLACE = "replace"
|
||||
|
||||
MOD_ADD = Mod.ADD
|
||||
MOD_DELETE = Mod.DELETE
|
||||
MOD_REPLACE = Mod.REPLACE
|
||||
|
||||
ModEntry = Tuple[Mod, str, bytes | List[bytes]]
|
||||
|
||||
class LDAPError(Exception): ...
|
||||
class NO_SUCH_OBJECT(LDAPError): ...
|
||||
|
||||
class ldapobject(Protocol):
|
||||
def start_tls_s(self) -> None: ...
|
||||
def simple_bind_s(self, dn: str, password: str) -> None: ...
|
||||
def search_s(
|
||||
self,
|
||||
base: str,
|
||||
scope: Scope,
|
||||
filterstr: str = "(objectClass=*)",
|
||||
attrlist: Optional[List[str]] = None,
|
||||
attrsonly: int = 0,
|
||||
) -> List[Tuple[str, Dict[str, List[bytes]]]]: ...
|
||||
def modify_s(self, dn: str, modlist: List[ModEntry]) -> None: ...
|
||||
|
||||
def initialize(url: str) -> ldapobject: ...
|
|
@ -1,62 +1,103 @@
|
|||
import ldap
|
||||
import flask
|
||||
import flask_wtf
|
||||
import wtforms
|
||||
from functools import reduce
|
||||
|
||||
app = flask.Flask(__name__)
|
||||
from webapp import validation, pools, config, context
|
||||
|
||||
from webapp import validation, pools, config
|
||||
from typing import Dict, Any, Protocol, Optional
|
||||
|
||||
if hasattr(config, "secret_key"):
|
||||
app.secret_key = config.secret_key
|
||||
|
||||
if hasattr(config, "debug"):
|
||||
app.debug = config.debug
|
||||
class App(flask.Flask):
|
||||
connections: pools.LDAPConnectionPool
|
||||
profiles: Dict[str, context.Profile]
|
||||
|
||||
def __init__(self, name: str):
|
||||
super().__init__(name)
|
||||
if hasattr(config, "secret_key"):
|
||||
self.secret_key = config.secret_key
|
||||
|
||||
if hasattr(config, "debug"):
|
||||
self.debug = config.debug
|
||||
|
||||
def get_dn(self) -> Optional[str]:
|
||||
return flask.session.get("dn")
|
||||
|
||||
def get_connection(self, dn: Optional[str] = None) -> Optional[ldap.ldapobject]:
|
||||
dn = dn or self.get_dn()
|
||||
if dn is None:
|
||||
return None
|
||||
return self.connections[dn]
|
||||
|
||||
def get_admin_connection(self) -> ldap.ldapobject:
|
||||
conn = self.connections[config.ldap_admin_dn]
|
||||
if not conn:
|
||||
conn = self.connections.bind(
|
||||
config.ldap_admin_dn, config.ldap_admin_password
|
||||
)
|
||||
return conn
|
||||
|
||||
def get_profile(self, dn: Optional[str] = None) -> Optional[context.Profile]:
|
||||
dn = dn or self.get_dn()
|
||||
if dn is None:
|
||||
return None
|
||||
return self.profiles.get(dn)
|
||||
|
||||
def refresh_profile(
|
||||
self, conn: ldap.ldapobject, dn: Optional[str] = None
|
||||
) -> Optional[context.Profile]:
|
||||
dn = dn or self.get_dn()
|
||||
if dn is None:
|
||||
return None
|
||||
|
||||
profile = context.Profile(conn, dn)
|
||||
self.profiles[dn] = profile
|
||||
return profile
|
||||
|
||||
|
||||
app = App(__name__)
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_readable():
|
||||
def inject_readable() -> Dict[str, Any]:
|
||||
return dict(readable_names=config.readable_names)
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_hackerspace_name():
|
||||
def inject_hackerspace_name() -> Dict[str, Any]:
|
||||
return dict(hackerspace_name=config.hackerspace_name)
|
||||
|
||||
@app.template_filter('first')
|
||||
def ldap_first(v):
|
||||
|
||||
@app.template_filter("first")
|
||||
def ldap_first(v: str) -> str:
|
||||
return v and v[0]
|
||||
|
||||
@app.template_filter('readable')
|
||||
def readable_tf(n):
|
||||
|
||||
@app.template_filter("readable")
|
||||
def readable_tf(n: str) -> str:
|
||||
return config.readable_names.get(n, n)
|
||||
|
||||
def initialize_forms():
|
||||
forms = {}
|
||||
for f in reduce(lambda a,b: a | b, config.can.values()):
|
||||
cls, attrs = config.fields.get(f, config.default_field)
|
||||
class AddForm(flask_wtf.FlaskForm):
|
||||
value = cls(label=config.readable_names.get(f), **attrs)
|
||||
AddForm.__name__ == 'Add' + f
|
||||
forms[f] = AddForm
|
||||
return forms
|
||||
|
||||
def start():
|
||||
def start() -> None:
|
||||
validation.sanitize_perms()
|
||||
validation.sanitize_readable()
|
||||
|
||||
from webapp import views
|
||||
from webapp import auth, admin, avatar, vcard, passwd
|
||||
|
||||
for module in (auth, admin, avatar, vcard, passwd):
|
||||
app.register_blueprint(module.bp)
|
||||
|
||||
app.connections = pools.LDAPConnectionPool(config.ldap_url, timeout=300.0)
|
||||
def drop_profile(dn):
|
||||
|
||||
def drop_profile(dn: str) -> None:
|
||||
if dn != config.ldap_admin_dn:
|
||||
del app.profiles[dn]
|
||||
app.connections.register_callback('drop', drop_profile)
|
||||
app.connections.start()
|
||||
|
||||
app.connections.register_callback("drop", drop_profile)
|
||||
app.connections.start()
|
||||
app.profiles = {}
|
||||
app.forms = initialize_forms()
|
||||
|
||||
|
||||
start()
|
||||
|
|
307
webapp/admin.py
307
webapp/admin.py
|
@ -4,203 +4,186 @@ import re
|
|||
import flask
|
||||
import flask_wtf
|
||||
import wtforms
|
||||
import werkzeug
|
||||
|
||||
import webapp
|
||||
from webapp import app, context, config, ldaputils, email
|
||||
from webapp import app, context, config, ldaputils, email, vcard
|
||||
|
||||
bp = flask.Blueprint('admin', __name__)
|
||||
from typing import Callable, ParamSpec, List, Tuple, Optional, Dict, Protocol
|
||||
|
||||
def admin_required_impl(f):
|
||||
bp = flask.Blueprint("admin", __name__)
|
||||
|
||||
|
||||
Entry = Tuple[str, List[Tuple[str, str]]]
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
||||
|
||||
def admin_required_impl(
|
||||
f: Callable[P, werkzeug.Response]
|
||||
) -> Callable[P, werkzeug.Response]:
|
||||
@functools.wraps(f)
|
||||
def func(*a, **kw):
|
||||
def func(*a: P.args, **kw: P.kwargs) -> werkzeug.Response:
|
||||
# TODO: Actually check for admin perms
|
||||
if not flask.session['is_admin']:
|
||||
if not flask.session["is_admin"]:
|
||||
flask.abort(403)
|
||||
|
||||
return f(*a, **kw)
|
||||
|
||||
return func
|
||||
|
||||
def admin_required(f):
|
||||
|
||||
def admin_required(f: Callable[P, werkzeug.Response]) -> Callable[P, werkzeug.Response]:
|
||||
return webapp.auth.login_required(admin_required_impl(f))
|
||||
|
||||
def _get_user_list(conn, query='&'):
|
||||
"""Returns List[Tuple[username, full name]] for query"""
|
||||
|
||||
def _get_user_list(conn: ldap.ldapobject, query: str = "&") -> List[Tuple[str, str]]:
|
||||
"""
|
||||
Returns List[Tuple[username, full name]] for query.
|
||||
"""
|
||||
all_users = []
|
||||
|
||||
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, f'(&(uid=*)(cn=*){ldaputils.wrap(query)})', attrlist=['uid', 'cn'])
|
||||
results = conn.search_s(
|
||||
config.ldap_people,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
f"(&(uid=*)(cn=*){ldaputils.wrap(query)})",
|
||||
attrlist=["uid", "cn"],
|
||||
)
|
||||
for user, attrs in results:
|
||||
user_uid = attrs['uid'][0].decode()
|
||||
user_cn = attrs['cn'][0].decode()
|
||||
user_uid = attrs["uid"][0].decode()
|
||||
user_cn = attrs["cn"][0].decode()
|
||||
all_users.append((user_uid, user_cn))
|
||||
|
||||
all_users.sort(key=lambda user: user[0].lower())
|
||||
return all_users
|
||||
|
||||
def _get_groupped_user_list(conn):
|
||||
"""Returns all users (uid, full name), groupped by active groups"""
|
||||
|
||||
def _get_groupped_user_list(
|
||||
conn: ldap.ldapobject,
|
||||
) -> List[Tuple[str, List[Tuple[str, str]]]]:
|
||||
"""
|
||||
Returns all users (uid, full name), groupped by active groups.
|
||||
"""
|
||||
groupped_users = [
|
||||
(group.capitalize(), _get_user_list(conn, f'memberOf={ldaputils.group_dn(group)}'))
|
||||
(
|
||||
group.capitalize(),
|
||||
_get_user_list(conn, f"memberOf={ldaputils.group_dn(group)}"),
|
||||
)
|
||||
for group in config.ldap_active_groups
|
||||
]
|
||||
|
||||
inactive_filter = ldaputils._not(
|
||||
ldaputils.member_of_any(config.ldap_active_groups)
|
||||
)
|
||||
inactive_filter = ldaputils._not(ldaputils.member_of_any(config.ldap_active_groups))
|
||||
|
||||
groupped_users.append(
|
||||
('Inactive users', _get_user_list(conn, inactive_filter))
|
||||
)
|
||||
groupped_users.append(("Inactive users", _get_user_list(conn, inactive_filter)))
|
||||
|
||||
return groupped_users
|
||||
|
||||
@bp.route('/admin/')
|
||||
@admin_required
|
||||
def admin_view():
|
||||
return flask.render_template('admin/index.html')
|
||||
|
||||
@bp.route('/admin/users/')
|
||||
@admin_required
|
||||
def admin_users_view():
|
||||
conn = context.get_connection()
|
||||
groups = _get_groupped_user_list(conn)
|
||||
|
||||
return flask.render_template('admin/users.html', groups=groups)
|
||||
|
||||
def _get_profile(conn, uid):
|
||||
results = conn.search_s(ldaputils.user_dn(uid), ldap.SCOPE_SUBTREE)
|
||||
return ldaputils.normalized_entries(results)[0]
|
||||
|
||||
def _format_profile(profile):
|
||||
rendered = []
|
||||
dn, attrs = profile
|
||||
for attr, value in attrs:
|
||||
attr_sanitized = attr.lower()
|
||||
attr_full_name = config.full_name.get(attr_sanitized, attr_sanitized)
|
||||
attr_readable_name = config.readable_names.get(attr_full_name)
|
||||
rendered.append((attr, attr_readable_name, value))
|
||||
|
||||
# Attributes with readable names first, then by name
|
||||
rendered.sort(key=lambda profile: profile[0])
|
||||
rendered.sort(key=lambda profile: profile[1] is None)
|
||||
return rendered
|
||||
|
||||
def _get_groups_of(conn, uid):
|
||||
filter = ldaputils.groups_of_user(uid)
|
||||
def _get_groups_of(conn: ldap.ldapobject, dn: str) -> List[str]:
|
||||
filter = f"(&(objectClass=groupOfUniqueNames)(uniqueMember={dn}))"
|
||||
groups = [
|
||||
attrs['cn'][0].decode()
|
||||
for group_dn, attrs in
|
||||
conn.search_s(config.ldap_base, ldap.SCOPE_SUBTREE, filter)
|
||||
attrs["cn"][0].decode()
|
||||
for group_dn, attrs in conn.search_s(
|
||||
config.ldap_base, ldap.SCOPE_SUBTREE, filter
|
||||
)
|
||||
]
|
||||
|
||||
return groups
|
||||
|
||||
def _is_user_protected(conn, uid, groups):
|
||||
|
||||
def _is_user_protected(conn: ldap.ldapobject, groups: List[str]) -> bool:
|
||||
return any(group in config.ldap_protected_groups for group in groups)
|
||||
|
||||
@bp.route('/admin/users/<uid>')
|
||||
|
||||
@bp.route("/admin/")
|
||||
@admin_required
|
||||
def admin_user_view(uid):
|
||||
ldaputils.validate_name(uid)
|
||||
conn = context.get_connection()
|
||||
def admin_view() -> werkzeug.Response:
|
||||
return flask.Response(flask.render_template("admin/index.html"))
|
||||
|
||||
profile = _get_profile(conn, uid)
|
||||
groups = _get_groups_of(conn, uid)
|
||||
is_protected = _is_user_protected(conn, uid, groups)
|
||||
|
||||
return flask.render_template('admin/user.html', uid=uid, profile=_format_profile(profile), groups=groups, is_protected=is_protected)
|
||||
@bp.route("/admin/users/")
|
||||
@admin_required
|
||||
def admin_users_view() -> werkzeug.Response:
|
||||
conn = app.get_connection()
|
||||
assert conn is not None
|
||||
groups = _get_groupped_user_list(conn)
|
||||
|
||||
# TODO: Deduplicate this modification logic with webapp/vcard.py
|
||||
return flask.Response(flask.render_template("admin/users.html", groups=groups))
|
||||
|
||||
class AddMifareIDHash(flask_wtf.FlaskForm):
|
||||
value = wtforms.fields.StringField(label=config.readable_names.get('mifareidhash'))
|
||||
|
||||
class DelForm(flask_wtf.FlaskForm):
|
||||
@bp.route("/admin/users/<username>")
|
||||
@admin_required
|
||||
def admin_user_view(username: str) -> werkzeug.Response:
|
||||
ldaputils.validate_name(username)
|
||||
dn = ldaputils.user_dn(username)
|
||||
|
||||
conn = app.get_connection()
|
||||
assert conn is not None
|
||||
|
||||
profile = context.Profile(conn, dn)
|
||||
groups = _get_groups_of(conn, dn)
|
||||
is_protected = _is_user_protected(conn, groups)
|
||||
|
||||
return flask.Response(
|
||||
flask.render_template(
|
||||
"admin/user.html", profile=profile, groups=groups, is_protected=is_protected
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
class AdminMixin:
|
||||
def _allowed(self, subject_dn: str) -> Optional[str]:
|
||||
conn = app.get_connection()
|
||||
assert conn is not None
|
||||
groups = _get_groups_of(conn, subject_dn)
|
||||
if _is_user_protected(conn, groups):
|
||||
return "User is protected"
|
||||
return None
|
||||
|
||||
|
||||
class AdminOperationAdd(AdminMixin, vcard.OperationAdd):
|
||||
pass
|
||||
|
||||
@bp.route('/admin/users/<uid>/add_mifareidhash')
|
||||
|
||||
class AdminOperationModify(AdminMixin, vcard.OperationModify):
|
||||
pass
|
||||
|
||||
|
||||
class AdminOperationDelete(AdminMixin, vcard.OperationDelete):
|
||||
pass
|
||||
|
||||
|
||||
@bp.route("/admin/users/<username>/add_mifareidhash", methods=["GET", "POST"])
|
||||
@admin_required
|
||||
def admin_user_view_add_mifareidhash(uid):
|
||||
form = AddMifareIDHash()
|
||||
return flask.render_template('admin/ops/add_mifareidhash.html', uid=uid, form=form)
|
||||
def admin_user_view_add_mifareidhash(username: str) -> werkzeug.Response:
|
||||
dn = ldaputils.user_dn(username)
|
||||
op = AdminOperationAdd(dn, "mifareidhash")
|
||||
redirect = f"/admin/users/{username}"
|
||||
return op.perform(
|
||||
success_redirect=redirect,
|
||||
fatal_redirect=redirect,
|
||||
action=f"/admin/users/{username}/add_mifareidhash",
|
||||
)
|
||||
|
||||
@bp.route('/admin/users/<uid>/del_mifareidhash')
|
||||
|
||||
@bp.route("/admin/users/<username>/del_mifareidhash/<uid>", methods=["GET", "POST"])
|
||||
@admin_required
|
||||
def admin_user_view_del_mifareidhash(uid):
|
||||
form = DelForm()
|
||||
value = flask.request.args.get('value')
|
||||
return flask.render_template('admin/ops/del_mifareidhash.html', uid=uid, form=form, value=value)
|
||||
|
||||
def _modify_mifareidhash(uid, form, modify_func):
|
||||
ldaputils.validate_name(uid)
|
||||
conn = context.get_connection()
|
||||
|
||||
groups = _get_groups_of(conn, uid)
|
||||
is_protected = _is_user_protected(conn, uid, groups)
|
||||
|
||||
redirect_url = flask.url_for('admin.admin_user_view', uid=uid)
|
||||
if is_protected:
|
||||
flask.flash('Cannot modify protected user', 'danger')
|
||||
return flask.redirect(redirect_url)
|
||||
|
||||
try:
|
||||
if form.validate_on_submit():
|
||||
dn = ldaputils.user_dn(uid)
|
||||
modify_func(conn, dn)
|
||||
|
||||
context.refresh_profile(dn)
|
||||
flask.flash('Added mifareidhash', category='info')
|
||||
return flask.redirect(redirect_url)
|
||||
|
||||
for field, errors in form.errors.items():
|
||||
for error in errors:
|
||||
flask.flash("Error in the {} field - {}".format(
|
||||
getattr(form, field).label.text,
|
||||
error
|
||||
), 'danger')
|
||||
|
||||
return flask.redirect(redirect_url)
|
||||
except ldap.LDAPError as e:
|
||||
print('LDAP error:', e)
|
||||
flask.flash(f'Could not modify profile due to LDAP error: {e}', 'danger')
|
||||
return flask.redirect(redirect_url)
|
||||
def admin_user_view_del_mifareidhash(username: str, uid: str) -> werkzeug.Response:
|
||||
dn = ldaputils.user_dn(username)
|
||||
op = AdminOperationDelete(dn, uid)
|
||||
redirect = f"/admin/users/{username}"
|
||||
return op.perform(
|
||||
success_redirect=redirect,
|
||||
fatal_redirect=redirect,
|
||||
action=f"/admin/users/{username}/del_mifareidhash/{uid}",
|
||||
)
|
||||
|
||||
|
||||
@bp.route('/admin/users/<uid>/add_mifareidhash', methods=['POST'])
|
||||
@bp.route("/admin/groups/")
|
||||
@admin_required
|
||||
def admin_user_add_mifareidhash(uid):
|
||||
form = AddMifareIDHash()
|
||||
def modify_func(conn, dn):
|
||||
new_value = form.value.data
|
||||
|
||||
email.send_papertrail(
|
||||
f'Adding mifareIDHash for user {uid}',
|
||||
f'New mifareIDHash: {new_value}'
|
||||
)
|
||||
|
||||
conn.modify_s(dn, [(ldap.MOD_ADD, 'mifareidhash', new_value.encode('utf-8'))])
|
||||
|
||||
return _modify_mifareidhash(uid, form, modify_func)
|
||||
|
||||
@bp.route('/admin/users/<uid>/del_mifareidhash', methods=['POST'])
|
||||
@admin_required
|
||||
def admin_user_del_mifareidhash(uid):
|
||||
form = DelForm()
|
||||
def modify_func(conn, dn):
|
||||
old_value = flask.request.args.get('value')
|
||||
|
||||
email.send_papertrail(
|
||||
f'Deleting mifareIDHash for user {uid}',
|
||||
f'Deleted mifareIDHash: {old_value}'
|
||||
)
|
||||
|
||||
conn.modify_s(dn, [(ldap.MOD_DELETE, 'mifareidhash', old_value.encode('utf-8'))])
|
||||
|
||||
return _modify_mifareidhash(uid, form, modify_func)
|
||||
|
||||
@bp.route('/admin/groups/')
|
||||
@admin_required
|
||||
def admin_groups_view():
|
||||
conn = context.get_connection()
|
||||
def admin_groups_view() -> werkzeug.Response:
|
||||
conn = app.get_connection()
|
||||
assert conn is not None
|
||||
|
||||
# no obvious way to filter out groups that are just a per-user-group
|
||||
# (not super useful to look at them)
|
||||
|
@ -209,28 +192,34 @@ def admin_groups_view():
|
|||
all_uids = set([uid for uid, cn in all_users])
|
||||
|
||||
groups = [
|
||||
attrs['cn'][0].decode()
|
||||
for group_dn, attrs in
|
||||
conn.search_s(config.ldap_base, ldap.SCOPE_SUBTREE, 'objectClass=groupOfUniqueNames')
|
||||
attrs["cn"][0].decode()
|
||||
for group_dn, attrs in conn.search_s(
|
||||
config.ldap_base, ldap.SCOPE_SUBTREE, "objectClass=groupOfUniqueNames"
|
||||
)
|
||||
]
|
||||
|
||||
filter_groups = filter((lambda cn: cn not in all_uids), groups)
|
||||
|
||||
return flask.render_template('admin/groups.html', groups=filter_groups)
|
||||
return flask.Response(
|
||||
flask.render_template("admin/groups.html", groups=filter_groups)
|
||||
)
|
||||
|
||||
def _get_group(conn, name):
|
||||
results = conn.search_s(ldaputils.group_dn(name), ldap.SCOPE_SUBTREE)
|
||||
return ldaputils.normalized_entries(results)[0]
|
||||
|
||||
@bp.route('/admin/groups/<name>')
|
||||
@bp.route("/admin/groups/<name>")
|
||||
@admin_required
|
||||
def admin_group_view(name):
|
||||
def admin_group_view(name: str) -> werkzeug.Response:
|
||||
ldaputils.validate_name(name)
|
||||
conn = context.get_connection()
|
||||
dn = ldaputils.group_dn(name)
|
||||
conn = app.get_connection()
|
||||
assert conn is not None
|
||||
|
||||
group_attrs = _get_group(conn, name)
|
||||
members = _get_user_list(conn, f'memberOf={ldaputils.group_dn(name)}')
|
||||
group = context.LDAPEntry(conn, dn)
|
||||
members = _get_user_list(conn, f"memberOf={ldaputils.group_dn(name)}")
|
||||
|
||||
is_protected = name in config.ldap_protected_groups
|
||||
|
||||
return flask.render_template('admin/group.html', name=name, attributes=_format_profile(group_attrs), members=members, is_protected=is_protected)
|
||||
return flask.Response(
|
||||
flask.render_template(
|
||||
"admin/group.html", group=group, members=members, is_protected=is_protected
|
||||
)
|
||||
)
|
||||
|
|
|
@ -2,38 +2,51 @@ import functools
|
|||
import ldap
|
||||
import flask
|
||||
import urllib
|
||||
import werkzeug
|
||||
|
||||
from webapp import app, context, config, ldaputils
|
||||
from webapp import app, avatar, config, ldaputils
|
||||
|
||||
bp = flask.Blueprint('auth', __name__)
|
||||
from typing import TypeVar, Callable, ParamSpec, Dict, Any, Optional
|
||||
|
||||
def login_required(f):
|
||||
bp = flask.Blueprint("auth", __name__)
|
||||
|
||||
P = ParamSpec("P")
|
||||
|
||||
|
||||
def login_required(f: Callable[P, werkzeug.Response]) -> Callable[P, werkzeug.Response]:
|
||||
@functools.wraps(f)
|
||||
def func(*a, **kw):
|
||||
conn = context.get_connection()
|
||||
def func(*a: P.args, **kw: P.kwargs) -> werkzeug.Response:
|
||||
conn = app.get_connection()
|
||||
if not conn:
|
||||
flask.session.clear()
|
||||
flask.flash('You must log in to continue', category='warning')
|
||||
return flask.redirect('/login?' + urllib.parse.urlencode({'goto': flask.request.path}))
|
||||
flask.flash("You must log in to continue", category="warning")
|
||||
return flask.redirect(
|
||||
"/login?" + urllib.parse.urlencode({"goto": flask.request.path})
|
||||
)
|
||||
return f(*a, **kw)
|
||||
|
||||
return func
|
||||
|
||||
def req_to_ctx():
|
||||
|
||||
def req_to_ctx() -> Dict[str, Any]:
|
||||
return dict(flask.request.form.items())
|
||||
|
||||
@bp.route('/login', methods=["GET"])
|
||||
def login_form():
|
||||
return flask.render_template('login.html', **req_to_ctx())
|
||||
|
||||
def _connect_to_ldap(dn, password):
|
||||
@bp.route("/login", methods=["GET"])
|
||||
def login_form() -> werkzeug.Response:
|
||||
return flask.Response(flask.render_template("login.html", **req_to_ctx()))
|
||||
|
||||
|
||||
def _connect_to_ldap(dn: str, password: str) -> Optional[ldap.ldapobject]:
|
||||
try:
|
||||
return app.connections.bind(dn, password)
|
||||
except ldap.LDAPError as error_message:
|
||||
print("Could not connect to server:", error_message)
|
||||
return None
|
||||
|
||||
@bp.route('/login', methods=["POST"])
|
||||
def login_action():
|
||||
|
||||
@bp.route("/login", methods=["POST"])
|
||||
def login_action() -> werkzeug.Response:
|
||||
# LDAP usernames/DNs are case-insensitive, so we normalize them just in
|
||||
# case,
|
||||
username = flask.request.form.get("username", "").lower()
|
||||
|
@ -46,25 +59,34 @@ def login_action():
|
|||
# Now that we have logged in, we can retrieve the 'real' username (which
|
||||
# might be cased differently from the login name).
|
||||
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
||||
for (k, vs) in res[0][1].items():
|
||||
if k == 'uid':
|
||||
for k, vs in res[0][1].items():
|
||||
if k == "uid":
|
||||
username = vs[0].decode()
|
||||
|
||||
# Check if user belongs to admin group
|
||||
is_admin = bool(conn.search_s(dn, ldap.SCOPE_SUBTREE, ldaputils.member_of_any(config.ldap_admin_groups)))
|
||||
is_admin = bool(
|
||||
conn.search_s(
|
||||
dn,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
ldaputils.member_of_any(config.ldap_admin_groups),
|
||||
)
|
||||
)
|
||||
|
||||
flask.session["username"] = username
|
||||
flask.session['dn'] = dn
|
||||
flask.session['is_admin'] = is_admin
|
||||
context.refresh_profile()
|
||||
flask.session["dn"] = dn
|
||||
flask.session["is_admin"] = is_admin
|
||||
app.refresh_profile(conn)
|
||||
avatar.cache.reset_user(username)
|
||||
avatar.hash_cache.reset()
|
||||
return flask.redirect(goto)
|
||||
else:
|
||||
flask.flash("Invalid credentials.", category='danger')
|
||||
flask.flash("Invalid credentials.", category="danger")
|
||||
return login_form()
|
||||
|
||||
@bp.route('/logout')
|
||||
|
||||
@bp.route("/logout")
|
||||
@login_required
|
||||
def logout_action():
|
||||
app.connections.unbind(flask.session['dn'])
|
||||
def logout_action() -> werkzeug.Response:
|
||||
app.connections.unbind(flask.session["dn"])
|
||||
flask.session.clear()
|
||||
return flask.redirect('/')
|
||||
return flask.redirect("/")
|
||||
|
|
130
webapp/avatar.py
130
webapp/avatar.py
|
@ -17,14 +17,16 @@ from PIL import Image, ImageDraw
|
|||
import flask
|
||||
import ldap
|
||||
|
||||
from webapp import context, ldaputils, config
|
||||
from webapp import app, ldaputils, config
|
||||
|
||||
bp = flask.Blueprint('avatar', __name__)
|
||||
log = logging.getLogger('ldap-web.avatar')
|
||||
from typing import List
|
||||
|
||||
bp = flask.Blueprint("avatar", __name__)
|
||||
log = logging.getLogger("ldap-web.avatar")
|
||||
|
||||
|
||||
# Stolen from https://stackoverflow.com/questions/43512615/reshaping-rectangular-image-to-square
|
||||
def resize_image(image: Image, length: int) -> Image:
|
||||
def resize_image(image: Image.Image, length: int) -> Image.Image:
|
||||
"""
|
||||
Resize an image to a square. Can make an image bigger to make it fit or smaller if it doesn't fit. It also crops
|
||||
part of the image.
|
||||
|
@ -44,14 +46,22 @@ def resize_image(image: Image, length: int) -> Image:
|
|||
# The image is in portrait mode. Height is bigger than width.
|
||||
|
||||
# This makes the width fit the LENGTH in pixels while conserving the ration.
|
||||
resized_image = image.resize((length, int(image.size[1] * (length / image.size[0]))))
|
||||
resized_image = image.resize(
|
||||
(length, int(image.size[1] * (length / image.size[0])))
|
||||
)
|
||||
|
||||
# Amount of pixel to lose in total on the height of the image.
|
||||
required_loss = (resized_image.size[1] - length)
|
||||
required_loss = resized_image.size[1] - length
|
||||
|
||||
# Crop the height of the image so as to keep the center part.
|
||||
resized_image = resized_image.crop(
|
||||
box=(0, required_loss / 2, length, resized_image.size[1] - required_loss / 2))
|
||||
box=(
|
||||
0,
|
||||
required_loss // 2,
|
||||
length,
|
||||
resized_image.size[1] - required_loss // 2,
|
||||
)
|
||||
)
|
||||
|
||||
# We now have a length*length pixels image.
|
||||
return resized_image
|
||||
|
@ -59,45 +69,55 @@ def resize_image(image: Image, length: int) -> Image:
|
|||
# This image is in landscape mode or already squared. The width is bigger than the heihgt.
|
||||
|
||||
# This makes the height fit the LENGTH in pixels while conserving the ration.
|
||||
resized_image = image.resize((int(image.size[0] * (length / image.size[1])), length))
|
||||
resized_image = image.resize(
|
||||
(int(image.size[0] * (length / image.size[1])), length)
|
||||
)
|
||||
|
||||
# Amount of pixel to lose in total on the width of the image.
|
||||
required_loss = resized_image.size[0] - length
|
||||
|
||||
# Crop the width of the image so as to keep 1080 pixels of the center part.
|
||||
resized_image = resized_image.crop(
|
||||
box=(required_loss / 2, 0, resized_image.size[0] - required_loss / 2, length))
|
||||
box=(
|
||||
required_loss // 2,
|
||||
0,
|
||||
resized_image.size[0] - required_loss // 2,
|
||||
length,
|
||||
)
|
||||
)
|
||||
|
||||
# We now have a length*length pixels image.
|
||||
return resized_image
|
||||
|
||||
|
||||
def process_upload(data: bytes) -> bytes:
|
||||
img = Image.open(io.BytesIO(data))
|
||||
img = resize_image(img, 256)
|
||||
res = io.BytesIO()
|
||||
img.save(res, 'PNG')
|
||||
img.save(res, "PNG")
|
||||
return base64.b64encode(res.getvalue())
|
||||
|
||||
|
||||
syrenka = Image.open("syrenka.png")
|
||||
|
||||
|
||||
def default_avatar(uid: str) -> Image:
|
||||
def default_avatar(uid: str) -> bytes:
|
||||
"""
|
||||
Create little generative avatar for people who don't have a custom one
|
||||
configured.
|
||||
"""
|
||||
img = Image.new('RGBA', (256, 256), (255, 255, 255, 0))
|
||||
img = Image.new("RGBA", (256, 256), (255, 255, 255, 0))
|
||||
draw = ImageDraw.Draw(img)
|
||||
|
||||
# Deterministic rng for stable output.
|
||||
rng = random.Random(uid)
|
||||
|
||||
# Pick a nice random neon color.
|
||||
n_h, n_s, n_l = rng.random(), 0.5 + rng.random()/2.0, 0.4 + rng.random()/5.0
|
||||
n_h, n_s, n_l = rng.random(), 0.5 + rng.random() / 2.0, 0.4 + rng.random() / 5.0
|
||||
|
||||
# Use muted version for background.
|
||||
r, g, b = [int(256*i) for i in colorsys.hls_to_rgb(n_h, n_l+0.3, n_s-0.1)]
|
||||
draw.rectangle([(0, 0), (256, 256)], fill=(r,g,b,255))
|
||||
r, g, b = [int(256 * i) for i in colorsys.hls_to_rgb(n_h, n_l + 0.3, n_s - 0.1)]
|
||||
draw.rectangle([(0, 0), (256, 256)], fill=(r, g, b, 255))
|
||||
|
||||
# Scale logo by randomized factor.
|
||||
factor = 0.7 + 0.1 * rng.random()
|
||||
|
@ -108,20 +128,20 @@ def default_avatar(uid: str) -> Image:
|
|||
overlay = overlay.crop(box=(0, 0, w, w))
|
||||
|
||||
# Give it a little nudge.
|
||||
overlay = overlay.rotate((rng.random() - 0.5) * 100)
|
||||
overlay = overlay.rotate((rng.random() - 0.5) * 100) # type: ignore
|
||||
|
||||
# Colorize with full neon color.
|
||||
r, g, b = [int(256*i) for i in colorsys.hls_to_rgb(n_h,n_l,n_s)]
|
||||
pixels = overlay.load()
|
||||
r, g, b = [int(256 * i) for i in colorsys.hls_to_rgb(n_h, n_l, n_s)]
|
||||
pixels = overlay.load() # type: ignore
|
||||
for x in range(img.size[0]):
|
||||
for y in range(img.size[1]):
|
||||
alpha = pixels[x, y][3]
|
||||
pixels[x, y] = (r, g, b, alpha)
|
||||
|
||||
img.alpha_composite(overlay)
|
||||
img.alpha_composite(overlay) # type: ignore
|
||||
|
||||
res = io.BytesIO()
|
||||
img.save(res, 'PNG')
|
||||
img.save(res, "PNG")
|
||||
return res.getvalue()
|
||||
|
||||
|
||||
|
@ -135,13 +155,13 @@ class AvatarCacheEntry:
|
|||
# Cached converted bytes
|
||||
_converted: bytes
|
||||
|
||||
def __init__(self, uid: str, data: bytes):
|
||||
def __init__(self, uid: str, data: bytes) -> None:
|
||||
self.uid = uid
|
||||
self.deadline = time.time() + config.avatar_cache_timeout
|
||||
self.data = data
|
||||
self._converted = b""
|
||||
|
||||
def serve(self):
|
||||
def serve(self) -> flask.Response:
|
||||
"""
|
||||
Serve sanitized image. Always re-encode to PNG 256x256.
|
||||
"""
|
||||
|
@ -156,27 +176,27 @@ class AvatarCacheEntry:
|
|||
|
||||
res = io.BytesIO()
|
||||
img = resize_image(img, 256)
|
||||
img.save(res, 'PNG')
|
||||
img.save(res, "PNG")
|
||||
self._converted = res.getvalue()
|
||||
|
||||
return flask.Response(self._converted, mimetype='image/png')
|
||||
return flask.Response(self._converted, mimetype="image/png")
|
||||
|
||||
|
||||
class AvatarCache:
|
||||
# keyed by uid
|
||||
entries: dict[str, AvatarCacheEntry]
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
self.entries = {}
|
||||
|
||||
def reset(self):
|
||||
def reset(self) -> None:
|
||||
self.entries = {}
|
||||
|
||||
def reset_user(self, uid: str):
|
||||
def reset_user(self, uid: str) -> None:
|
||||
if uid in self.entries:
|
||||
del self.entries[uid]
|
||||
|
||||
def get(self, uid: str, bust: bool = False) -> AvatarCacheEntry:
|
||||
def get(self, uid: str, bust: bool = False) -> flask.Response:
|
||||
"""
|
||||
Get avatar, either from cache or from LDAP on cache miss. If 'bust' is
|
||||
set to True, the cache will not be consulted and the newest result from
|
||||
|
@ -193,7 +213,9 @@ class AvatarCache:
|
|||
del self.entries[uid]
|
||||
|
||||
# Otherwise, retrieve from LDAP.
|
||||
conn = context.get_admin_connection()
|
||||
conn = app.get_connection()
|
||||
if conn is None:
|
||||
conn = app.get_admin_connection()
|
||||
try:
|
||||
dn = ldaputils.user_dn(uid)
|
||||
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
||||
|
@ -204,10 +226,10 @@ class AvatarCache:
|
|||
is_user_found = len(res) == 1
|
||||
if is_user_found:
|
||||
for attr, vs in res[0][1].items():
|
||||
if attr == 'jpegPhoto':
|
||||
if attr == "jpegPhoto":
|
||||
for v in vs:
|
||||
# Temporary workaround: treat empty jpegPhoto as no avatar.
|
||||
if v == b'':
|
||||
if v == b"":
|
||||
avatar = None
|
||||
break
|
||||
|
||||
|
@ -224,7 +246,7 @@ class AvatarCache:
|
|||
if avatar is None:
|
||||
# don't generate avatars for non-users to reduce DoS potential
|
||||
# (note: capacifier already leaks existence of users, so whatever)
|
||||
avatar = default_avatar(uid if is_user_found else 'default')
|
||||
avatar = default_avatar(uid if is_user_found else "default")
|
||||
|
||||
# Save avatar in cache.
|
||||
entry = AvatarCacheEntry(uid, avatar)
|
||||
|
@ -233,25 +255,31 @@ class AvatarCache:
|
|||
# And serve the entry.
|
||||
return entry.serve()
|
||||
|
||||
|
||||
cache = AvatarCache()
|
||||
|
||||
def hash_for_uid(uid):
|
||||
|
||||
def hash_for_uid(uid: str) -> str:
|
||||
# NOTE: Gravatar documentation says to use SHA256, but everyone passes MD5 instead
|
||||
email = f'{uid}@hackerspace.pl'.strip().lower()
|
||||
email = f"{uid}@hackerspace.pl".strip().lower()
|
||||
hasher = hashlib.md5()
|
||||
hasher.update(email.encode())
|
||||
return hasher.hexdigest()
|
||||
|
||||
def get_all_user_uids(conn):
|
||||
|
||||
def get_all_user_uids(conn: ldap.ldapobject) -> List[str]:
|
||||
all_uids = []
|
||||
|
||||
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, 'uid=*', attrlist=['uid'])
|
||||
results = conn.search_s(
|
||||
config.ldap_people, ldap.SCOPE_SUBTREE, "uid=*", attrlist=["uid"]
|
||||
)
|
||||
for user, attrs in results:
|
||||
uid = attrs['uid'][0].decode()
|
||||
uid = attrs["uid"][0].decode()
|
||||
all_uids.append(uid)
|
||||
|
||||
return all_uids
|
||||
|
||||
|
||||
class HashCache:
|
||||
# email hash -> uid mapping
|
||||
entries: dict[str, str] = {}
|
||||
|
@ -260,43 +288,47 @@ class HashCache:
|
|||
|
||||
def get(self, email_hash: str) -> str:
|
||||
self.rebuild_if_needed()
|
||||
return self.entries.get(email_hash, 'default')
|
||||
return self.entries.get(email_hash, "default")
|
||||
|
||||
def reset(self):
|
||||
def reset(self) -> None:
|
||||
self.entries = {}
|
||||
self.deadline = 0
|
||||
|
||||
def rebuild_if_needed(self):
|
||||
def rebuild_if_needed(self) -> None:
|
||||
now = time.time()
|
||||
if now > self.deadline:
|
||||
self.rebuild()
|
||||
|
||||
def rebuild(self):
|
||||
def rebuild(self) -> None:
|
||||
log.info("Rebuilding email hash cache")
|
||||
conn = context.get_admin_connection()
|
||||
conn = app.get_admin_connection()
|
||||
users = get_all_user_uids(conn)
|
||||
self.deadline = time.time() + config.avatar_cache_timeout
|
||||
self.entries = { hash_for_uid(uid): uid for uid in users }
|
||||
self.entries = {hash_for_uid(uid): uid for uid in users}
|
||||
|
||||
|
||||
hash_cache = HashCache()
|
||||
|
||||
def sanitize_email_hash(hash: str):
|
||||
|
||||
def sanitize_email_hash(hash: str) -> str:
|
||||
"""
|
||||
lowercases, removes file extension (probably)
|
||||
"""
|
||||
hash = hash.lower()
|
||||
if hash.endswith('.png') or hash.endswith('.jpg'):
|
||||
if hash.endswith(".png") or hash.endswith(".jpg"):
|
||||
hash = hash[:-4]
|
||||
return hash
|
||||
|
||||
@bp.route('/avatar/<email_hash>', methods=['GET'])
|
||||
def gravatar_serve(email_hash):
|
||||
|
||||
@bp.route("/avatar/<email_hash>", methods=["GET"])
|
||||
def gravatar_serve(email_hash: str) -> flask.Response:
|
||||
"""
|
||||
Serves avatar in a Gravatar-compatible(ish) way, i.e. by email hash, not user name.
|
||||
"""
|
||||
uid = hash_cache.get(sanitize_email_hash(email_hash))
|
||||
return cache.get(uid)
|
||||
|
||||
@bp.route('/avatar/user/<uid>', methods=['GET'])
|
||||
def avatar_serve(uid):
|
||||
|
||||
@bp.route("/avatar/user/<uid>", methods=["GET"])
|
||||
def avatar_serve(uid: str) -> flask.Response:
|
||||
return cache.get(uid)
|
||||
|
|
148
webapp/config.py
148
webapp/config.py
|
@ -3,92 +3,114 @@ import wtforms
|
|||
import secrets
|
||||
import os
|
||||
|
||||
hackerspace_name = 'Warsaw Hackerspace'
|
||||
secret_key = secrets.token_hex(32)
|
||||
from typing import Dict, Set, List, Tuple, Any, TypeVar
|
||||
|
||||
hackerspace_name: str = "Warsaw Hackerspace"
|
||||
secret_key: str = secrets.token_hex(32)
|
||||
|
||||
# Kerberos configuration
|
||||
kadmin_principal_map = "{}@HACKERSPACE.PL"
|
||||
kadmin_principal_map: str = "{}@HACKERSPACE.PL"
|
||||
|
||||
# LDAP configuration
|
||||
ldap_url = 'ldap://ldap.hackerspace.pl'
|
||||
ldap_base = 'dc=hackerspace,dc=pl'
|
||||
ldap_people = 'ou=people,dc=hackerspace,dc=pl'
|
||||
ldap_user_dn_format = 'uid={},ou=people,dc=hackerspace,dc=pl'
|
||||
ldap_group_dn_format = 'cn={},ou=group,dc=hackerspace,dc=pl'
|
||||
|
||||
ldap_url: str = "ldap://ldap.hackerspace.pl"
|
||||
ldap_base: str = "dc=hackerspace,dc=pl"
|
||||
ldap_people: str = "ou=people,dc=hackerspace,dc=pl"
|
||||
ldap_user_dn_format: str = "uid={},ou=people,dc=hackerspace,dc=pl"
|
||||
ldap_group_dn_format: str = "cn={},ou=group,dc=hackerspace,dc=pl"
|
||||
|
||||
# LDAP user groups allowed to see /admin
|
||||
ldap_admin_groups = os.getenv('LDAPWEB_ADMIN_GROUPS', 'ldap-admin,staff,zarzad').split(',')
|
||||
ldap_admin_groups: List[str] = os.getenv(
|
||||
"LDAPWEB_ADMIN_GROUPS", "ldap-admin,staff,zarzad"
|
||||
).split(",")
|
||||
|
||||
# LDAP user groups indicating that a user is active
|
||||
ldap_active_groups = os.getenv('LDAPWEB_ACTIVE_GROUPS', 'fatty,starving,potato').split(',')
|
||||
ldap_active_groups: List[str] = os.getenv(
|
||||
"LDAPWEB_ACTIVE_GROUPS", "fatty,starving,potato"
|
||||
).split(",")
|
||||
|
||||
# LDAP service user with admin privileges (for admin listings, creating new users)
|
||||
ldap_admin_dn = os.getenv('LDAPWEB_ADMIN_DN', 'cn=ldapweb,ou=services,dc=hackerspace,dc=pl')
|
||||
ldap_admin_password = os.getenv('LDAPWEB_ADMIN_PASSWORD', 'unused')
|
||||
ldap_admin_dn: str = os.getenv(
|
||||
"LDAPWEB_ADMIN_DN", "cn=ldapweb,ou=services,dc=hackerspace,dc=pl"
|
||||
)
|
||||
ldap_admin_password: str = os.getenv("LDAPWEB_ADMIN_PASSWORD", "unused")
|
||||
|
||||
# Protected LDAP user groups
|
||||
# These groups (and their members) cannot be modified by admin UI
|
||||
ldap_protected_groups = (
|
||||
'staff,zarzad,ldap-admin'.split(',') +
|
||||
os.getenv('LDAPWEB_PROTECTED_GROUPS', '').split(',')
|
||||
)
|
||||
ldap_protected_groups: List[str] = "staff,zarzad,ldap-admin".split(",") + os.getenv(
|
||||
"LDAPWEB_PROTECTED_GROUPS", ""
|
||||
).split(",")
|
||||
|
||||
# Email notification (paper trail) configuration
|
||||
smtp_server = 'mail.hackerspace.pl'
|
||||
smtp_format = '{}@hackerspace.pl'
|
||||
smtp_user = os.getenv('LDAPWEB_SMTP_USER', 'ldapweb')
|
||||
smtp_password = os.getenv('LDAPWEB_SMTP_PASSWORD', 'unused')
|
||||
smtp_server: str = "mail.hackerspace.pl"
|
||||
smtp_format: str = "{}@hackerspace.pl"
|
||||
smtp_user: str = os.getenv("LDAPWEB_SMTP_USER", "ldapweb")
|
||||
smtp_password: str = os.getenv("LDAPWEB_SMTP_PASSWORD", "unused")
|
||||
|
||||
papertrail_recipients = os.getenv('LDAPWEB_PAPERTRAIL_RECIPIENTS', 'zarzad@hackerspace.pl')
|
||||
papertrail_recipients: str = os.getenv(
|
||||
"LDAPWEB_PAPERTRAIL_RECIPIENTS", "zarzad@hackerspace.pl"
|
||||
)
|
||||
|
||||
# Avatar server
|
||||
avatar_cache_timeout = int(os.getenv('LDAPWEB_AVATAR_CACHE_TIMEOUT', '1800'))
|
||||
avatar_cache_timeout: int = int(os.getenv("LDAPWEB_AVATAR_CACHE_TIMEOUT", "1800"))
|
||||
|
||||
# LDAP attribute configuration
|
||||
|
||||
readable_names = {
|
||||
'jpegphoto': 'Avatar',
|
||||
'commonname': 'Common Name',
|
||||
'givenname': 'Given Name',
|
||||
'gecos': 'GECOS (public name)',
|
||||
'surname': 'Surname',
|
||||
'loginshell': 'Shell',
|
||||
'telephonenumber': 'Phone Number',
|
||||
'mobiletelephonenumber': 'Mobile Number',
|
||||
'sshpublickey': 'SSH Public Key',
|
||||
'mifareidhash': 'MIFARE ID Hash',
|
||||
'mail': 'Email Adress',
|
||||
'mailroutingaddress': 'Email Adress (external)',
|
||||
readable_names: Dict[str, str] = {
|
||||
"jpegphoto": "Avatar",
|
||||
"commonname": "Common Name",
|
||||
"givenname": "Given Name",
|
||||
"gecos": "GECOS (public name)",
|
||||
"surname": "Surname",
|
||||
"loginshell": "Shell",
|
||||
"telephonenumber": "Phone Number",
|
||||
"mobiletelephonenumber": "Mobile Number",
|
||||
"sshpublickey": "SSH Public Key",
|
||||
"mifareidhash": "MIFARE ID Hash",
|
||||
"mail": "Email Adress",
|
||||
"mailroutingaddress": "Email Adress (external)",
|
||||
}
|
||||
|
||||
full_name = {
|
||||
'cn': 'commonname',
|
||||
'gecos': 'gecos',
|
||||
'sn': 'surname',
|
||||
'mobile': 'mobiletelephonenumber',
|
||||
'l': 'locality',
|
||||
full_name: Dict[str, str] = {
|
||||
"cn": "commonname",
|
||||
"gecos": "gecos",
|
||||
"sn": "surname",
|
||||
"mobile": "mobiletelephonenumber",
|
||||
"l": "locality",
|
||||
}
|
||||
|
||||
can_add = set([
|
||||
'jpegphoto',
|
||||
'telephonenumber',
|
||||
'mobiletelephonenumber',
|
||||
'sshpublickey',
|
||||
])
|
||||
can_delete = can_add
|
||||
can_modify = can_add | set([
|
||||
'jpegphoto',
|
||||
'givenname',
|
||||
'surname',
|
||||
'commonname',
|
||||
'gecos',
|
||||
])
|
||||
can = { 'add': can_add, 'mod': can_modify, 'del': can_delete }
|
||||
admin_required = set()
|
||||
|
||||
default_field = (wtforms.fields.StringField, {})
|
||||
fields = {
|
||||
'jpegphoto': (wtforms.fields.FileField, {'validators': []}),
|
||||
'mobiletelephonenumber': (wtforms.fields.StringField, {'validators': [wtforms.validators.Regexp(r'[+0-9 ]+')]}),
|
||||
'telephonenumber': (wtforms.fields.StringField, {'validators': [wtforms.validators.Regexp(r'[+0-9 ]+')]}),
|
||||
can_add: Set[str] = {
|
||||
"jpegphoto",
|
||||
"telephonenumber",
|
||||
"mobiletelephonenumber",
|
||||
"sshpublickey",
|
||||
}
|
||||
can_delete: Set[str] = can_add
|
||||
can_modify: Set[str] = can_add | {
|
||||
"jpegphoto",
|
||||
"givenname",
|
||||
"surname",
|
||||
"commonname",
|
||||
"gecos",
|
||||
}
|
||||
can: Dict[str, Set[str]] = {
|
||||
"add": can_add,
|
||||
"mod": can_modify,
|
||||
"del": can_delete,
|
||||
"admin": {"mifareidhash"},
|
||||
}
|
||||
|
||||
FormField = Tuple[type[wtforms.Field], Dict[str, Any]]
|
||||
|
||||
default_field: FormField = (wtforms.fields.StringField, {})
|
||||
fields: Dict[str, FormField] = {
|
||||
"jpegphoto": (wtforms.fields.FileField, {"validators": []}),
|
||||
"mobiletelephonenumber": (
|
||||
wtforms.fields.StringField,
|
||||
{"validators": [wtforms.validators.Regexp(r"[+0-9 ]+")]},
|
||||
),
|
||||
"telephonenumber": (
|
||||
wtforms.fields.StringField,
|
||||
{"validators": [wtforms.validators.Regexp(r"[+0-9 ]+")]},
|
||||
),
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
from dataclasses import dataclass
|
||||
import random
|
||||
import string
|
||||
import hashlib
|
||||
|
@ -5,53 +6,87 @@ import hashlib
|
|||
import flask
|
||||
import ldap
|
||||
|
||||
from webapp import app, config, validation, avatar
|
||||
from webapp import config, validation
|
||||
|
||||
class Attr(object):
|
||||
def __init__(self, name, value):
|
||||
from typing import Optional, Dict, List
|
||||
|
||||
|
||||
class Attr:
|
||||
"""
|
||||
A concrete attribute (with value) on a Profile.
|
||||
"""
|
||||
|
||||
name: str
|
||||
readable_name: Optional[str]
|
||||
value: bytes
|
||||
# Hash of name + value, used to uniquely identify an attribute across HTTP
|
||||
# calls.
|
||||
uid: str
|
||||
|
||||
def __init__(self, name: str, value: bytes) -> None:
|
||||
name = validation.sanitize_ldap(name)
|
||||
self.name = name
|
||||
self.readable_name = config.readable_names.get(name, name)
|
||||
self.readable_name = config.readable_names.get(name)
|
||||
self.value = value
|
||||
self.uid = hashlib.sha1(name.encode('utf-8') + value).hexdigest()
|
||||
def __str__(self):
|
||||
return self.value.decode('utf-8')
|
||||
self.uid = hashlib.sha1(name.encode("utf-8") + value).hexdigest()
|
||||
|
||||
def get_dn():
|
||||
return flask.session.get('dn')
|
||||
def __str__(self) -> str:
|
||||
return self.value.decode("utf-8")
|
||||
|
||||
def get_connection(dn = None):
|
||||
dn = dn or get_dn()
|
||||
return app.connections[dn]
|
||||
|
||||
def get_admin_connection():
|
||||
conn = app.connections[config.ldap_admin_dn]
|
||||
if not conn:
|
||||
conn = app.connections.bind(config.ldap_admin_dn, config.ldap_admin_password)
|
||||
return conn
|
||||
@dataclass
|
||||
class LDAPEntry:
|
||||
"""
|
||||
An LDAP entry, eg. a user profile or a group.
|
||||
"""
|
||||
|
||||
def get_profile():
|
||||
return app.profiles[get_dn()]
|
||||
# Map from uid/hash to attr.
|
||||
fields: Dict[str, Attr]
|
||||
# DN of this entry
|
||||
dn: str
|
||||
|
||||
def refresh_profile(dn=None):
|
||||
dn = dn or get_dn()
|
||||
conn = get_connection(dn)
|
||||
if not conn:
|
||||
return # no session, nothing to refresh i guess
|
||||
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
||||
assert(len(res) == 1)
|
||||
profile = {}
|
||||
for attr, vs in res[0][1].items():
|
||||
for v in vs:
|
||||
a = Attr(attr, v)
|
||||
profile[a.uid] = a
|
||||
if attr == 'uid':
|
||||
user_uid = v.decode('utf-8')
|
||||
app.profiles[dn] = profile
|
||||
def __init__(self, conn: ldap.ldapobject, dn: str):
|
||||
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
||||
assert len(res) == 1
|
||||
self.dn = dn
|
||||
self.fields = {}
|
||||
for attr, vs in res[0][1].items():
|
||||
for v in vs:
|
||||
a = Attr(attr, v)
|
||||
self.fields[a.uid] = a
|
||||
|
||||
# bust avatar cache
|
||||
if user_uid:
|
||||
avatar.cache.reset_user(user_uid)
|
||||
avatar.hash_cache.reset()
|
||||
def get_attr(self, attr: str) -> Optional[Attr]:
|
||||
for v in self.fields.values():
|
||||
if v.name == attr:
|
||||
return v
|
||||
return None
|
||||
|
||||
return profile
|
||||
@property
|
||||
def fields_sorted(self) -> List[Attr]:
|
||||
fields = list(self.fields.values())
|
||||
fields.sort(key=lambda attr: attr.name)
|
||||
fields.sort(key=lambda attr: attr.readable_name is None)
|
||||
return fields
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
res = self.get_attr("commonname")
|
||||
assert res is not None
|
||||
return res.value.decode()
|
||||
|
||||
|
||||
class Profile(LDAPEntry):
|
||||
"""
|
||||
A user profile.
|
||||
"""
|
||||
|
||||
# Map from uid/hash to attr.
|
||||
fields: Dict[str, Attr]
|
||||
# DN of this profile
|
||||
dn: str
|
||||
|
||||
@property
|
||||
def username |