1
0
Fork 0

hswaw/lib: add flask_spaceauth

Change-Id: I3bb47bb65e739eaf27f54c07f03df18e79b398e0
master
q3k 2019-12-18 14:16:53 +01:00
parent 43189235bd
commit edeb3ccf78
7 changed files with 196 additions and 0 deletions

1
hswaw/lib/flask_spaceauth/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
*.py[co]

View File

@ -0,0 +1,8 @@
py_binary(
name = "example",
srcs = ["example.py"],
deps = [
"//hswaw/lib/flask_spaceauth/spaceauth",
"@pydeps//flask",
]
)

View File

@ -0,0 +1,6 @@
Flask-SpaceAuth
===============
Simple wrapper around Flask-OAuthlib & Flask-Login for quick and dirty
integration of random services with [Warsaw Hackerspace Single
Sign-On](https://sso.hackerspace.pl).

View File

@ -0,0 +1,29 @@
from flask import Flask, request, url_for, Markup
from hswaw.lib.flask_spaceauth.spaceauth import SpaceAuth, login_required, \
cap_required, current_user
app = Flask('spaceauth-example')
app.config['SECRET_KEY'] = 'testing'
app.config['SPACEAUTH_CONSUMER_KEY'] = 'testing'
app.config['SPACEAUTH_CONSUMER_SECRET'] = 'asdTasdfhwqweryrewegfdsfJIxkGc'
auth = SpaceAuth(app)
@app.route('/')
def index():
return Markup('Hey! <a href="%s">Login with spaceauth</a> / %r') % (
url_for('spaceauth.login'), current_user)
@app.route('/profile')
@login_required
def profile():
return Markup('Hey {}!').format(spaceauth.current_user)
@app.route('/staff')
@cap_required('staff')
def staff_only():
return 'This is staff-only zone!'
if __name__ == "__main__":
app.run()

View File

@ -0,0 +1,13 @@
py_library(
name = "spaceauth",
srcs = [
"__init__.py",
"caps.py",
],
visibility = ["//visibility:public"],
deps = [
"@pydeps//blinker",
"@pydeps//flask_login",
"@pydeps//flask_oauthlib",
],
)

View File

@ -0,0 +1,84 @@
from flask import Blueprint, request, url_for, session, redirect, abort, flash
from flask_oauthlib.client import OAuth, OAuthException
from flask_login import LoginManager, login_user, logout_user, current_user, login_required, UserMixin
from .caps import cap_required
class SpaceAuth(LoginManager):
def __init__(self, app=None, *args, **kwargs):
self.oauth = OAuth()
self.remote = self.oauth.remote_app(
'spaceauth',
base_url='https://sso.hackerspace.pl/api/',
access_token_url='https://sso.hackerspace.pl/oauth/token',
authorize_url='https://sso.hackerspace.pl/oauth/authorize',
request_token_params={'scope': 'profile:read'},
app_key='SPACEAUTH')
self.remote.tokengetter(self.tokengetter)
bp = Blueprint('spaceauth', __name__)
bp.add_url_rule('/login', 'login', self.login_view_handler)
bp.add_url_rule('/logout', 'logout', self.logout_view_handler)
bp.add_url_rule('/callback', 'callback', self.callback_view_handler)
self.blueprint = bp
super(SpaceAuth, self).__init__()
self.refresh_view = 'spaceauth.login'
self.login_view = 'spaceauth.login'
self.user_loader(self.user_loader_handler)
if app:
self.init_app(app, *args, **kwargs)
def init_app(self, app, url_prefix='/oauth'):
self.oauth.init_app(app)
super(SpaceAuth, self).init_app(app)
app.register_blueprint(self.blueprint, url_prefix=url_prefix)
@app.errorhandler(OAuthException)
def errorhandler(err):
flash('OAuth error occured', 'error')
return redirect('/')
def login_view_handler(self):
session['spaceauth_next'] = request.args.get('next') or request.referrer
return self.remote.authorize(
callback=url_for('spaceauth.callback', _external=True)
)
def logout_view_handler(self):
# TODO revoke token
session.pop('spaceauth_token', None)
session.pop('spaceauth_next', None)
logout_user()
return redirect('/')
def callback_view_handler(self):
resp = self.remote.authorized_response()
if resp is None:
raise OAuthException(
'Access denied', type=request.args.get('error'))
# TODO encrypt token...?
session['spaceauth_token'] = resp['access_token']
profile = self.remote.get('profile').data
login_user(self.user_loader_handler(profile['username'], profile))
return redirect(session.pop('spaceauth_next', None) or '/')
def tokengetter(self):
return (session.get('spaceauth_token'), '')
def user_loader_handler(self, uid, profile=None):
"""
Default user loader just to differentiate authenticated user from
anonymous.
"""
user = UserMixin()
user.id = uid
return user
def user_profile(self):
return self.remote.get('profile').data

View File

@ -0,0 +1,55 @@
from flask import abort, session, current_app
from flask_login import current_user
from flask_login.signals import user_logged_out
import requests
import functools
import time
def cap_check(capability, user=None):
'''Checks if specified user (or current user) has desired capacifier
capability'''
if not user and not current_user.is_authenticated:
return False
user = user or current_user.get_id()
cache_key = '{}-{}'.format(user, capability)
cached_cap = session.get('_caps', {}).get(cache_key, (False, 0))
if cached_cap[1] > time.time():
return cached_cap[0]
allowed = requests.get(
'https://capacifier.hackerspace.pl/%s/%s' % (capability, user)
).status_code == 200
if '_caps' not in session:
session['_caps'] = {}
session['_caps'][cache_key] = \
(allowed, time.time() + current_app.config.get('CAP_TTL', 3600))
return allowed
@user_logged_out.connect
def caps_cleanup(app, user):
# Cleanup caps cache
session.pop('_caps', None)
def cap_required(capability):
'''Decorator to check if user has desired capacifier capability, returns
403 otherwise'''
def inner(func):
@functools.wraps(func)
def wrapped(*args, **kwargs):
if not cap_check(capability):
abort(403)
return func(*args, **kwargs)
return wrapped
return inner