from flask import Blueprint, request, url_for, session, redirect, flash from authlib.common.errors import AuthlibBaseError from authlib.integrations.flask_client import OAuth, OAuthError from flask_login import ( LoginManager, login_user, logout_user, current_user, login_required, UserMixin, ) from spaceauth.caps import cap_required from datetime import datetime, timedelta class SSOUser(UserMixin): def __init__(self, parent, id): self.id = id def __str__(self): return self.id class SpaceAuth(LoginManager): def __init__(self, app=None, domain="https://sso.hackerspace.pl", *args, **kwargs): self.oauth = OAuth() self.domain = domain bp = Blueprint("spaceauth", __name__) bp.add_url_rule("/login", "login", self.login_view_handler) bp.add_url_rule("/logout", "logout", self.logout_view_handler) bp.add_url_rule("/callback", "callback", self.callback_view_handler) self.blueprint = bp super(SpaceAuth, self).__init__() self.refresh_view = "spaceauth.login" self.login_view = "spaceauth.login" self.user_loader(self.user_loader_handler) if app: self.init_app(app, *args, **kwargs) def init_app(self, app, url_prefix="/oauth"): app.config.get("SPACEAUTH_CONSUMER_KEY") self.remote = self.oauth.register( "spaceauth", client_id=app.config["SPACEAUTH_CONSUMER_KEY"], client_secret=app.config["SPACEAUTH_CONSUMER_SECRET"], api_base_url=self.domain + "/api/", access_token_url=self.domain + "/oauth/token", authorize_url=self.domain + "/oauth/authorize", token_endpoint_auth_method="client_secret_post", # server_metadata_url=self.domain + "/.well-known/openid-configuration", scope="profile:read", fetch_token=self.fetch_token, update_token=self.update_token, ) self.oauth.init_app(app) super(SpaceAuth, self).init_app(app) app.register_blueprint(self.blueprint, url_prefix=url_prefix) @app.errorhandler(AuthlibBaseError) def errorhandler(err): flash("OAuth error occured ({})".format(err), "error") return redirect("/") def login_view_handler(self): session["spaceauth_next"] = request.args.get("next") or request.referrer return self.remote.authorize_redirect( redirect_uri=url_for("spaceauth.callback", _external=True) ) def logout_view_handler(self): # TODO revoke token session.pop("spaceauth_token", None) session.pop("spaceauth_next", None) logout_user() return redirect("/") def callback_view_handler(self): try: resp = self.remote.authorize_access_token() self.update_token(resp) return redirect(session.pop("spaceauth_next", None) or "/") except Exception as exc: raise OAuthError( error="Unable to authorize access token", description=str(exc) ) def user_loader_handler(self, uid, profile=None): """ Default user loader just to differentiate authenticated user from anonymous. """ return SSOUser(self, uid) def user_profile(self): return self.remote.get("profile").data def fetch_token(self): return session.get("spaceauth_token") def update_token(self, token, refresh_token=None, access_token=None): session["spaceauth_token"] = token profile = self.remote.get("1/userinfo").json() login_user( self.user_loader_handler(profile["sub"], profile), duration=timedelta(seconds=token["expires_in"]), )