263 lines
8.4 KiB
Python
263 lines
8.4 KiB
Python
import logging
|
|
import urllib
|
|
import functools
|
|
|
|
import ldap
|
|
import kerberos
|
|
|
|
import flask
|
|
import flask_wtf
|
|
|
|
from webapp import app, context, config, validation
|
|
|
|
def login_required(f):
    """Decorate a view so that it only runs when an LDAP connection exists.

    When ``context.get_connection()`` returns a falsy value, the session is
    cleared, a warning is flashed and the client is redirected to ``/login``
    with the current request path in the ``goto`` query parameter.
    Otherwise the wrapped view is called with its original arguments.
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import urlencode

    @functools.wraps(f)
    def func(*a, **kw):
        conn = context.get_connection()
        if not conn:
            flask.session.clear()
            flask.flash('You must log in to continue', category='warning')
            return flask.redirect('/login?' + urlencode({'goto': flask.request.path}))
        return f(*a, **kw)
    return func
|
|
|
|
def api_access(f):
    """Decorate a view so it is only reachable by allow-listed peers.

    The ``PEER_DN`` entry of the WSGI environ is looked up in
    ``config.api_allowed``; requests whose value is not listed are aborted
    with HTTP 403.
    """
    @functools.wraps(f)
    def guarded(*args, **kwargs):
        peer_dn = flask.request.environ.get('PEER_DN')
        if peer_dn in config.api_allowed:
            return f(*args, **kwargs)
        return flask.abort(403)
    return guarded
|
|
|
|
def req_to_ctx():
    """Return the current request's form fields as a plain dict."""
    return {key: value for key, value in flask.request.form.items()}
|
|
|
|
def str_to_ldap(s):
    """Encode the text *s* as UTF-8 bytes."""
    encoded = s.encode(encoding='utf-8')
    return encoded
|
|
|
|
def attr_op(op, attrName, uid=None, templates=config.std_templates, success_redirect='/vcard',
            fatal_redirect='/vcard'):
    """Show or process the form for adding, modifying or deleting an attribute.

    Args:
        op: The operation name; the code handles ``'add'``, ``'mod'`` and
            ``'del'``. It is used as a key into ``config.can``,
            ``config.perm_errors`` and ``templates``.
        attrName: Name of the attribute to operate on. Overridden by the
            name of the profile entry when ``uid`` is given.
        uid: Key of an existing entry in ``context.get_profile()``; its
            current value becomes the old value for ``'mod'``/``'del'``.
        templates: Mapping from operation name to template name.
        success_redirect: Target after a successful change, and also after a
            submission that failed validation.
        fatal_redirect: Target after a permission or LDAP error.

    Returns:
        A redirect response, or the rendered form when nothing was submitted.
    """
    try:
        attr, old_value = None, None
        if uid:
            attr = context.get_profile()[uid]
            attrName = attr.name
            old_value = str(attr)

        # Check permissions before looking up the form class, so that an
        # attribute name without a registered form yields the permission
        # error rather than a KeyError from ``app.forms``.
        if attrName not in config.can[op]:
            flask.flash(config.perm_errors[op], 'danger')
            return flask.redirect(fatal_redirect)

        form = DelForm() if op == 'del' else app.forms[attrName](value=old_value)
        form.attr_data = attr

        if form.validate_on_submit():
            if op in ['add', 'mod']:
                new_value = form.value.data
            admin = attrName in config.admin_required
            conn = context.get_admin_connection() if admin else context.get_connection()
            dn = context.get_dn()

            # Most fields should be modified by remove/add.
            if op == 'mod' and attrName not in ['commonname']:
                op = 'modreadd'

            modlist = []
            if op in ['del', 'modreadd']:
                modlist.append((ldap.MOD_DELETE, attrName, str_to_ldap(old_value)))
            if op in ['add', 'modreadd']:
                modlist.append((ldap.MOD_ADD, attrName, str_to_ldap(new_value)))
            if op in ['mod']:
                modlist.append((ldap.MOD_REPLACE, attrName, str_to_ldap(new_value)))

            # Send delete+add as a single modify request: the server applies
            # it atomically, so a failing add can no longer leave the entry
            # with the old value already removed.
            if modlist:
                conn.modify_s(dn, modlist)

            context.refresh_profile()
            return flask.redirect(success_redirect)

        if form.is_submitted():
            # Submitted but invalid: report every field error and go back.
            for field, errors in form.errors.items():
                for error in errors:
                    flask.flash(u"Error in the {} field - {}".format(
                        getattr(form, field).label.text,
                        error
                    ), 'danger')
            return flask.redirect(success_redirect)

        status = 400 if form.is_submitted() else 200
        return flask.make_response(flask.render_template(templates[op], fatal_redirect=fatal_redirect,
                                   attr_op=op, form=form, attr_name=attrName, uid=uid), status)
    except ldap.LDAPError:
        # Log with traceback through the logging module instead of printing
        # to stdout, matching the Kerberos password helper in this file.
        logging.exception('LDAP error while modifying profile')
        flask.flash('Could not modify profile', 'danger')
        return flask.redirect(fatal_redirect)
