import ldap import logging import re from datetime import datetime, timedelta from cached_property import cached_property import flask from flask import Flask, render_template, make_response, flash, redirect, url_for from flask_oauthlib.provider import OAuth2Provider from flask_login import ( LoginManager, login_user, logout_user, login_required, current_user, ) from flask_sqlalchemy import SQLAlchemy from flask_wtf import FlaskForm from wtforms import StringField, PasswordField, BooleanField from wtforms.validators import DataRequired import requests logging.basicConfig( level=logging.DEBUG, format="[%(asctime)-15s] %(name)-10s %(levelname)7s: %(message)s", ) app = Flask("auth") app.config.from_object(__name__) app.config.from_pyfile("auth.cfg") oauth = OAuth2Provider(app) login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = "/login" db = SQLAlchemy(app) class Client(db.Model): # human readable name name = db.Column(db.String(40)) # human readable description description = db.Column(db.String(400)) client_id = db.Column(db.String(40), primary_key=True) client_secret = db.Column(db.String(55), unique=True, index=True, nullable=False) # public or confidential is_confidential = db.Column(db.Boolean) redirect_uris_ = db.Column(db.Text) default_scopes_ = db.Column(db.Text) # TODO # approved = db.Column(db.Boolean, default=False) approved = True @property def client_type(self): if self.is_confidential: return "confidential" return "public" @property def redirect_uris(self): if self.redirect_uris_: return self.redirect_uris_.split() return [] @property def default_redirect_uri(self): return self.redirect_uris[0] @property def default_scopes(self): if self.default_scopes_: return self.default_scopes_.split() return [] def validate_scopes(self, scopes): return { "profile:read", "profile:write", "password:write", "users:read", }.issuperset(scopes) class Grant(db.Model): id = db.Column(db.Integer, primary_key=True) user = db.Column(db.String(40), nullable=False) client_id = db.Column( db.String(40), db.ForeignKey("client.client_id"), nullable=False ) client = db.relationship("Client") code = db.Column(db.String(255), index=True, nullable=False) redirect_uri = db.Column(db.String(255)) expires = db.Column(db.DateTime) _scopes = db.Column(db.Text) def delete(self): db.session.delete(self) db.session.commit() return self @property def scopes(self): if self._scopes: return self._scopes.split() return [] class Token(db.Model): id = db.Column(db.Integer, primary_key=True) client_id = db.Column( db.String(40), db.ForeignKey("client.client_id"), nullable=False ) client = db.relationship("Client") user = db.Column(db.String(40), nullable=False) # currently only bearer is supported token_type = db.Column(db.String(40)) access_token = db.Column(db.String(255), unique=True) refresh_token = db.Column(db.String(255), unique=True) expires = db.Column(db.DateTime) _scopes = db.Column(db.Text) def delete(self): db.session.delete(self) db.session.commit() return self @property def scopes(self): if self._scopes: return self._scopes.split() return [] def connect_to_ldap(): conn = ldap.initialize(app.config["LDAP_URL"]) conn.start_tls_s() conn.simple_bind(app.config["LDAP_BIND_DN"], app.config["LDAP_BIND_PASSWORD"]) return conn def check_credentials(username, password): conn = ldap.initialize(app.config["LDAP_URL"]) conn.start_tls_s() try: conn.simple_bind_s(app.config["DN_STRING"] % username, password) return True except ldap.LDAPError: return False @oauth.clientgetter def load_client(client_id): return Client.query.filter_by(client_id=client_id).first() @oauth.grantgetter def load_grant(client_id, code): return Grant.query.filter_by(client_id=client_id, code=code).first() @oauth.grantsetter def save_grant(client_id, code, request, *args, **kwargs): # decide the expires time yourself expires = datetime.utcnow() + timedelta(seconds=100) grant = Grant( client_id=client_id, code=code["code"], redirect_uri=request.redirect_uri, _scopes=" ".join(request.scopes), user=current_user.username, expires=expires, ) db.session.add(grant) db.session.commit() return grant @oauth.tokengetter def load_token(access_token=None, refresh_token=None): if access_token: return Token.query.filter_by(access_token=access_token).first() elif refresh_token: return Token.query.filter_by(refresh_token=refresh_token).first() @oauth.tokensetter def save_token(token, request, *args, **kwargs): toks = Token.query.filter_by(client_id=request.client.client_id, user=request.user) # make sure that every client has only one token connected to a user for t in toks: db.session.delete(t) expires_in = token.get("expires_in") expires = datetime.utcnow() + timedelta(seconds=expires_in) tok = Token( access_token=token["access_token"], refresh_token=token.get("refresh_token"), token_type=token["token_type"], _scopes=token["scope"], expires=expires, client_id=request.client.client_id, user=request.user, ) db.session.add(tok) db.session.commit() return tok @app.route("/oauth/authorize", methods=["GET", "POST"]) @login_required @oauth.authorize_handler def authorize(*args, **kwargs): form = FlaskForm() if Token.query.filter( Token.client_id == kwargs.get("client_id"), Token.user == current_user.username ).count(): # User has unrevoked token already - grant by default return True if not form.validate_on_submit(): client_id = kwargs.get("client_id") client = Client.query.filter_by(client_id=client_id).first() kwargs["client"] = client kwargs["user"] = current_user kwargs["form"] = form return render_template("oauthorize.html", **kwargs) confirm = flask.request.form.get("confirm", "no") return confirm == "yes" @app.route("/oauth/token", methods=["GET", "POST"]) @oauth.token_handler def access_token(): return None class LDAPUserProxy(object): def __init__(self, username): self.username = re.sub(app.config["STRIP_RE"], "", username) self.is_authenticated = True self.is_anonymous = False conn = connect_to_ldap() res = conn.search_s( app.config["PEOPLE_BASEDN"], ldap.SCOPE_SUBTREE, app.config["UID_LDAP_FILTER"] % self.username, ) if len(res) != 1: raise Exception("No such username.") dn, data = res[0] self.username = data.get("uid", [None])[0] self.gecos = data.get("gecos", [None])[0] self.mifare_hashes = data.get("mifareIDHash", []) self.phone = data.get("mobile", [None])[0] self.personal_email = data.get("mailRoutingAddress", []) def __repr__(self): active = "active" if self.is_active else "inactive" return "".format(self.username, active) @property def email(self): return self.username + "@hackerspace.pl" @cached_property def is_active(self): url = "https://kasownik.hackerspace.pl/api/judgement/{}.json" try: r = requests.get(url.format(self.username)) return bool(r.json()["content"]) except Exception as e: logging.error("When getting data from Kasownik: {}".format(e)) # Fail-safe. return True @cached_property def is_staff(self): url = "https://capacifier.hackerspace.pl/staff/{}" try: r = requests.get(url.format(self.username)) return "YES" in r.text except Exception as e: logging.error("When getting data from Capacifier: {}".format(e)) return False def get_id(self): return self.username class LoginForm(FlaskForm): username = StringField("username", validators=[DataRequired()]) password = PasswordField("password", validators=[DataRequired()]) remember = BooleanField("remember me") @app.route("/") @app.route("/profile") @login_required def profile(): return render_template( "profile.html", tokens=Token.query.filter(Token.user == current_user.username) ) @app.route("/token//revoke", methods=["POST"]) @login_required def token_revoke(id): token = Token.query.filter( Token.user == current_user.username, Token.id == id ).first() if not token: flask.abort(404) token.delete() return redirect("/") @app.route("/login", methods=["GET", "POST"]) def login(): form = LoginForm() next = flask.request.args.get("next") if form.validate_on_submit(): username, password = form.data["username"], form.data["password"] if not check_credentials(username, password): flash("Invalid username or password") return render_template("login_oauth.html", form=form, next=next) user = LDAPUserProxy(username) if not user.is_active: flash("User inactive - have you paid your membership fees?") return render_template("login_oauth.html", form=form, next=next) login_user(user, form.data["remember"]) flash("Logged in successfully.") return redirect(next or url_for("profile")) return render_template("login_oauth.html", form=form, next=next) @app.route("/logout") def logout(): logout_user() return redirect("/") @login_manager.user_loader def load_user(user_id): return LDAPUserProxy(user_id) # HSWAW specific endpoint @app.route("/api/profile") @app.route("/api/1/profile") @oauth.require_oauth("profile:read") def api_profile(): user = LDAPUserProxy(flask.request.oauth.user) return flask.jsonify( email=user.email, username=user.username, gecos=user.gecos, phone=user.phone, personal_email=user.personal_email, ) # OpenIDConnect userinfo @app.route("/api/1/userinfo") @oauth.require_oauth("profile:read") def api_userinfo(): user = LDAPUserProxy(flask.request.oauth.user) groups = [] if user.is_staff: groups.append("staff") return flask.jsonify( sub=user.username, name=user.gecos, email=user.email, preferred_username=user.username, nickname=user.username, user_name=user.username, user_id=user.username, groups=groups, ) if __name__ == "__main__": app.run("0.0.0.0", 8082, debug=True)