split views into blueprints
parent
f82e60e277
commit
3fd36123de
|
@ -5,9 +5,6 @@ from functools import reduce
|
|||
|
||||
app = flask.Flask(__name__)
|
||||
|
||||
# import views to route them
|
||||
import webapp.views
|
||||
|
||||
from webapp import validation, pools, config
|
||||
|
||||
if hasattr(config, "secret_key"):
|
||||
|
@ -47,6 +44,11 @@ def start():
|
|||
validation.sanitize_perms()
|
||||
validation.sanitize_readable()
|
||||
|
||||
from webapp import views
|
||||
from webapp import auth, admin, vcard, passwd
|
||||
for module in (auth, admin, vcard, passwd):
|
||||
app.register_blueprint(module.bp)
|
||||
|
||||
app.connections = pools.LDAPConnectionPool(config.ldap_url, timeout=300.0)
|
||||
def drop_profile(dn):
|
||||
if dn != config.admin_dn:
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
import ldap
|
||||
import re
|
||||
import flask
|
||||
|
||||
from webapp import app, context, config
|
||||
from webapp.auth import login_required
|
||||
|
||||
bp = flask.Blueprint('admin', __name__)
|
||||
|
||||
def _ldap_not_in(patterns):
|
||||
joined_patterns = ''.join(f'({p})' for p in patterns)
|
||||
one_of_pattern = f'(|{joined_patterns})'
|
||||
return f'!{one_of_pattern}'
|
||||
|
||||
def _ldap_get_users_list(conn, query='&'):
|
||||
all_users = []
|
||||
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, f'(&(uid=*)(cn=*)({query}))', attrlist=['uid', 'cn'])
|
||||
for user, attrs in results:
|
||||
user_uid = attrs['uid'][0].decode()
|
||||
user_cn = attrs['cn'][0].decode()
|
||||
all_users.append((user_uid, user_cn))
|
||||
|
||||
all_users.sort(key=lambda user: user[0].lower())
|
||||
return all_users
|
||||
|
||||
def _ldap_get_all_users_groupped(conn):
|
||||
group_queries = [
|
||||
(group_name, f'memberOf={pattern}')
|
||||
for group_name, pattern in config.admin_groups.items()
|
||||
]
|
||||
|
||||
groupped_users = [
|
||||
(group_name, _ldap_get_users_list(conn, query))
|
||||
for group_name, query in group_queries
|
||||
]
|
||||
|
||||
other_users_query = _ldap_not_in(query for _, query in group_queries)
|
||||
groupped_users.append(
|
||||
('Other', _ldap_get_users_list(conn, other_users_query))
|
||||
)
|
||||
|
||||
return groupped_users
|
||||
|
||||
@bp.route('/admin/')
|
||||
@login_required
|
||||
def admin_list():
|
||||
if not flask.session['is_admin']:
|
||||
flask.abort(403)
|
||||
|
||||
conn = context.get_connection()
|
||||
user_groups = _ldap_get_all_users_groupped(conn)
|
||||
|
||||
return flask.render_template('admin/list.html', user_groups=user_groups)
|
||||
|
||||
def _ldap_get_user(conn, uid):
|
||||
profile = []
|
||||
|
||||
for user, attrs in conn.search_s(config.dn_format % uid, ldap.SCOPE_SUBTREE):
|
||||
for attr, values in attrs.items():
|
||||
for value in values:
|
||||
profile.append((attr, value.decode()))
|
||||
|
||||
return profile
|
||||
|
||||
def _rendered_ldap_profile(profile):
|
||||
rendered_profile = []
|
||||
for attr, value in profile:
|
||||
attr_sanitized = attr.lower()
|
||||
attr_full_name = config.full_name.get(attr_sanitized, attr_sanitized)
|
||||
attr_readable_name = config.readable_names.get(attr_full_name)
|
||||
rendered_profile.append((attr, attr_readable_name, value))
|
||||
|
||||
# Attributes with readable names first, then by name
|
||||
rendered_profile.sort(key=lambda profile: profile[0])
|
||||
rendered_profile.sort(key=lambda profile: profile[1] is None)
|
||||
return rendered_profile
|
||||
|
||||
def _ldap_get_user_groups(conn, uid):
|
||||
groups = []
|
||||
user_dn = config.dn_format % uid
|
||||
filter = f'(&(objectClass=groupOfUniqueNames)(uniqueMember={user_dn}))'
|
||||
for group_dn, attrs in conn.search_s(config.ldap_base, ldap.SCOPE_SUBTREE, filter):
|
||||
groups.append(attrs['cn'][0].decode())
|
||||
|
||||
return groups
|
||||
|
||||
def _ldap_validate_uid(uid):
|
||||
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9-_]*\Z', uid):
|
||||
raise RuntimeError('Invalid uid')
|
||||
|
||||
@bp.route('/admin/users/<uid>')
|
||||
@login_required
|
||||
def admin_user_view(uid):
|
||||
if not flask.session['is_admin']:
|
||||
flask.abort(403)
|
||||
|
||||
conn = context.get_connection()
|
||||
_ldap_validate_uid(uid)
|
||||
|
||||
profile = _ldap_get_user(conn, uid)
|
||||
groups = _ldap_get_user_groups(conn, uid)
|
||||
|
||||
return flask.render_template('admin/user.html', uid=uid, profile=_rendered_ldap_profile(profile), groups=groups)
|
|
@ -0,0 +1,70 @@
|
|||
import functools
|
||||
import ldap
|
||||
import flask
|
||||
import urllib
|
||||
|
||||
from webapp import app, context, config
|
||||
|
||||
bp = flask.Blueprint('auth', __name__)
|
||||
|
||||
def login_required(f):
|
||||
@functools.wraps(f)
|
||||
def func(*a, **kw):
|
||||
conn = context.get_connection()
|
||||
if not conn:
|
||||
flask.session.clear()
|
||||
flask.flash('You must log in to continue', category='warning')
|
||||
return flask.redirect('/login?' + urllib.parse.urlencode({'goto': flask.request.path}))
|
||||
return f(*a, **kw)
|
||||
return func
|
||||
|
||||
def req_to_ctx():
|
||||
return dict(flask.request.form.items())
|
||||
|
||||
@bp.route('/login', methods=["GET"])
|
||||
def login_form():
|
||||
return flask.render_template('login.html', **req_to_ctx())
|
||||
|
||||
def _connect_to_ldap(dn, password):
|
||||
try:
|
||||
return app.connections.bind(dn, password)
|
||||
except ldap.LDAPError as error_message:
|
||||
print("Could not connect to server:", error_message)
|
||||
return None
|
||||
|
||||
@bp.route('/login', methods=["POST"])
|
||||
def login_action():
|
||||
# LDAP usernames/DNs are case-insensitive, so we normalize them just in
|
||||
# case,
|
||||
username = flask.request.form.get("username", "").lower()
|
||||
password = flask.request.form.get("password", "")
|
||||
goto = flask.request.values.get("goto", "/")
|
||||
dn = config.dn_format % username
|
||||
|
||||
conn = _connect_to_ldap(dn, password)
|
||||
if conn:
|
||||
# Now that we have logged in, we can retrieve the 'real' username (which
|
||||
# might be cased differently from the login name).
|
||||
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
||||
for (k, vs) in res[0][1].items():
|
||||
if k == 'uid':
|
||||
username = vs[0].decode()
|
||||
|
||||
# Check if user belongs to admin group
|
||||
is_admin = bool(conn.search_s(dn, ldap.SCOPE_SUBTREE, f'memberOf={config.ldapweb_admin_group}'))
|
||||
|
||||
flask.session["username"] = username
|
||||
flask.session['dn'] = dn
|
||||
flask.session['is_admin'] = is_admin
|
||||
context.refresh_profile()
|
||||
return flask.redirect(goto)
|
||||
else:
|
||||
flask.flash("Invalid credentials.", category='danger')
|
||||
return login_form()
|
||||
|
||||
@bp.route('/logout')
|
||||
@login_required
|
||||
def logout_action():
|
||||
app.connections.unbind(flask.session['dn'])
|
||||
flask.session.clear()
|
||||
return flask.redirect('/')
|
|
@ -0,0 +1,54 @@
|
|||
import ldap
|
||||
import kerberos
|
||||
import flask
|
||||
import flask_wtf
|
||||
|
||||
from webapp import app, context, config
|
||||
from webapp.auth import login_required
|
||||
|
||||
bp = flask.Blueprint('passwd', __name__)
|
||||
|
||||
@bp.route('/passwd', methods=["GET"])
|
||||
@login_required
|
||||
def passwd_form():
|
||||
return flask.render_template('passwd.html')
|
||||
|
||||
def _passwd_ldap(current, new):
|
||||
conn = context.get_connection()
|
||||
dn = context.get_dn()
|
||||
try:
|
||||
conn.passwd_s(dn, current. new)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
print('LDAP error:', e)
|
||||
return False
|
||||
|
||||
def _passwd_kadmin(current, new):
|
||||
username = flask.session.get('username')
|
||||
try:
|
||||
principal_name = config.kadmin_principal_map.format(username)
|
||||
return kerberos.changePassword(principal_name, current, new)
|
||||
except Exception as e:
|
||||
print('Kerberos error:', e)
|
||||
logging.exception('kpasswd failed')
|
||||
return False
|
||||
|
||||
@bp.route('/passwd', methods=["POST"])
|
||||
@login_required
|
||||
def passwd_action():
|
||||
current, new, confirm = (flask.request.form[n] for n in ('current', 'new', 'confirm'))
|
||||
if new != confirm:
|
||||
flask.flash(u"New passwords don't match", category='danger')
|
||||
return flask.render_template('passwd.html')
|
||||
|
||||
result = False
|
||||
if config.kadmin_passwd:
|
||||
result = _passwd_kadmin(current, new)
|
||||
else:
|
||||
result = _passwd_ldap(current, new)
|
||||
|
||||
if result:
|
||||
flask.flash(u'Password changed', category='info')
|
||||
else:
|
||||
flask.flash(u'Wrong password', category='danger')
|
||||
return flask.render_template('passwd.html')
|
|
@ -0,0 +1,88 @@
|
|||
import ldap
|
||||
import flask
|
||||
import flask_wtf
|
||||
|
||||
from webapp import app, context, config, validation
|
||||
from webapp.auth import login_required
|
||||
|
||||
bp = flask.Blueprint('vcard', __name__)
|
||||
|
||||
def str_to_ldap(s):
|
||||
return s.encode('utf-8')
|
||||
|
||||
def attr_op(op, attrName, uid = None, templates=config.std_templates, success_redirect='/vcard',
|
||||
fatal_redirect='/vcard'):
|
||||
try:
|
||||
attr, old_value = None, None
|
||||
if uid:
|
||||
attr = context.get_profile()[uid]
|
||||
attrName = attr.name
|
||||
old_value = str(attr)
|
||||
form = DelForm() if op == 'del' else app.forms[attrName](value=old_value)
|
||||
form.attr_data = attr
|
||||
if attrName not in config.can[op]:
|
||||
flask.flash(config.perm_errors[op], 'danger')
|
||||
return flask.redirect(fatal_redirect)
|
||||
if form.validate_on_submit():
|
||||
if op in ['add', 'mod']:
|
||||
new_value = form.value.data
|
||||
admin = attrName in config.admin_required
|
||||
conn = context.get_admin_connection() if admin else context.get_connection()
|
||||
dn = context.get_dn()
|
||||
|
||||
# Most fields should be modified by remove/add.
|
||||
if op == 'mod' and attrName not in ['commonname']:
|
||||
op = 'modreadd'
|
||||
|
||||
if op in ['del', 'modreadd']:
|
||||
conn.modify_s(dn, [(ldap.MOD_DELETE, attrName, str_to_ldap(old_value))])
|
||||
if op in ['add', 'modreadd']:
|
||||
conn.modify_s(dn, [(ldap.MOD_ADD, attrName, str_to_ldap(new_value))])
|
||||
|
||||
if op in ['mod']:
|
||||
conn.modify_s(dn, [(ldap.MOD_REPLACE, attrName, str_to_ldap(new_value))])
|
||||
|
||||
context.refresh_profile()
|
||||
return flask.redirect(success_redirect)
|
||||
if form.is_submitted():
|
||||
for field, errors in form.errors.items():
|
||||
for error in errors:
|
||||
flask.flash(u"Error in the {} field - {}".format(
|
||||
getattr(form, field).label.text,
|
||||
error
|
||||
), 'danger')
|
||||
return flask.redirect(success_redirect)
|
||||
status = 400 if form.is_submitted() else 200
|
||||
return flask.make_response(flask.render_template(templates[op], fatal_redirect=fatal_redirect,
|
||||
attr_op=op, form=form, attr_name=attrName, uid=uid), status)
|
||||
except ldap.LDAPError as e:
|
||||
print('LDAP error:', e)
|
||||
flask.flash('Could not modify profile', 'danger')
|
||||
return flask.redirect(fatal_redirect)
|
||||
|
||||
class DelForm(flask_wtf.FlaskForm):
|
||||
pass
|
||||
|
||||
@bp.route('/vcard', methods=['GET'])
|
||||
@login_required
|
||||
def vcard():
|
||||
data = {}
|
||||
for v in context.get_profile().values():
|
||||
data.setdefault(v.name, []).append(v)
|
||||
return flask.render_template('vcard.html', can_add=config.can['add'],
|
||||
can_modify=config.can['mod'], can_delete=config.can['del'], profile=data)
|
||||
|
||||
@bp.route('/vcard/add/<attrName>', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def add_attr(attrName):
|
||||
return attr_op('add', attrName)
|
||||
|
||||
@bp.route('/vcard/delete/<uid>', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def del_attr(uid):
|
||||
return attr_op('del', None, uid)
|
||||
|
||||
@bp.route('/vcard/modify/<uid>', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def mod_attr(uid):
|
||||
return attr_op('mod', None, uid)
|
293
webapp/views.py
293
webapp/views.py
|
@ -1,299 +1,10 @@
|
|||
import logging
|
||||
import urllib
|
||||
import functools
|
||||
|
||||
import ldap
|
||||
import kerberos
|
||||
import re
|
||||
|
||||
import flask
|
||||
import flask_wtf
|
||||
|
||||
from webapp import app, context, config, validation
|
||||
|
||||
def login_required(f):
|
||||
@functools.wraps(f)
|
||||
def func(*a, **kw):
|
||||
conn = context.get_connection()
|
||||
if not conn:
|
||||
flask.session.clear()
|
||||
flask.flash('You must log in to continue', category='warning')
|
||||
return flask.redirect('/login?' + urllib.parse.urlencode({'goto': flask.request.path}))
|
||||
return f(*a, **kw)
|
||||
return func
|
||||
|
||||
def req_to_ctx():
|
||||
return dict(flask.request.form.items())
|
||||
|
||||
def str_to_ldap(s):
|
||||
return s.encode('utf-8')
|
||||
|
||||
def attr_op(op, attrName, uid = None, templates=config.std_templates, success_redirect='/vcard',
|
||||
fatal_redirect='/vcard'):
|
||||
try:
|
||||
attr, old_value = None, None
|
||||
if uid:
|
||||
attr = context.get_profile()[uid]
|
||||
attrName = attr.name
|
||||
old_value = str(attr)
|
||||
form = DelForm() if op == 'del' else app.forms[attrName](value=old_value)
|
||||
form.attr_data = attr
|
||||
if attrName not in config.can[op]:
|
||||
flask.flash(config.perm_errors[op], 'danger')
|
||||
return flask.redirect(fatal_redirect)
|
||||
if form.validate_on_submit():
|
||||
if op in ['add', 'mod']:
|
||||
new_value = form.value.data
|
||||
admin = attrName in config.admin_required
|
||||
conn = context.get_admin_connection() if admin else context.get_connection()
|
||||
dn = context.get_dn()
|
||||
|
||||
# Most fields should be modified by remove/add.
|
||||
if op == 'mod' and attrName not in ['commonname']:
|
||||
op = 'modreadd'
|
||||
|
||||
if op in ['del', 'modreadd']:
|
||||
conn.modify_s(dn, [(ldap.MOD_DELETE, attrName, str_to_ldap(old_value))])
|
||||
if op in ['add', 'modreadd']:
|
||||
conn.modify_s(dn, [(ldap.MOD_ADD, attrName, str_to_ldap(new_value))])
|
||||
|
||||
if op in ['mod']:
|
||||
conn.modify_s(dn, [(ldap.MOD_REPLACE, attrName, str_to_ldap(new_value))])
|
||||
|
||||
context.refresh_profile()
|
||||
return flask.redirect(success_redirect)
|
||||
if form.is_submitted():
|
||||
for field, errors in form.errors.items():
|
||||
for error in errors:
|
||||
flask.flash(u"Error in the {} field - {}".format(
|
||||
getattr(form, field).label.text,
|
||||
error
|
||||
), 'danger')
|
||||
return flask.redirect(success_redirect)
|
||||
status = 400 if form.is_submitted() else 200
|
||||
return flask.make_response(flask.render_template(templates[op], fatal_redirect=fatal_redirect,
|
||||
attr_op=op, form=form, attr_name=attrName, uid=uid), status)
|
||||
except ldap.LDAPError as e:
|
||||
print('LDAP error:', e)
|
||||
flask.flash('Could not modify profile', 'danger')
|
||||
return flask.redirect(fatal_redirect)
|
||||
|
||||
def connect_to_ldap(dn, password):
|
||||
try:
|
||||
return app.connections.bind(dn, password)
|
||||
except ldap.LDAPError as error_message:
|
||||
print("Could not connect to server:", error_message)
|
||||
return None
|
||||
|
||||
class DelForm(flask_wtf.FlaskForm):
|
||||
pass
|
||||
|
||||
@app.route('/login', methods=["GET"])
|
||||
def login_form():
|
||||
return flask.render_template('login.html', **req_to_ctx())
|
||||
|
||||
@app.route('/login', methods=["POST"])
|
||||
def login_action():
|
||||
# LDAP usernames/DNs are case-insensitive, so we normalize them just in
|
||||
# case,
|
||||
username = flask.request.form.get("username", "").lower()
|
||||
password = flask.request.form.get("password", "")
|
||||
goto = flask.request.values.get("goto", "/")
|
||||
dn = config.dn_format % username
|
||||
|
||||
conn = connect_to_ldap(dn, password)
|
||||
if conn:
|
||||
# Now that we have logged in, we can retrieve the 'real' username (which
|
||||
# might be cased differently from the login name).
|
||||
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
||||
for (k, vs) in res[0][1].items():
|
||||
if k == 'uid':
|
||||
username = vs[0].decode()
|
||||
|
||||
# Check if user belongs to admin group
|
||||
is_admin = bool(conn.search_s(dn, ldap.SCOPE_SUBTREE, f'memberOf={config.ldapweb_admin_group}'))
|
||||
|
||||
flask.session["username"] = username
|
||||
flask.session['dn'] = dn
|
||||
flask.session['is_admin'] = is_admin
|
||||
context.refresh_profile()
|
||||
return flask.redirect(goto)
|
||||
else:
|
||||
flask.flash("Invalid credentials.", category='danger')
|
||||
return login_form()
|
||||
|
||||
@app.route('/logout')
|
||||
@login_required
|
||||
def logout_action():
|
||||
app.connections.unbind(flask.session['dn'])
|
||||
flask.session.clear()
|
||||
return flask.redirect('/')
|
||||
|
||||
@app.route('/passwd', methods=["GET"])
|
||||
@login_required
|
||||
def passwd_form():
|
||||
return flask.render_template('passwd.html')
|
||||
|
||||
def _passwd_ldap(current, new):
|
||||
conn = context.get_connection()
|
||||
dn = context.get_dn()
|
||||
try:
|
||||
conn.passwd_s(dn, current. new)
|
||||
return True
|
||||
except ldap.LDAPError as e:
|
||||
print('LDAP error:', e)
|
||||
return False
|
||||
|
||||
def _passwd_kadmin(current, new):
|
||||
username = flask.session.get('username')
|
||||
try:
|
||||
principal_name = config.kadmin_principal_map.format(username)
|
||||
return kerberos.changePassword(principal_name, current, new)
|
||||
except Exception as e:
|
||||
print('Kerberos error:', e)
|
||||
logging.exception('kpasswd failed')
|
||||
return False
|
||||
|
||||
@app.route('/passwd', methods=["POST"])
|
||||
@login_required
|
||||
def passwd_action():
|
||||
current, new, confirm = (flask.request.form[n] for n in ('current', 'new', 'confirm'))
|
||||
if new != confirm:
|
||||
flask.flash(u"New passwords don't match", category='danger')
|
||||
return flask.render_template('passwd.html')
|
||||
|
||||
result = False
|
||||
if config.kadmin_passwd:
|
||||
result = _passwd_kadmin(current, new)
|
||||
else:
|
||||
result = _passwd_ldap(current, new)
|
||||
|
||||
if result:
|
||||
flask.flash(u'Password changed', category='info')
|
||||
else:
|
||||
flask.flash(u'Wrong password', category='danger')
|
||||
return flask.render_template('passwd.html')
|
||||
from webapp import app, context, config
|
||||
from webapp.auth import login_required
|
||||
|
||||
@app.route("/")
|
||||
@login_required
|
||||
def root():
|
||||
return flask.render_template('root.html', **flask.session)
|
||||
|
||||
@app.route('/vcard', methods=['GET'])
|
||||
@login_required
|
||||
def vcard():
|
||||
data = {}
|
||||
for v in context.get_profile().values():
|
||||
data.setdefault(v.name, []).append(v)
|
||||
return flask.render_template('vcard.html', can_add=config.can['add'],
|
||||
can_modify=config.can['mod'], can_delete=config.can['del'], profile=data)
|
||||
|
||||
@app.route('/vcard/add/<attrName>', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def add_attr(attrName):
|
||||
return attr_op('add', attrName)
|
||||
|
||||
@app.route('/vcard/delete/<uid>', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def del_attr(uid):
|
||||
return attr_op('del', None, uid)
|
||||
|
||||
@app.route('/vcard/modify/<uid>', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def mod_attr(uid):
|
||||
return attr_op('mod', None, uid)
|
||||
|
||||
def _ldap_not_in(patterns):
|
||||
joined_patterns = ''.join(f'({p})' for p in patterns)
|
||||
one_of_pattern = f'(|{joined_patterns})'
|
||||
return f'!{one_of_pattern}'
|
||||
|
||||
def _ldap_get_users_list(conn, query='&'):
|
||||
all_users = []
|
||||
results = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, f'(&(uid=*)(cn=*)({query}))', attrlist=['uid', 'cn'])
|
||||
for user, attrs in results:
|
||||
user_uid = attrs['uid'][0].decode()
|
||||
user_cn = attrs['cn'][0].decode()
|
||||
all_users.append((user_uid, user_cn))
|
||||
|
||||
all_users.sort(key=lambda user: user[0].lower())
|
||||
return all_users
|
||||
|
||||
def _ldap_get_all_users_groupped(conn):
|
||||
group_queries = [
|
||||
(group_name, f'memberOf={pattern}')
|
||||
for group_name, pattern in config.admin_groups.items()
|
||||
]
|
||||
|
||||
groupped_users = [
|
||||
(group_name, _ldap_get_users_list(conn, query))
|
||||
for group_name, query in group_queries
|
||||
]
|
||||
|
||||
other_users_query = _ldap_not_in(query for _, query in group_queries)
|
||||
groupped_users.append(
|
||||
('Other', _ldap_get_users_list(conn, other_users_query))
|
||||
)
|
||||
|
||||
return groupped_users
|
||||
|
||||
@app.route('/admin/')
|
||||
@login_required
|
||||
def admin_list():
|
||||
if not flask.session['is_admin']:
|
||||
flask.abort(403)
|
||||
|
||||
conn = context.get_connection()
|
||||
user_groups = _ldap_get_all_users_groupped(conn)
|
||||
|
||||
return flask.render_template('admin/list.html', user_groups=user_groups)
|
||||
|
||||
def _ldap_get_user(conn, uid):
|
||||
profile = []
|
||||
|
||||
for user, attrs in conn.search_s(config.dn_format % uid, ldap.SCOPE_SUBTREE):
|
||||
for attr, values in attrs.items():
|
||||
for value in values:
|
||||
profile.append((attr, value.decode()))
|
||||
|
||||
return profile
|
||||
|
||||
def _rendered_ldap_profile(profile):
|
||||
rendered_profile = []
|
||||
for attr, value in profile:
|
||||
attr_sanitized = attr.lower()
|
||||
attr_full_name = config.full_name.get(attr_sanitized, attr_sanitized)
|
||||
attr_readable_name = config.readable_names.get(attr_full_name)
|
||||
rendered_profile.append((attr, attr_readable_name, value))
|
||||
|
||||
# Attributes with readable names first, then by name
|
||||
rendered_profile.sort(key=lambda profile: profile[0])
|
||||
rendered_profile.sort(key=lambda profile: profile[1] is None)
|
||||
return rendered_profile
|
||||
|
||||
def _ldap_get_user_groups(conn, uid):
|
||||
groups = []
|
||||
user_dn = config.dn_format % uid
|
||||
filter = f'(&(objectClass=groupOfUniqueNames)(uniqueMember={user_dn}))'
|
||||
for group_dn, attrs in conn.search_s(config.ldap_base, ldap.SCOPE_SUBTREE, filter):
|
||||
groups.append(attrs['cn'][0].decode())
|
||||
|
||||
return groups
|
||||
|
||||
def _ldap_validate_uid(uid):
|
||||
if not re.match(r'^[a-zA-Z_][a-zA-Z0-9-_]*\Z', uid):
|
||||
raise RuntimeError('Invalid uid')
|
||||
|
||||
@app.route('/admin/users/<uid>')
|
||||
@login_required
|
||||
def admin_user_view(uid):
|
||||
if not flask.session['is_admin']:
|
||||
flask.abort(403)
|
||||
|
||||
conn = context.get_connection()
|
||||
_ldap_validate_uid(uid)
|
||||
|
||||
profile = _ldap_get_user(conn, uid)
|
||||
groups = _ldap_get_user_groups(conn, uid)
|
||||
|
||||
return flask.render_template('admin/user.html', uid=uid, profile=_rendered_ldap_profile(profile), groups=groups)
|
||||
|
|
Loading…
Reference in New Issue