flask-spaceauth/spaceauth/__init__.py

111 lines
3.7 KiB
Python

from flask import Blueprint, request, url_for, session, redirect, flash
from authlib.common.errors import AuthlibBaseError
from authlib.integrations.flask_client import OAuth, OAuthError
from flask_login import (
LoginManager,
login_user,
logout_user,
current_user,
login_required,
UserMixin,
)
from spaceauth.caps import cap_required
from datetime import datetime, timedelta
class SSOUser(UserMixin):
def __init__(self, parent, id):
self.id = id
def __str__(self):
return self.id
class SpaceAuth(LoginManager):
def __init__(self, app=None, domain="https://sso.hackerspace.pl", *args, **kwargs):
self.oauth = OAuth()
self.domain = domain
bp = Blueprint("spaceauth", __name__)
bp.add_url_rule("/login", "login", self.login_view_handler)
bp.add_url_rule("/logout", "logout", self.logout_view_handler)
bp.add_url_rule("/callback", "callback", self.callback_view_handler)
self.blueprint = bp
super(SpaceAuth, self).__init__()
self.refresh_view = "spaceauth.login"
self.login_view = "spaceauth.login"
self.user_loader(self.user_loader_handler)
if app:
self.init_app(app, *args, **kwargs)
def init_app(self, app, url_prefix="/oauth"):
app.config.get("SPACEAUTH_CONSUMER_KEY")
self.remote = self.oauth.register(
"spaceauth",
client_id=app.config["SPACEAUTH_CONSUMER_KEY"],
client_secret=app.config["SPACEAUTH_CONSUMER_SECRET"],
api_base_url=self.domain + "/api/",
access_token_url=self.domain + "/oauth/token",
authorize_url=self.domain + "/oauth/authorize",
token_endpoint_auth_method="client_secret_post",
# server_metadata_url=self.domain + "/.well-known/openid-configuration",
scope="profile:read",
fetch_token=self.fetch_token,
update_token=self.update_token,
)
self.oauth.init_app(app)
super(SpaceAuth, self).init_app(app)
app.register_blueprint(self.blueprint, url_prefix=url_prefix)
@app.errorhandler(AuthlibBaseError)
def errorhandler(err):
flash("OAuth error occured ({})".format(err), "error")
return redirect("/")
def login_view_handler(self):
session["spaceauth_next"] = request.args.get("next") or request.referrer
return self.remote.authorize_redirect(
redirect_uri=url_for("spaceauth.callback", _external=True)
)
def logout_view_handler(self):
# TODO revoke token
session.pop("spaceauth_token", None)
session.pop("spaceauth_next", None)
logout_user()
return redirect("/")
def callback_view_handler(self):
try:
resp = self.remote.authorize_access_token()
self.update_token(resp)
return redirect(session.pop("spaceauth_next", None) or "/")
except Exception as exc:
raise OAuthError(
error="Unable to authorize access token", description=str(exc)
)
def user_loader_handler(self, uid, profile=None):
"""
Default user loader just to differentiate authenticated user from
anonymous.
"""
return SSOUser(self, uid)
def user_profile(self):
return self.remote.get("profile").data
def fetch_token(self):
return session.get("spaceauth_token")
def update_token(self, token, refresh_token=None, access_token=None):
session["spaceauth_token"] = token
profile = self.remote.get("1/userinfo").json()
login_user(
self.user_loader_handler(profile["sub"], profile),
duration=timedelta(seconds=token["expires_in"]),
)