|
|
|
|
def connect_to_ldap(dn, password):
    """Bind as *dn* through ``app.connections``.

    Returns whatever ``app.connections.bind`` returns, or ``None`` (after
    printing the error) when the bind raises ``ldap.LDAPError``.
    """
    connection = None
    try:
        connection = app.connections.bind(dn, password)
    except ldap.LDAPError as error_message:
        print("Could not connect to server:", error_message)
    return connection
|
|
|
|
class DelForm(flask_wtf.FlaskForm):
    """Form without fields of its own, used by ``attr_op`` for deletions."""
|
|
|
|
@app.route('/login', methods=["GET"])
def login_form():
    """Render the login page, passing the request's form fields to the template."""
    template_context = req_to_ctx()
    return flask.render_template('login.html', **template_context)
|
|
|
|
def _is_safe_redirect_target(target):
    """Return True when *target* is a plain path on this site.

    Rejects absolute and scheme-relative URLs (``//host``) as well as
    backslashes and control characters that browsers may normalise into one.
    """
    return (
        target.startswith('/')
        and not target.startswith('//')
        and not any(ch in target for ch in ('\\', '\r', '\n', '\t'))
    )


@app.route('/login', methods=["POST"])
def login_action():
    """Authenticate the submitted credentials against LDAP and start a session.

    On success the session receives ``username``, ``dn`` and ``is_admin``,
    the profile is refreshed and the client is redirected to the ``goto``
    request value (``/`` by default, or when ``goto`` is not a local path).
    On failure an error is flashed and the login form is rendered again.
    """
    # ldap.dn ships with python-ldap, which this file already imports.
    from ldap.dn import escape_dn_chars

    # LDAP usernames/DNs are case-insensitive, so we normalize them just in
    # case.
    username = flask.request.form.get("username", "").lower()
    password = flask.request.form.get("password", "")

    # ``goto`` is user-controlled; only follow it when it stays on this site,
    # otherwise the login page could be abused as an open redirect.
    goto = flask.request.values.get("goto", "/")
    if not _is_safe_redirect_target(goto):
        goto = "/"

    conn = None
    dn = None
    # A simple bind with an empty password is an "unauthenticated bind" that
    # many servers accept, so never forward empty credentials.
    if username and password:
        # Escape the username so it cannot inject extra RDN components.
        dn = config.dn_format % escape_dn_chars(username)
        conn = connect_to_ldap(dn, password)

    if not conn:
        flask.flash("Invalid credentials.", category='danger')
        return login_form()

    # Now that we have logged in, we can retrieve the 'real' username (which
    # might be cased differently from the login name).
    res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
    if res:
        uid_values = res[0][1].get('uid')
        if uid_values:
            username = uid_values[0].decode()

    # Check if user is an admin
    is_admin = bool(conn.search_s(dn, ldap.SCOPE_SUBTREE, f'memberOf={config.ldapweb_admin_group}'))

    flask.session["username"] = username
    flask.session['dn'] = dn
    flask.session['is_admin'] = is_admin
    context.refresh_profile()
    return flask.redirect(goto)
|
|
|
|
@app.route('/logout')
@login_required
def logout_action():
    """Unbind the session's LDAP connection, clear the session, redirect to ``/``."""
    session_dn = flask.session['dn']
    app.connections.unbind(session_dn)
    flask.session.clear()
    return flask.redirect('/')
|
|
|
|
@app.route('/passwd', methods=["GET"])
@login_required
def passwd_form():
    """Render the change-password page."""
    page = flask.render_template('passwd.html')
    return page
|
|
|
|
def _passwd_ldap(current, new):
    """Change the logged-in user's password over the LDAP connection.

    Args:
        current: The user's current password.
        new: The password to set.

    Returns:
        True when ``passwd_s`` succeeds, False when it raises
        ``ldap.LDAPError`` (the error is printed).
    """
    conn = context.get_connection()
    dn = context.get_dn()
    try:
        # Was ``current. new`` (a typo for a comma): that is attribute access
        # on a str, which raised AttributeError before the server was called.
        conn.passwd_s(dn, current, new)
        return True
    except ldap.LDAPError as e:
        print('LDAP error:', e)
        return False
|
|
|
|
def _passwd_kadmin(current, new):
    """Change the session user's password via ``kerberos.changePassword``.

    The principal is built by formatting ``config.kadmin_principal_map`` with
    the session's username. Returns the result of ``changePassword``, or
    False when any exception is raised (it is printed and logged).
    """
    user = flask.session.get('username')
    try:
        principal = config.kadmin_principal_map.format(user)
        outcome = kerberos.changePassword(principal, current, new)
    except Exception as exc:
        print('Kerberos error:', exc)
        logging.exception('kpasswd failed')
        return False
    return outcome
