master
q3k 2020-02-24 19:46:38 +01:00
parent bd6ec1ddc3
commit 69aa2b3d11
13 changed files with 595 additions and 426 deletions

3
app.py Normal file
View File

@ -0,0 +1,3 @@
from website.app import create_app
app = create_app()

403
auth.py
View File

@ -1,403 +0,0 @@
import ldap
import logging
import re
from datetime import datetime, timedelta
from cached_property import cached_property
import flask
from flask import Flask, render_template, make_response, flash, redirect, url_for
from flask_oauthlib.provider import OAuth2Provider
from flask_login import (
LoginManager,
login_user,
logout_user,
login_required,
current_user,
)
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField
from wtforms.validators import DataRequired
import requests
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)-15s] %(name)-10s %(levelname)7s: %(message)s",
)
app = Flask("auth")
app.config.from_object(__name__)
app.config.from_pyfile("auth.cfg")
oauth = OAuth2Provider(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "/login"
db = SQLAlchemy(app)
class Client(db.Model):
# human readable name
name = db.Column(db.String(40))
# human readable description
description = db.Column(db.String(400))
client_id = db.Column(db.String(40), primary_key=True)
client_secret = db.Column(db.String(55), unique=True, index=True, nullable=False)
# public or confidential
is_confidential = db.Column(db.Boolean)
redirect_uris_ = db.Column(db.Text)
default_scopes_ = db.Column(db.Text)
# TODO
# approved = db.Column(db.Boolean, default=False)
approved = True
@property
def client_type(self):
if self.is_confidential:
return "confidential"
return "public"
@property
def redirect_uris(self):
if self.redirect_uris_:
return self.redirect_uris_.split()
return []
@property
def default_redirect_uri(self):
return self.redirect_uris[0]
@property
def default_scopes(self):
if self.default_scopes_:
return self.default_scopes_.split()
return []
def validate_scopes(self, scopes):
return {
"profile:read",
"profile:write",
"password:write",
"users:read",
}.issuperset(scopes)
class Grant(db.Model):
id = db.Column(db.Integer, primary_key=True)
user = db.Column(db.String(40), nullable=False)
client_id = db.Column(
db.String(40), db.ForeignKey("client.client_id"), nullable=False
)
client = db.relationship("Client")
code = db.Column(db.String(255), index=True, nullable=False)
redirect_uri = db.Column(db.String(255))
expires = db.Column(db.DateTime)
_scopes = db.Column(db.Text)
def delete(self):
db.session.delete(self)
db.session.commit()
return self
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
class Token(db.Model):
id = db.Column(db.Integer, primary_key=True)
client_id = db.Column(
db.String(40), db.ForeignKey("client.client_id"), nullable=False
)
client = db.relationship("Client")
user = db.Column(db.String(40), nullable=False)
# currently only bearer is supported
token_type = db.Column(db.String(40))
access_token = db.Column(db.String(255), unique=True)
refresh_token = db.Column(db.String(255), unique=True)
expires = db.Column(db.DateTime)
_scopes = db.Column(db.Text)
def delete(self):
db.session.delete(self)
db.session.commit()
return self
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def connect_to_ldap():
conn = ldap.initialize(app.config["LDAP_URL"])
conn.start_tls_s()
conn.simple_bind(app.config["LDAP_BIND_DN"], app.config["LDAP_BIND_PASSWORD"])
return conn
def check_credentials(username, password):
conn = ldap.initialize(app.config["LDAP_URL"])
conn.start_tls_s()
try:
conn.simple_bind_s(app.config["DN_STRING"] % username, password)
return True
except ldap.LDAPError:
return False
@oauth.clientgetter
def load_client(client_id):
return Client.query.filter_by(client_id=client_id).first()
@oauth.grantgetter
def load_grant(client_id, code):
return Grant.query.filter_by(client_id=client_id, code=code).first()
@oauth.grantsetter
def save_grant(client_id, code, request, *args, **kwargs):
# decide the expires time yourself
expires = datetime.utcnow() + timedelta(seconds=100)
grant = Grant(
client_id=client_id,
code=code["code"],
redirect_uri=request.redirect_uri,
_scopes=" ".join(request.scopes),
user=current_user.username,
expires=expires,
)
db.session.add(grant)
db.session.commit()
return grant
@oauth.tokengetter
def load_token(access_token=None, refresh_token=None):
if access_token:
return Token.query.filter_by(access_token=access_token).first()
elif refresh_token:
return Token.query.filter_by(refresh_token=refresh_token).first()
@oauth.tokensetter
def save_token(token, request, *args, **kwargs):
toks = Token.query.filter_by(client_id=request.client.client_id, user=request.user)
# make sure that every client has only one token connected to a user
for t in toks:
db.session.delete(t)
expires_in = token.get("expires_in")
expires = datetime.utcnow() + timedelta(seconds=expires_in)
tok = Token(
access_token=token["access_token"],
refresh_token=token.get("refresh_token"),
token_type=token["token_type"],
_scopes=token["scope"],
expires=expires,
client_id=request.client.client_id,
user=request.user,
)
db.session.add(tok)
db.session.commit()
return tok
@app.route("/oauth/authorize", methods=["GET", "POST"])
@login_required
@oauth.authorize_handler
def authorize(*args, **kwargs):
form = FlaskForm()
if Token.query.filter(
Token.client_id == kwargs.get("client_id"), Token.user == current_user.username
).count():
# User has unrevoked token already - grant by default
return True
if not form.validate_on_submit():
client_id = kwargs.get("client_id")
client = Client.query.filter_by(client_id=client_id).first()
kwargs["client"] = client
kwargs["user"] = current_user
kwargs["form"] = form
return render_template("oauthorize.html", **kwargs)
confirm = flask.request.form.get("confirm", "no")
return confirm == "yes"
@app.route("/oauth/token", methods=["GET", "POST"])
@oauth.token_handler
def access_token():
return None
class LDAPUserProxy(object):
def __init__(self, username):
self.username = re.sub(app.config["STRIP_RE"], "", username)
self.is_authenticated = True
self.is_anonymous = False
conn = connect_to_ldap()
res = conn.search_s(
app.config["PEOPLE_BASEDN"],
ldap.SCOPE_SUBTREE,
app.config["UID_LDAP_FILTER"] % self.username,
)
if len(res) != 1:
raise Exception("No such username.")
dn, data = res[0]
self.username = data.get("uid", [None])[0]
self.gecos = data.get("gecos", [None])[0]
self.mifare_hashes = data.get("mifareIDHash", [])
self.phone = data.get("mobile", [None])[0]
self.personal_email = data.get("mailRoutingAddress", [])
def __repr__(self):
active = "active" if self.is_active else "inactive"
return "<LDAPUserProxy {}, {}>".format(self.username, active)
@property
def email(self):
return self.username + "@hackerspace.pl"
@cached_property
def is_active(self):
url = "https://kasownik.hackerspace.pl/api/judgement/{}.json"
try:
r = requests.get(url.format(self.username))
return bool(r.json()["content"])
except Exception as e:
logging.error("When getting data from Kasownik: {}".format(e))
# Fail-safe.
return True
@cached_property
def is_staff(self):
url = "https://capacifier.hackerspace.pl/staff/{}"
try:
r = requests.get(url.format(self.username))
return "YES" in r.text
except Exception as e:
logging.error("When getting data from Capacifier: {}".format(e))
return False
def get_id(self):
return self.username
class LoginForm(FlaskForm):
username = StringField("username", validators=[DataRequired()])
password = PasswordField("password", validators=[DataRequired()])
remember = BooleanField("remember me")
@app.route("/")
@app.route("/profile")
@login_required
def profile():
return render_template(
"profile.html", tokens=Token.query.filter(Token.user == current_user.username)
)
@app.route("/token/<int:id>/revoke", methods=["POST"])
@login_required
def token_revoke(id):
token = Token.query.filter(
Token.user == current_user.username, Token.id == id
).first()
if not token:
flask.abort(404)
token.delete()
return redirect("/")
@app.route("/login", methods=["GET", "POST"])
def login():
form = LoginForm()
next = flask.request.args.get("next")
if form.validate_on_submit():
username, password = form.data["username"], form.data["password"]
if not check_credentials(username, password):
flash("Invalid username or password")
return render_template("login_oauth.html", form=form, next=next)
user = LDAPUserProxy(username)
if not user.is_active:
flash("User inactive - have you paid your membership fees?")
return render_template("login_oauth.html", form=form, next=next)
login_user(user, form.data["remember"])
flash("Logged in successfully.")
return redirect(next or url_for("profile"))
return render_template("login_oauth.html", form=form, next=next)
@app.route("/logout")
def logout():
logout_user()
return redirect("/")
@login_manager.user_loader
def load_user(user_id):
return LDAPUserProxy(user_id)
# HSWAW specific endpoint
@app.route("/api/profile")
@app.route("/api/1/profile")
@oauth.require_oauth("profile:read")
def api_profile():
user = LDAPUserProxy(flask.request.oauth.user)
return flask.jsonify(
email=user.email,
username=user.username,
gecos=user.gecos,
phone=user.phone,
personal_email=user.personal_email,
)
# OpenIDConnect userinfo
@app.route("/api/1/userinfo")
@oauth.require_oauth("profile:read")
def api_userinfo():
user = LDAPUserProxy(flask.request.oauth.user)
groups = []
if user.is_staff:
groups.append("staff")
return flask.jsonify(
sub=user.username,
name=user.gecos,
email=user.email,
preferred_username=user.username,
nickname=user.username,
user_name=user.username,
user_id=user.username,
groups=groups,
)
if __name__ == "__main__":
app.run("0.0.0.0", 8082, debug=True)

View File

@ -1,7 +1,11 @@
import random
import string
from auth import app, Client, db
from website.app import create_app
from website.models import Client, db
app = create_app()
app.app_context().push()
app_name = ""
app_description = ""
@ -10,15 +14,15 @@ confidential = ""
redirect_uris = ""
while not app_name:
app_name = raw_input("Application name (ie. Printmaster): ").strip()
app_name = input("Application name (ie. Printmaster): ").strip()
while not app_description:
app_description = raw_input("Application description (ie. Print control software): ").strip()
app_description = input("Application description (ie. Print control software): ").strip()
while not client_id:
client_id = raw_input("OAuth Client ID (ie. printmaster): ").strip()
client_id = input("OAuth Client ID (ie. printmaster): ").strip()
while not confidential:
confidential = raw_input("Is the client confidential? Say yes for web apps, no for mobile apps: [yn] ").strip()
confidential = input("Is the client confidential? Say yes for web apps, no for mobile apps: [yn] ").strip()
while not redirect_uris:
redirect_uris = raw_input("Whitespace-delimited redirect URIs: ").strip()
redirect_uris = input("Whitespace-delimited redirect URIs: ").strip()
if confidential.lower().startswith('y'):
confidential = True
@ -26,25 +30,25 @@ else:
confidential = False
print "\n\nSummary\n-------"
print "Application name:", app_name
print "Application description:", app_description
print "Client ID:", client_id
print "Confidential client:", confidential
print "Redirect URIs:", redirect_uris
print("\n\nSummary\n-------")
print("Application name:", app_name)
print("Application description:", app_description)
print("Client ID:", client_id)
print("Confidential client:", confidential)
print("Redirect URIs:", redirect_uris)
while raw_input("Type YES to continue. ") != "YES":
while input("Type YES to continue. ") != "YES":
pass
c = Client()
c.name = app_name
c.description = app_description
c.client_id = client_id
c.client_secret = ''.join([random.choice(string.uppercase + string.lowercase) for _ in range(32)])
c.client_secret = ''.join([random.choice("0123456789abcdef") for _ in range(32)])
c.is_confidential = confidential
c.redirect_uris_ = redirect_uris
db.session.add(c)
db.session.commit()
print "Client secret:", c.client_secret
print("Client secret:", c.client_secret)

26
requirements.txt Normal file
View File

@ -0,0 +1,26 @@
asn1crypto==0.24.0
Authlib==0.11
cached-property==1.5.1
certifi==2019.6.16
cffi==1.12.3
chardet==3.0.4
Click==7.0
cryptography==2.7
Flask==1.1.1
Flask-Login==0.4.1
Flask-SQLAlchemy==2.4.0
Flask-WTF==0.14.2
idna==2.8
itsdangerous==1.1.0
Jinja2==2.10.1
MarkupSafe==1.1.1
pyasn1==0.4.6
pyasn1-modules==0.2.6
pycparser==2.19
python-ldap==3.2.0
requests==2.22.0
six==1.12.0
SQLAlchemy==1.3.8
urllib3==1.25.3
Werkzeug==0.15.5
WTForms==2.2.1

View File

@ -3,12 +3,12 @@
{% block content %}
<div class="container" id="authorize-container">
<center><img src="/static/hswaw_wht.svg" style="width: 50%;"/></center>
<h2>{{ client.name }}
{% if client.approved %}<small title="This application is approved."><sup><i class="glyphicon glyphicon-ok-circle text-success"></i></sup></small>{% endif %}
<h2>{{ grant.client.name }}
{% if grant.client.approved %}<small title="This application is approved."><sup><i class="glyphicon glyphicon-ok-circle text-success"></i></sup></small>{% endif %}
</h2>
<h4>This app would like to:</h4>
<ul class="list-group">
{% if 'profile:read' in scopes and 'profile:write' not in scopes %}
{% if ('profile:read' in scopes or 'openid' in scopes) and 'profile:write' not in scopes %}
<li class="list-group-item">
<span class="glyphicon glyphicon-user" aria-hidden="true"></span>
Read your profile data.
@ -34,12 +34,11 @@
{% endif %}
</ul>
<h4 style="margin-bottom: 20px;">On your ({{user.username}}) behalf.</h4>
<form action="/oauth/authorize" method="post">
{{ form.csrf_token }}
<input type="hidden" name="client_id" value="{{ client.client_id }}">
<form action="" method="post">
<!--<input type="hidden" name="client_id" value="{{ grant.client.client_id }}">
<input type="hidden" name="scope" value="{{ scopes|join(' ') }}">
<input type="hidden" name="response_type" value="{{ response_type }}">
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">-->
{% if state %}
<input type="hidden" name="state" value="{{ state }}">
{% endif %}

View File

@ -29,7 +29,7 @@
</td>
<td>{{ token.expires }}</td>
<td>
<form class="text-right" method="post" action="{{ url_for('token_revoke', id=token.id) }}">
<form class="text-right" method="post" action="{{ url_for('website.routes.token_revoke', id=token.id)}}">
{# FIXME <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>#}
<button class="btn btn-danger btn-xs">Revoke <i class="glyphicon glyphicon-remove"></i></button>
</form>

0
website/__init__.py Normal file
View File

33
website/app.py Normal file
View File

@ -0,0 +1,33 @@
import logging
from flask import Flask
from flask_login import LoginManager
from .models import db
from .routes import bp
from .ldap import LDAPUser
from .oauth2 import config_oauth
def create_app():
app = Flask("auth")
app.config.from_object(__name__)
app.config.from_pyfile("auth.cfg")
setup_app(app)
return app
def setup_app(app):
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)-15s] %(name)-10s %(levelname)7s: %(message)s",
)
db.init_app(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = "/login"
login_manager.user_loader(LDAPUser.by_login)
config_oauth(app)
app.register_blueprint(bp, url_prefix='')

9
website/forms.py Normal file
View File

@ -0,0 +1,9 @@
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField
from wtforms.validators import DataRequired
class LoginForm(FlaskForm):
username = StringField("username", validators=[DataRequired()])
password = PasswordField("password", validators=[DataRequired()])
remember = BooleanField("remember me")

85
website/ldap.py Normal file
View File

@ -0,0 +1,85 @@
import logging
import re
from cached_property import cached_property
from flask import current_app
import ldap
import requests
def check_credentials(username, password):
conn = ldap.initialize(current_app.config["LDAP_URL"])
conn.start_tls_s()
try:
conn.simple_bind_s(current_app.config["DN_STRING"] % username, password)
return True
except ldap.LDAPError:
return False
def _connect():
conn = ldap.initialize(current_app.config["LDAP_URL"])
conn.start_tls_s()
conn.simple_bind(current_app.config["LDAP_BIND_DN"], current_app.config["LDAP_BIND_PASSWORD"])
return conn
class LDAPUser(object):
def __init__(self, username, ldap_data):
self.username = username
self.is_authenticated = True
self.is_anonymous = False
self.username = ldap_data.get("uid", [None])[0].decode()
self.gecos = ldap_data.get("gecos", [None])[0].decode()
self.mifare_hashes = [m.decode() for m in ldap_data.get("mifareIDHash", [])]
self.phone = ldap_data.get("mobile", [None])[0].decode()
self.personal_email = [m.decode() for m in ldap_data.get("mailRoutingAddress", [])]
@classmethod
def by_login(cls, username):
username = re.sub(current_app.config["STRIP_RE"], "", username)
conn = _connect()
res = conn.search_s(
current_app.config["PEOPLE_BASEDN"],
ldap.SCOPE_SUBTREE,
current_app.config["UID_LDAP_FILTER"] % username,
)
if len(res) != 1:
raise Exception("No such username.")
_, data = res[0]
return cls(username, data)
def __repr__(self):
active = "active" if self.is_active else "inactive"
return "<LDAPUser {}, {}>".format(self.username, active)
@property
def email(self):
return self.username + "@hackerspace.pl"
@cached_property
def is_active(self):
url = "https://kasownik.hackerspace.pl/api/judgement/{}.json"
try:
r = requests.get(url.format(self.username))
return bool(r.json()["content"])
except Exception as e:
logging.error("When getting data from Kasownik: {}".format(e))
# Fail-safe.
return True
@cached_property
def is_staff(self):
url = "https://capacifier.hackerspace.pl/staff/{}"
try:
r = requests.get(url.format(self.username))
return "YES" in r.text
except Exception as e:
logging.error("When getting data from Capacifier: {}".format(e))
return False
def get_id(self):
return self.username

142
website/models.py Normal file
View File

@ -0,0 +1,142 @@
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class Client(db.Model):
# human readable name
name = db.Column(db.String(40))
# human readable description
description = db.Column(db.String(400))
client_id = db.Column(db.String(40), primary_key=True)
client_secret = db.Column(db.String(55), unique=True, index=True, nullable=False)
# public or confidential
is_confidential = db.Column(db.Boolean)
redirect_uris_ = db.Column(db.Text)
default_scopes_ = db.Column(db.Text)
# TODO
# approved = db.Column(db.Boolean, default=False)
approved = True
@property
def client_type(self):
if self.is_confidential:
return "confidential"
return "public"
@property
def redirect_uris(self):
if self.redirect_uris_:
return self.redirect_uris_.split()
return []
@property
def default_redirect_uri(self):
return self.redirect_uris[0]
@property
def default_scopes(self):
if self.default_scopes_:
return self.default_scopes_.split()
return []
def check_response_type(self, response_type):
return True
def check_redirect_uri(self, redirect_uri):
return redirect_uri in self.redirect_uris
def check_requested_scopes(self, scopes):
return {
"profile:read",
"profile:write",
"password:write",
"users:read",
"openid",
}.issuperset(scopes)
def check_token_endpoint_auth_method(self, method):
allowed = ['client_secret_post', 'client_secret_basic']
if not self.is_confidential:
allowed.append('none')
return method in allowed
def check_client_secret(self, secret):
return self.client_secret == secret
def check_grant_type(self, grant_type):
return grant_type in ['authorization_code']
def check_client_type(self, client_type):
return client_type == self.client_type
class Grant(db.Model):
id = db.Column(db.Integer, primary_key=True)
user = db.Column(db.String(40), nullable=False)
client_id = db.Column(
db.String(40), db.ForeignKey("client.client_id"), nullable=False
)
client = db.relationship("Client")
code = db.Column(db.String(255), index=True, nullable=False)
redirect_uri = db.Column(db.String(255))
expires = db.Column(db.DateTime)
_scopes = db.Column(db.Text)
def is_expired(self):
return self.expires < datetime.utcnow()
def get_redirect_uri(self):
return self.redirect_uri
def get_scope(self):
return self._scopes
class Token(db.Model):
id = db.Column(db.Integer, primary_key=True)
client_id = db.Column(
db.String(40), db.ForeignKey("client.client_id"), nullable=False
)
client = db.relationship("Client")
user = db.Column(db.String(40), nullable=False)
# currently only bearer is supported
token_type = db.Column(db.String(40))
access_token = db.Column(db.String(255), unique=True, nullable=False)
refresh_token = db.Column(db.String(255))
expires = db.Column(db.DateTime)
_scopes = db.Column(db.Text)
def delete(self):
db.session.delete(self)
db.session.commit()
return self
@property
def scopes(self):
if self._scopes:
return self._scopes.split()
return []
def is_expired(self):
return self.expires < datetime.utcnow()
def delete(self):
db.session.delete(self)
db.session.commit()

138
website/oauth2.py Normal file
View File

@ -0,0 +1,138 @@
from datetime import datetime, timedelta
from authlib.flask.oauth2 import AuthorizationServer, ResourceProtector
from authlib.oauth2.rfc6749 import grants
from authlib.oidc.core import grants as oidgrants
from authlib.oauth2.rfc6750 import BearerTokenValidator
from authlib.oauth2.rfc7009 import RevocationEndpoint
from werkzeug.security import gen_salt
from .models import db
from .models import Client, Grant, Token
from .ldap import LDAPUser, check_credentials
class AuthorizationCodeGrant(grants.AuthorizationCodeGrant):
def create_authorization_code(self, client, user, request):
code = gen_salt(48)
expires = datetime.utcnow() + timedelta(seconds=100)
item = Grant(
code=code,
client_id=client.client_id,
redirect_uri=request.redirect_uri,
_scopes=request.scope,
user=user.username,
expires=expires,
)
db.session.add(item)
db.session.commit()
return code
def parse_authorization_code(self, code, client):
item = Grant.query.filter_by(
code=code, client_id=client.client_id).first()
if item and not item.is_expired():
return item
def delete_authorization_code(self, authorization_code):
db.session.delete(authorization_code)
db.session.commit()
def authenticate_user(self, authorization_code):
return LDAPUser.by_login(authorization_code.user)
class PasswordGrant(grants.ResourceOwnerPasswordCredentialsGrant):
def authenticate_user(self, username, password):
if not check_credentials(username, password):
return None
return LDAPUser.by_login(username)
class RefreshTokenGrant(grants.RefreshTokenGrant):
def authenticate_refresh_token(self, refresh_token):
token = Token.query.filter_by(refresh_token=refresh_token).first()
if token and not token.is_expired():
return token
def authenticate_user(self, credential):
return LDAPUser.by_login(credentials.user_id)
def query_client(client_id):
return Client.query.filter_by(client_id=client_id).first()
def save_token(token, request):
user_id = None
if request.user:
user_id = request.user.username
client = request.client
# HACK: convert to model field names
t = dict(
token_type=token['token_type'],
access_token=token.get('access_token'),
refresh_token=token.get('refresh_token'),
expires=datetime.utcnow() + timedelta(seconds=token['expires_in']),
_scopes=token['scope'],
)
item = Token(client_id=client.client_id, user=user_id, **t)
db.session.add(item)
db.session.commit()
authorization = AuthorizationServer(
query_client=query_client,
save_token=save_token,
)
require_oauth = ResourceProtector()
class _BearerTokenValidator(BearerTokenValidator):
def authenticate_token(self, token_string):
return Token.query.filter_by(access_token=token_string).first()
def request_invalid(self, request):
return False
def token_revoked(self,token):
return False
class _RevocationEndpoint(RevocationEndpoint):
def query_token(self, token, token_type_hint, client):
q = Token.query.filter_by(client_id=client.client_id)
if token_type_hint == 'access_token':
return q.filter_by(access_token=token).first()
elif token_type_hint == 'refresh_token':
return q.filter_by(refresh_token=token).first()
token = q.filter_by(access_token=token).first()
if token is None:
return token
return q.filter_by(refresh_token=token).first()
def revoke_token(self, token):
token.delete()
def config_oauth(app):
authorization.init_app(app)
# support all grants
authorization.register_grant(grants.ImplicitGrant)
authorization.register_grant(grants.ClientCredentialsGrant)
authorization.register_grant(AuthorizationCodeGrant)
authorization.register_grant(PasswordGrant)
authorization.register_grant(RefreshTokenGrant)
# support revocation
authorization.register_endpoint(_RevocationEndpoint)
# protect resource
require_oauth.register_token_validator(_BearerTokenValidator())

133
website/routes.py Normal file
View File

@ -0,0 +1,133 @@
from authlib.oauth2 import OAuth2Error
from flask import Blueprint, render_template, jsonify, request, flash, redirect, current_app
from flask_login import login_required, current_user, login_user
from .forms import LoginForm
from .ldap import LDAPUser, check_credentials
from .oauth2 import authorization, require_oauth
from .models import Token
bp = Blueprint(__name__, 'sso')
@bp.route("/")
@bp.route("/profile")
@login_required
def profile():
tokens = Token.query.filter(Token.user == current_user.username)
return render_template(
"profile.html", tokens=tokens
)
@bp.route("/login", methods=["GET", "POST"])
def login():
form = LoginForm()
next = request.args.get("next")
if form.validate_on_submit():
username, password = form.data["username"], form.data["password"]
if not check_credentials(username, password):
flash("Invalid username or password")
return render_template("login_oauth.html", form=form, next=next)
user = LDAPUser.by_login(username)
if not user.is_active:
flash("User inactive - have you paid your membership fees?")
return render_template("login_oauth.html", form=form, next=next)
login_user(user, form.data["remember"])
flash("Logged in successfully.")
return redirect(next or url_for("profile"))
return render_template("login_oauth.html", form=form, next=next)
@bp.route("/logout")
def logout():
logout_user()
return redirect("/")
# HSWAW specific endpoint
@bp.route("/api/profile")
@bp.route("/api/1/profile")
@require_oauth("profile:read")
def api_profile():
user = LDAPUser.by_login(request.oauth.user)
return jsonify(
email=user.email,
username=user.username,
gecos=user.gecos,
phone=user.phone,
personal_email=user.personal_email,
)
# OpenIDConnect userinfo
@bp.route("/api/1/userinfo")
@require_oauth("profile:read")
def api_userinfo():
user = LDAPUser.by_login(request.oauth.user)
groups = []
if user.is_staff:
groups.append("staff")
return jsonify(
sub=user.username,
name=user.gecos,
email=user.email,
preferred_username=user.username,
nickname=user.username,
user_name=user.username,
user_id=user.username,
groups=groups,
)
@bp.route("/oauth/authorize", methods=["GET", "POST"])
@login_required
def authorize():
if request.method == 'GET':
try:
grant = authorization.validate_consent_request(end_user=current_user)
except OAuth2Error as error:
print(error)
return 'Could not authorize: ' + error.error
return render_template("oauthorize.html", user=current_user, grant=grant, scopes=grant.request.scope.split())
grant_user = None
if request.form['confirm']:
grant_user = current_user
return authorization.create_authorization_response(grant_user=grant_user)
@bp.route("/oauth/token", methods=["GET", "POST"])
def access_token():
return authorization.create_token_response()
@bp.route("/token/<int:id>/revoke", methods=["POST"])
@login_required
def token_revoke(id):
token = Token.query.filter(Token.user == current_user.username, Token.id == id).first()
if not token:
flask.abort(404)
token.delete()
return redirect('/')
@bp.route("/oauth/revoke", methods=["POST"])
def oauth_token_revoke():
return authorization.create_endpoint_response('revocation')
@bp.route("/.well-known/openid-configuration")
def oidc_configureation():
issuer = current_app.config['ISSUER_URL']
return jsonify({
"issuer": issuer,
"authorization_endpint": issuer + "/oauth/authorize",
"token_endpint": issuer + "/oauth/token",
"token_endpint": issuer + "/api/1/userinfo",
})