83 lines
2.6 KiB
Python
83 lines
2.6 KiB
Python
import logging
|
|
import re
|
|
|
|
from functools import cached_property
|
|
from flask import current_app
|
|
import ldap
|
|
import requests
|
|
|
|
|
|
def check_credentials(username, password):
    """Check a username/password pair by attempting an LDAP simple bind.

    The bind DN is built from the ``DN_STRING`` template in the Flask app
    config and the server address is read from ``LDAP_URL``.

    Args:
        username: Login name substituted into ``DN_STRING``.
        password: Password to bind with.

    Returns:
        True if the server accepted the bind; False if the bind failed with
        an ``ldap.LDAPError`` or if either credential is empty.

    Raises:
        ldap.LDAPError: If connecting or negotiating STARTTLS fails. This is
            kept separate from "wrong password" so that an unreachable
            server is not reported as bad credentials.
    """
    # A simple bind with an empty password is an "unauthenticated bind"
    # (RFC 4513, section 5.1.2). Many servers accept it, which would make
    # this function return True without any password having been checked.
    if not username or not password:
        return False

    conn = ldap.initialize(current_app.config["LDAP_URL"])
    try:
        conn.start_tls_s()
        # Escape DN metacharacters so the user-supplied name cannot alter
        # the structure of the DN it is substituted into.
        user_dn = current_app.config["DN_STRING"] % ldap.dn.escape_dn_chars(username)
        try:
            conn.simple_bind_s(user_dn, password)
        except ldap.LDAPError:
            return False
        return True
    finally:
        # Release the connection whether or not the bind succeeded.
        conn.unbind_s()
|
|
|
|
def _connect():
    """Open an LDAP connection over STARTTLS, bound with the configured DN.

    Reads ``LDAP_URL``, ``LDAP_BIND_DN`` and ``LDAP_BIND_PASSWORD`` from the
    Flask app config.

    Returns:
        The bound connection object returned by ``ldap.initialize``. The
        caller is responsible for unbinding it.

    Raises:
        ldap.LDAPError: If STARTTLS negotiation or the bind fails.
    """
    config = current_app.config
    conn = ldap.initialize(config["LDAP_URL"])
    conn.start_tls_s()
    # Use the synchronous bind. The asynchronous ``simple_bind`` only returns
    # a message id, so a rejected bind would go unnoticed and later searches
    # would run without the intended identity.
    conn.simple_bind_s(config["LDAP_BIND_DN"], config["LDAP_BIND_PASSWORD"])
    return conn
|
|
|
|
|
|
class LDAPUser(object):
    """User object populated from an LDAP entry.

    Exposes ``is_authenticated``, ``is_anonymous``, ``is_active`` and
    ``get_id()``.
    NOTE(review): this looks like the Flask-Login user interface; confirm
    against the code that consumes these objects.

    Attributes set by ``__init__``:
        username: Decoded ``uid`` value, or the ``username`` argument when
            the entry has no ``uid``.
        gecos: Decoded ``gecos`` value, or None when absent.
        mifare_hashes: List of decoded ``mifareIDHash`` values.
        phone: Decoded ``mobile`` value, or None when absent.
        personal_email: List of decoded ``mailRoutingAddress`` values.
    """

    # Seconds to wait on the external HTTP services before giving up, so an
    # unresponsive service cannot block the caller indefinitely.
    HTTP_TIMEOUT = 5

    def __init__(self, username, ldap_data):
        """Build a user from ``ldap_data``.

        Args:
            username: Login name; used only when the entry lacks ``uid``.
            ldap_data: Mapping of attribute name to a list of ``bytes``
                values, as unpacked from a ``search_s`` result in
                ``by_login``.
        """
        self.is_authenticated = True
        self.is_anonymous = False
        # Optional attributes may be missing from the entry; decode the first
        # value when present instead of calling ``.decode()`` on None.
        self.username = self._decode_first(ldap_data, "uid", default=username)
        self.gecos = self._decode_first(ldap_data, "gecos")
        self.mifare_hashes = self._decode_all(ldap_data, "mifareIDHash")
        self.phone = self._decode_first(ldap_data, "mobile")
        self.personal_email = self._decode_all(ldap_data, "mailRoutingAddress")

    @staticmethod
    def _decode_first(ldap_data, attribute, default=None):
        """Return the first value of ``attribute`` decoded, else ``default``."""
        values = ldap_data.get(attribute) or []
        return values[0].decode() if values else default

    @staticmethod
    def _decode_all(ldap_data, attribute):
        """Return every value of ``attribute`` decoded to ``str``."""
        return [value.decode() for value in ldap_data.get(attribute, [])]

    @classmethod
    def by_login(cls, username):
        """Look up ``username`` in LDAP and return the matching user.

        The name is first cleaned with the ``STRIP_RE`` pattern from the
        Flask app config, then searched for under ``PEOPLE_BASEDN`` using the
        ``UID_LDAP_FILTER`` template.

        Raises:
            Exception: If the search does not return exactly one entry.
            ldap.LDAPError: If connecting or searching fails.
        """
        # Imported here so the block does not depend on ``ldap.filter``
        # having been loaded by the top-level ``import ldap``.
        from ldap.filter import escape_filter_chars

        username = re.sub(current_app.config["STRIP_RE"], "", username)

        conn = _connect()
        try:
            res = conn.search_s(
                current_app.config["PEOPLE_BASEDN"],
                ldap.SCOPE_SUBTREE,
                # Escape filter metacharacters ("*", "(", ")", "\\") so the
                # login name cannot change the meaning of the filter.
                current_app.config["UID_LDAP_FILTER"] % escape_filter_chars(username),
            )
        finally:
            # The connection is only needed for this one search.
            conn.unbind_s()

        if len(res) != 1:
            raise Exception("No such username.")

        _, data = res[0]
        return cls(username, data)

    def __repr__(self):
        # Reading ``is_active`` may trigger (and cache) the HTTP lookup.
        active = "active" if self.is_active else "inactive"
        return "<LDAPUser {}, {}>".format(self.username, active)

    @property
    def email(self):
        """Address formed by appending ``@hackerspace.pl`` to the username."""
        return self.username + "@hackerspace.pl"

    @cached_property
    def is_active(self):
        """Truthiness of ``content`` in the Kasownik judgement response.

        The result is computed once per instance and cached. Any failure
        (network error, timeout, unexpected payload) is logged and treated
        as active, so an outage of the service does not lock users out.
        """
        url = "https://kasownik.hackerspace.pl/api/judgement/{}.json"
        try:
            r = requests.get(url.format(self.username), timeout=self.HTTP_TIMEOUT)
            return bool(r.json()["content"])
        except Exception as e:
            logging.error("When getting data from Kasownik: {}".format(e))
            # Fail-safe.
            return True

    @cached_property
    def is_staff(self):
        """True if the Capacifier staff response contains ``YES``.

        The result is computed once per instance and cached. Any failure is
        logged and treated as "not staff".
        """
        url = "https://capacifier.hackerspace.pl/staff/{}"
        try:
            r = requests.get(url.format(self.username), timeout=self.HTTP_TIMEOUT)
            return "YES" in r.text
        except Exception as e:
            logging.error("When getting data from Capacifier: {}".format(e))
            return False

    def get_id(self):
        """Return the username as this user's identifier."""
        return self.username
|