|
|
|
|
@app.route('/passwd', methods=["POST"])
@login_required
def passwd_action():
    """Process the change-password form and render the page with the outcome.

    Reads the ``current``, ``new`` and ``confirm`` form fields, rejects
    mismatching new passwords, then changes the password through Kerberos
    when ``config.kadmin_passwd`` is truthy and through LDAP otherwise.
    """
    current = flask.request.form['current']
    new = flask.request.form['new']
    confirm = flask.request.form['confirm']

    if new != confirm:
        flask.flash(u"New passwords don't match", category='danger')
        return flask.render_template('passwd.html')

    change_password = _passwd_kadmin if config.kadmin_passwd else _passwd_ldap
    if change_password(current, new):
        flask.flash(u'Password changed', category='info')
    else:
        flask.flash(u'Wrong password', category='danger')
    return flask.render_template('passwd.html')
|
|
|
|
@app.route('/request_token', methods=['GET'])
@login_required
def request_token():
    """Call ``context.generate_token`` and redirect to ``/vcard``."""
    context.generate_token()
    destination = '/vcard'
    return flask.redirect(destination)
|
|
|
|
@app.route("/")
@login_required
def root():
    """Render the landing page with the session contents as template context."""
    session_values = dict(flask.session)
    return flask.render_template('root.html', **session_values)
|
|
|
|
@app.route('/vcard', methods=['GET'])
@login_required
def vcard():
    """Render the profile page with entries grouped by attribute name."""
    grouped = {}
    for entry in context.get_profile().values():
        name = entry.name
        if name not in grouped:
            grouped[name] = []
        grouped[name].append(entry)

    return flask.render_template(
        'vcard.html',
        token=context.get_token(),
        can_add=config.can['add'],
        can_modify=config.can['mod'],
        can_delete=config.can['del'],
        profile=grouped,
    )
|
|
|
|
@app.route('/vcard/add/<attrName>', methods=['GET', 'POST'])
@login_required
def add_attr(attrName):
    """Show or process the form that adds a value of attribute *attrName*."""
    return attr_op('add', attrName=attrName)
|
|
|
|
@app.route('/vcard/delete/<uid>', methods=['GET', 'POST'])
@login_required
def del_attr(uid):
    """Show or process the form that deletes the profile entry *uid*."""
    return attr_op('del', attrName=None, uid=uid)
|
|
|
|
@app.route('/vcard/modify/<uid>', methods=['GET', 'POST'])
@login_required
def mod_attr(uid):
    """Show or process the form that modifies the profile entry *uid*."""
    return attr_op('mod', attrName=None, uid=uid)
|
|
|
|
def ldap_not_in(patterns):
    """Build a filter fragment matching entries that satisfy none of *patterns*.

    Each pattern is wrapped in parentheses, the results are OR-ed together
    and the whole is negated. The outermost parentheses are left to the
    caller.
    """
    alternatives = ''.join(map('({})'.format, patterns))
    return '!(|' + alternatives + ')'
|
|
|
|
def ldap_get_users_list(conn, query='&'):
    """Return ``(uid, cn)`` pairs for entries under ``config.ldap_people``.

    Only entries having both ``uid`` and ``cn`` and matching *query* (a
    filter body without its outer parentheses) are searched for. The first
    value of each attribute is decoded, and the list is sorted
    case-insensitively by uid.
    """
    search_filter = f'(&(uid=*)(cn=*)({query}))'
    entries = conn.search_s(config.ldap_people, ldap.SCOPE_SUBTREE, search_filter, attrlist=['uid', 'cn'])
    users = [
        (attrs['uid'][0].decode(), attrs['cn'][0].decode())
        for _entry_dn, attrs in entries
    ]
    return sorted(users, key=lambda pair: pair[0].lower())
|
|
|
|
def ldap_get_all_users_groupped(conn):
    """Return users as a list of ``(group_name, users)`` pairs.

    One pair is produced per entry of ``config.admin_groups`` (matched with a
    ``memberOf`` filter), followed by an ``'Other'`` pair holding the users
    that match none of those filters.
    """
    membership_filters = []
    for group_name, pattern in config.admin_groups.items():
        membership_filters.append((group_name, f'memberOf={pattern}'))

    result = []
    for group_name, membership in membership_filters:
        result.append((group_name, ldap_get_users_list(conn, membership)))

    remaining = ldap_not_in(membership for _, membership in membership_filters)
    result.append(('Other', ldap_get_users_list(conn, remaining)))
    return result
|
|
|
|
@app.route('/admin')
@login_required
def admin_list():
    """Render the admin page listing all users grouped by admin group.

    Aborts with HTTP 403 unless the session carries a truthy ``is_admin``
    flag, which ``login_action`` stores at login time.
    """
    # Being logged in is not enough: without this check every authenticated
    # user could enumerate the whole directory.
    if not flask.session.get('is_admin'):
        flask.abort(403)

    conn = context.get_connection()
    user_groups = ldap_get_all_users_groupped(conn)

    return flask.render_template('admin/list.html', user_groups=user_groups)
|
|
|