from urllib.parse import urljoin, urlparse

from flask import Blueprint, request, url_for, session, redirect, abort, flash
from authlib.integrations.flask_client import OAuth, OAuthError
from flask_login import (
    LoginManager,
    login_user,
    logout_user,
    current_user,
    login_required,
    UserMixin,
)

# Several of the names imported here (abort, current_user, login_required,
# cap_required) are not used in this module; presumably they are re-exported
# for users of the package -- verify before removing any of them.
from .caps import cap_required


def _is_safe_redirect_target(target):
    """Return True if *target* is an http(s) URL on the host serving this request.

    Relative targets are resolved against ``request.host_url`` first.  Must be
    called inside a Flask request context.
    """
    # Backslashes are rejected outright: some browsers treat them like
    # forward slashes, which could turn a path into a foreign-host URL.
    if not target or '\\' in target:
        return False
    own = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    return resolved.scheme in ('http', 'https') and resolved.netloc == own.netloc


class SpaceAuth(LoginManager):
    """Flask-Login manager that authenticates users via sso.hackerspace.pl.

    On construction it configures an OAuth remote named ``spaceauth`` and a
    blueprint (also named ``spaceauth``) with ``/login``, ``/logout`` and
    ``/callback`` routes.  ``init_app`` registers both on a Flask app.

    NOTE(review): ``remote_app()``, ``tokengetter()``, ``authorize()``,
    ``authorized_response()``, the ``.data`` attribute and
    ``OAuthError(..., type=...)`` look like the Flask-OAuthlib client
    interface, while ``OAuth``/``OAuthError`` are imported from Authlib.
    Confirm that the installed OAuth object really provides these calls.
    """

    def __init__(self, app=None, *args, **kwargs):
        """Set up the OAuth remote and blueprint, then the login manager.

        :param app: optional Flask application; when given, ``init_app`` is
            called with it and with any extra positional/keyword arguments.
        """
        self.oauth = OAuth()
        self.remote = self.oauth.remote_app(
            'spaceauth',
            base_url='https://sso.hackerspace.pl/api/',
            access_token_url='https://sso.hackerspace.pl/oauth/token',
            authorize_url='https://sso.hackerspace.pl/oauth/authorize',
            request_token_params={'scope': 'profile:read'},
            app_key='SPACEAUTH')
        self.remote.tokengetter(self.tokengetter)

        bp = Blueprint('spaceauth', __name__)
        bp.add_url_rule('/login', 'login', self.login_view_handler)
        bp.add_url_rule('/logout', 'logout', self.logout_view_handler)
        bp.add_url_rule('/callback', 'callback', self.callback_view_handler)
        self.blueprint = bp

        super(SpaceAuth, self).__init__()
        self.refresh_view = 'spaceauth.login'
        self.login_view = 'spaceauth.login'
        self.user_loader(self.user_loader_handler)

        if app is not None:
            self.init_app(app, *args, **kwargs)

    def init_app(self, app, url_prefix='/oauth'):
        """Attach the OAuth client, login manager and blueprint to *app*.

        :param app: Flask application to configure.
        :param url_prefix: URL prefix under which the blueprint is mounted.

        Also installs an application-wide handler that turns any
        ``OAuthError`` into a flashed message and a redirect to ``/``.
        """
        self.oauth.init_app(app)
        super(SpaceAuth, self).init_app(app)
        app.register_blueprint(self.blueprint, url_prefix=url_prefix)

        @app.errorhandler(OAuthError)
        def errorhandler(err):
            flash('OAuth error occured', 'error')
            return redirect('/')

    def login_view_handler(self):
        """Remember where to return to and start the OAuth authorization.

        The return target is taken from the ``next`` query parameter, falling
        back to the referrer.  Both are client-controlled, so the target is
        only stored when it points at this host; otherwise ``None`` is stored
        and the callback falls back to ``/``.
        """
        target = request.args.get('next') or request.referrer
        if not _is_safe_redirect_target(target):
            # Refuse foreign hosts to avoid an open redirect after login.
            target = None
        session['spaceauth_next'] = target
        return self.remote.authorize(
            callback=url_for('spaceauth.callback', _external=True)
        )

    def logout_view_handler(self):
        """Drop the stored token and return target, log out, go to ``/``."""
        # TODO revoke token
        session.pop('spaceauth_token', None)
        session.pop('spaceauth_next', None)
        logout_user()
        return redirect('/')

    def callback_view_handler(self):
        """Finish the OAuth flow: store the token, log the user in, redirect.

        :raises OAuthError: when the remote yields no authorization response.
        """
        resp = self.remote.authorized_response()
        if resp is None:
            raise OAuthError(
                'Access denied',
                type=request.args.get('error'))

        # TODO encrypt token...?
        session['spaceauth_token'] = resp['access_token']
        profile = self.remote.get('profile').data

        login_user(self.user_loader_handler(profile['username'], profile))
        return redirect(session.pop('spaceauth_next', None) or '/')

    def tokengetter(self):
        """Return ``(token, '')`` with the token read from the session."""
        return (session.get('spaceauth_token'), '')

    def user_loader_handler(self, uid, profile=None):
        """
        Default user loader just to differentiate authenticated user from
        anonymous.

        :param uid: identifier assigned to the returned user's ``id``.
        :param profile: accepted for callers that pass the fetched profile;
            unused by this default implementation.
        :returns: a bare ``UserMixin`` instance whose ``id`` is *uid*.
        """
        user = UserMixin()
        user.id = uid
        return user

    def user_profile(self):
        """Fetch ``profile`` from the remote and return the response data."""
        return self.remote.get('profile').data