134 lines
3.8 KiB
Python
134 lines
3.8 KiB
Python
|
from urllib.parse import urlparse

from authlib.oauth2 import OAuth2Error
from flask import (
    Blueprint,
    abort,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_login import current_user, login_required, login_user, logout_user

from .forms import LoginForm
from .ldap import LDAPUser, check_credentials
from .oauth2 import authorization, require_oauth
from .models import Token
|
||
|
|
||
|
|
||
|
# Blueprint that every route in this module is registered on.
# NOTE(review): Flask documents the signature as Blueprint(name, import_name);
# here __name__ is passed as the name and 'sso' as the import name, which looks
# swapped. Endpoint names derive from the blueprint name, so confirm what
# templates and url_for() callers expect before changing it.
bp = Blueprint(__name__, 'sso')
|
||
|
|
||
|
|
||
|
@bp.route("/")
@bp.route("/profile")
@login_required
def profile():
    """Render ``profile.html`` with the tokens belonging to the current user.

    Registered on both ``/`` and ``/profile`` and wrapped in
    ``login_required``.
    """
    # Restrict the token query to rows whose ``user`` equals the current
    # username; the query object itself is handed to the template.
    owned_tokens = Token.query.filter(Token.user == current_user.username)
    return render_template("profile.html", tokens=owned_tokens)
|
||
|
|
||
|
|
||
|
def _is_safe_redirect_target(target):
    """Return True when redirecting to *target* keeps the browser on this host.

    Accepted targets are either host-relative paths starting with a single
    ``/`` or absolute http(s) URLs whose netloc equals the netloc of the
    current request's ``host_url``. Everything else is rejected.
    """
    if not target or target != target.strip():
        return False
    # Browsers read a backslash as a slash and drop control characters such as
    # tab/newline, so either could turn a harmless-looking path into
    # "//other-host" after this check has run.
    if any(ch == "\\" or ord(ch) < 0x20 or ord(ch) == 0x7F for ch in target):
        return False

    parsed = urlparse(target)
    if parsed.scheme or parsed.netloc:
        origin = urlparse(request.host_url)
        return parsed.scheme in ("http", "https") and parsed.netloc == origin.netloc

    # No scheme and no netloc: allow only paths rooted at this host.
    return target.startswith("/") and not target.startswith("//")


@bp.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and sign the user in on a valid submission.

    The form is re-rendered (``login_oauth.html``) when it has not been
    submitted or fails validation, when ``check_credentials`` rejects the
    username/password pair, or when the looked-up user is not active. On
    success the user is logged in and redirected to the ``next`` query
    parameter if it points at this host, otherwise to the profile page.
    """
    form = LoginForm()
    next_url = request.args.get("next")

    if not form.validate_on_submit():
        return render_template("login_oauth.html", form=form, next=next_url)

    username, password = form.data["username"], form.data["password"]
    if not check_credentials(username, password):
        flash("Invalid username or password")
        return render_template("login_oauth.html", form=form, next=next_url)

    user = LDAPUser.by_login(username)
    if not user.is_active:
        flash("User inactive - have you paid your membership fees?")
        return render_template("login_oauth.html", form=form, next=next_url)

    login_user(user, form.data["remember"])
    flash("Logged in successfully.")

    # ``next`` is attacker-controllable (it comes from the query string), so
    # only follow it when it stays on this host.
    if not _is_safe_redirect_target(next_url):
        # The leading dot resolves the endpoint inside the current blueprint.
        next_url = url_for(".profile")
    return redirect(next_url)
|
||
|
|
||
|
|
||
|
@bp.route("/logout")
def logout():
    """Log the current user out and redirect the browser to ``/``."""
    logout_user()
    return redirect("/")
|
||
|
|
||
|
|
||
|
# HSWAW specific endpoint
@bp.route("/api/profile")
@bp.route("/api/1/profile")
@require_oauth("profile:read")
def api_profile():
    """Return selected attributes of the OAuth token's user as JSON.

    The user is looked up with ``LDAPUser.by_login(request.oauth.user)``; the
    response carries ``email``, ``username``, ``gecos``, ``phone`` and
    ``personal_email``.
    """
    account = LDAPUser.by_login(request.oauth.user)
    # Each JSON key is read from the attribute of the same name.
    exposed_fields = ("email", "username", "gecos", "phone", "personal_email")
    payload = {field: getattr(account, field) for field in exposed_fields}
    return jsonify(payload)
|
||
|
|
||
|
|
||
|
# OpenIDConnect userinfo
@bp.route("/api/1/userinfo")
@require_oauth("profile:read")
def api_userinfo():
    """Return userinfo-style claims for the OAuth token's user as JSON.

    The username is reported under several keys (``sub``,
    ``preferred_username``, ``nickname``, ``user_name``, ``user_id``);
    ``groups`` contains ``"staff"`` only when ``user.is_staff`` is truthy.
    """
    account = LDAPUser.by_login(request.oauth.user)
    memberships = ["staff"] if account.is_staff else []
    claims = {
        "sub": account.username,
        "name": account.gecos,
        "email": account.email,
        "preferred_username": account.username,
        "nickname": account.username,
        "user_name": account.username,
        "user_id": account.username,
        "groups": memberships,
    }
    return jsonify(claims)
|
||
|
|
||
|
|
||
|
@bp.route("/oauth/authorize", methods=["GET", "POST"])
@login_required
def authorize():
    """Show the OAuth consent page (GET) or process the user's decision (POST).

    GET: validates the consent request for the current user and renders
    ``oauthorize.html`` with the grant and the list of requested scopes. If
    validation raises ``OAuth2Error`` the error is logged and a plain-text
    message is returned instead.

    POST: builds the authorization response, granting on behalf of the
    current user only when the submitted form carries a truthy ``confirm``
    value; otherwise ``grant_user`` is ``None``.
    """
    if request.method == "GET":
        try:
            grant = authorization.validate_consent_request(end_user=current_user)
        except OAuth2Error as error:
            current_app.logger.warning("OAuth2 consent request rejected: %s", error)
            # Formatting (rather than concatenating) keeps this from raising
            # if the exception carries no error code.
            return f"Could not authorize: {error.error}"

        # A request without a scope would otherwise crash on None.split().
        requested_scopes = (grant.request.scope or "").split()
        return render_template(
            "oauthorize.html",
            user=current_user,
            grant=grant,
            scopes=requested_scopes,
        )

    # A declined form may not contain "confirm" at all; indexing the form
    # would abort with 400 instead of producing the denial response.
    grant_user = current_user if request.form.get("confirm") else None
    return authorization.create_authorization_response(grant_user=grant_user)
|
||
|
|
||
|
|
||
|
@bp.route("/oauth/token", methods=["GET", "POST"])
def access_token():
    """Return whatever ``authorization.create_token_response()`` produces."""
    token_response = authorization.create_token_response()
    return token_response
|
||
|
|
||
|
|
||
|
@bp.route("/token/<int:id>/revoke", methods=["POST"])
@login_required
def token_revoke(id):
    """Delete one of the current user's tokens and redirect to ``/``.

    Args:
        id: Integer token id taken from the URL.

    Aborts with 404 when no token with that id belongs to the current user,
    so one user cannot revoke another user's token.
    """
    token = Token.query.filter(
        Token.user == current_user.username,
        Token.id == id,
    ).first()
    if token is None:
        abort(404)
    # NOTE(review): relies on the Token model providing delete(); confirm in
    # the models module.
    token.delete()
    return redirect("/")
|
||
|
|
||
|
|
||
|
@bp.route("/oauth/revoke", methods=["POST"])
def oauth_token_revoke():
    """Return the authorization server's response for its 'revocation' endpoint."""
    endpoint_name = "revocation"
    return authorization.create_endpoint_response(endpoint_name)
|
||
|
|
||
|
@bp.route("/.well-known/openid-configuration")
def oidc_configureation():
    """Serve the OpenID Connect discovery document as JSON.

    The issuer is read from the ``ISSUER_URL`` app setting and reported
    unchanged; the authorization, token and userinfo endpoint URLs are built
    by appending this module's route paths to it.
    """
    issuer = current_app.config["ISSUER_URL"]
    # Avoid a double slash in the endpoint URLs when the configured issuer
    # ends with "/".
    base_url = issuer.rstrip("/")
    return jsonify({
        "issuer": issuer,
        "authorization_endpoint": base_url + "/oauth/authorize",
        "token_endpoint": base_url + "/oauth/token",
        "userinfo_endpoint": base_url + "/api/1/userinfo",
    })
|