import logging import re from cached_property import cached_property from flask import current_app import ldap import requests def check_credentials(username, password): conn = ldap.initialize(current_app.config["LDAP_URL"]) conn.start_tls_s() try: conn.simple_bind_s(current_app.config["DN_STRING"] % username, password) return True except ldap.LDAPError: return False def _connect(): conn = ldap.initialize(current_app.config["LDAP_URL"]) conn.start_tls_s() conn.simple_bind(current_app.config["LDAP_BIND_DN"], current_app.config["LDAP_BIND_PASSWORD"]) return conn class LDAPUser(object): def __init__(self, username, ldap_data): self.username = username self.is_authenticated = True self.is_anonymous = False self.username = ldap_data.get("uid", [None])[0].decode() self.gecos = ldap_data.get("gecos", [None])[0].decode() self.mifare_hashes = [m.decode() for m in ldap_data.get("mifareIDHash", [])] self.phone = ldap_data.get("mobile", [None])[0].decode() self.personal_email = [m.decode() for m in ldap_data.get("mailRoutingAddress", [])] @classmethod def by_login(cls, username): username = re.sub(current_app.config["STRIP_RE"], "", username) conn = _connect() res = conn.search_s( current_app.config["PEOPLE_BASEDN"], ldap.SCOPE_SUBTREE, current_app.config["UID_LDAP_FILTER"] % username, ) if len(res) != 1: raise Exception("No such username.") _, data = res[0] return cls(username, data) def __repr__(self): active = "active" if self.is_active else "inactive" return "".format(self.username, active) @property def email(self): return self.username + "@hackerspace.pl" @cached_property def is_active(self): url = "https://kasownik.hackerspace.pl/api/judgement/{}.json" try: r = requests.get(url.format(self.username)) return bool(r.json()["content"]) except Exception as e: logging.error("When getting data from Kasownik: {}".format(e)) # Fail-safe. return True @cached_property def is_staff(self): url = "https://capacifier.hackerspace.pl/staff/{}" try: r = requests.get(url.format(self.username)) return "YES" in r.text except Exception as e: logging.error("When getting data from Capacifier: {}".format(e)) return False def get_id(self): return self.username