"""Flask views for the SSO service.

Provides the login/logout pages, the token list on the profile page, the
OAuth2-protected profile/userinfo JSON APIs and the OAuth2 / OpenID Connect
protocol endpoints.
"""
from authlib.oauth2 import OAuth2Error
from flask import (
    Blueprint,
    abort,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_login import current_user, login_required, login_user, logout_user

from .forms import LoginForm
from .ldap import LDAPUser, check_credentials
from .oauth2 import authorization, require_oauth
from .models import Token

# Flask's signature is Blueprint(name, import_name); the original call had the
# two arguments swapped, which made the dotted module path the blueprint name.
# NOTE(review): endpoints are now prefixed with "sso." - confirm that templates
# and any login_view setting elsewhere in the project use that prefix.
bp = Blueprint("sso", __name__)


def _is_local_path(target):
    """Return True when *target* is a plain path on this site.

    Used to vet the user-supplied ``next`` parameter before redirecting to it.
    """
    if not target or not target.startswith("/"):
        return False
    # Browsers treat "//host" and "/\host" as scheme-relative URLs.
    if target.startswith("//") or "\\" in target:
        return False
    # Control characters could be dropped by a client and turn the value into
    # one of the forms rejected above, so refuse them outright.
    return not any(ord(ch) < 0x20 for ch in target)


@bp.route("/")
@bp.route("/profile")
@login_required
def profile():
    """Render the profile page with the tokens issued to the current user."""
    tokens = Token.query.filter(Token.user == current_user.username)
    return render_template("profile.html", tokens=tokens)


@bp.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and, on a valid submission, log the user in.

    The optional ``next`` query parameter is passed through to the template
    and, after a successful login, followed only if it is a local path;
    otherwise the user lands on the profile page.
    """
    form = LoginForm()
    next_url = request.args.get("next")

    if form.validate_on_submit():
        username, password = form.data["username"], form.data["password"]
        if not check_credentials(username, password):
            flash("Invalid username or password")
            return render_template("login_oauth.html", form=form, next=next_url)

        user = LDAPUser.by_login(username)
        if not user.is_active:
            flash("User inactive - have you paid your membership fees?")
            return render_template("login_oauth.html", form=form, next=next_url)

        login_user(user, form.data["remember"])
        flash("Logged in successfully.")
        # `next` is attacker-controllable; never redirect off-site.
        if _is_local_path(next_url):
            return redirect(next_url)
        # Leading dot: resolve the endpoint relative to this blueprint.
        return redirect(url_for(".profile"))

    return render_template("login_oauth.html", form=form, next=next_url)


@bp.route("/logout")
def logout():
    """Log the current user out and redirect to the site root."""
    logout_user()
    return redirect("/")


# HSWAW-specific endpoint
@bp.route("/api/profile")
@bp.route("/api/1/profile")
@require_oauth("profile:read")
def api_profile():
    """Return profile attributes of the token's user as JSON."""
    user = LDAPUser.by_login(request.oauth.user)
    return jsonify(
        email=user.email,
        username=user.username,
        gecos=user.gecos,
        phone=user.phone,
        personal_email=user.personal_email,
    )


# OpenID Connect userinfo
@bp.route("/api/1/userinfo")
@require_oauth("profile:read")
def api_userinfo():
    """Return userinfo claims for the token's user as JSON.

    The username is exposed under several claim names, and ``groups``
    contains ``"staff"`` when the user is flagged as staff.
    """
    user = LDAPUser.by_login(request.oauth.user)
    groups = ["staff"] if user.is_staff else []
    return jsonify(
        sub=user.username,
        name=user.gecos,
        email=user.email,
        preferred_username=user.username,
        nickname=user.username,
        user_name=user.username,
        user_id=user.username,
        groups=groups,
    )


@bp.route("/oauth/authorize", methods=["GET", "POST"])
@login_required
def authorize():
    """Handle the OAuth2 authorization endpoint.

    GET validates the consent request and renders the consent page.
    POST creates the authorization response, granting access to the current
    user only when the submitted form carries a truthy ``confirm`` field.
    """
    if request.method == "GET":
        try:
            grant = authorization.validate_consent_request(end_user=current_user)
        except OAuth2Error as error:
            current_app.logger.warning("Could not authorize: %s", error)
            return "Could not authorize: " + error.error
        return render_template(
            "oauthorize.html",
            user=current_user,
            grant=grant,
            # A request without a scope parameter must not crash the page.
            scopes=(grant.request.scope or "").split(),
        )

    # A missing `confirm` field means consent was not given; treat it as a
    # denial instead of failing with HTTP 400 on the missing form key.
    grant_user = current_user if request.form.get("confirm") else None
    return authorization.create_authorization_response(grant_user=grant_user)


@bp.route("/oauth/token", methods=["GET", "POST"])
def access_token():
    """Handle the OAuth2 token endpoint."""
    return authorization.create_token_response()


# The URL must carry the token id: the view requires the `id` argument.
# NOTE(review): use "<int:id>" instead if Token.id is an integer column.
@bp.route("/token/<id>/revoke", methods=["POST"])
@login_required
def token_revoke(id):
    """Delete one of the current user's tokens and redirect to the site root.

    Responds with 404 when no token with that id belongs to the current user.
    """
    token = Token.query.filter(
        Token.user == current_user.username, Token.id == id
    ).first()
    if token is None:
        abort(404)
    token.delete()
    return redirect("/")


@bp.route("/oauth/revoke", methods=["POST"])
def oauth_token_revoke():
    """Handle the OAuth2 token revocation endpoint."""
    return authorization.create_endpoint_response("revocation")


@bp.route("/.well-known/openid-configuration")
def oidc_configureation():
    """Return the OpenID Connect discovery document as JSON.

    All endpoint URLs are built from the ``ISSUER_URL`` config value.
    """
    issuer = current_app.config["ISSUER_URL"]
    return jsonify(
        {
            "issuer": issuer,
            "authorization_endpoint": issuer + "/oauth/authorize",
            "token_endpoint": issuer + "/oauth/token",
            "userinfo_endpoint": issuer + "/api/1/userinfo",
        }
    )