From bd6ec1ddc3520ace80eed47e15a3b7aec6a5a42e Mon Sep 17 00:00:00 2001 From: Sergiusz Bazanski Date: Sun, 1 Sep 2019 16:52:22 +0200 Subject: [PATCH] blacken --- auth.py | 223 +++++++++++++++++++++++++++++++------------------------- 1 file changed, 123 insertions(+), 100 deletions(-) diff --git a/auth.py b/auth.py index 1a525a5..8a17793 100644 --- a/auth.py +++ b/auth.py @@ -8,8 +8,13 @@ from cached_property import cached_property import flask from flask import Flask, render_template, make_response, flash, redirect, url_for from flask_oauthlib.provider import OAuth2Provider -from flask_login import LoginManager, login_user, logout_user, \ - login_required, current_user +from flask_login import ( + LoginManager, + login_user, + logout_user, + login_required, + current_user, +) from flask_sqlalchemy import SQLAlchemy from flask_wtf import FlaskForm from wtforms import StringField, PasswordField, BooleanField @@ -17,14 +22,17 @@ from wtforms.validators import DataRequired import requests -logging.basicConfig(level=logging.DEBUG, format='[%(asctime)-15s] %(name)-10s %(levelname)7s: %(message)s') -app = Flask('auth') +logging.basicConfig( + level=logging.DEBUG, + format="[%(asctime)-15s] %(name)-10s %(levelname)7s: %(message)s", +) +app = Flask("auth") app.config.from_object(__name__) -app.config.from_pyfile('auth.cfg') +app.config.from_pyfile("auth.cfg") oauth = OAuth2Provider(app) login_manager = LoginManager() login_manager.init_app(app) -login_manager.login_view = '/login' +login_manager.login_view = "/login" db = SQLAlchemy(app) @@ -35,8 +43,7 @@ class Client(db.Model): description = db.Column(db.String(400)) client_id = db.Column(db.String(40), primary_key=True) - client_secret = db.Column(db.String(55), unique=True, index=True, - nullable=False) + client_secret = db.Column(db.String(55), unique=True, index=True, nullable=False) # public or confidential is_confidential = db.Column(db.Boolean) @@ -51,8 +58,8 @@ class Client(db.Model): @property def client_type(self): if self.is_confidential: - return 'confidential' - return 'public' + return "confidential" + return "public" @property def redirect_uris(self): @@ -71,8 +78,12 @@ class Client(db.Model): return [] def validate_scopes(self, scopes): - return {'profile:read', 'profile:write', 'password:write', \ - 'users:read'}.issuperset(scopes) + return { + "profile:read", + "profile:write", + "password:write", + "users:read", + }.issuperset(scopes) class Grant(db.Model): @@ -81,10 +92,9 @@ class Grant(db.Model): user = db.Column(db.String(40), nullable=False) client_id = db.Column( - db.String(40), db.ForeignKey('client.client_id'), - nullable=False, + db.String(40), db.ForeignKey("client.client_id"), nullable=False ) - client = db.relationship('Client') + client = db.relationship("Client") code = db.Column(db.String(255), index=True, nullable=False) @@ -108,14 +118,11 @@ class Grant(db.Model): class Token(db.Model): id = db.Column(db.Integer, primary_key=True) client_id = db.Column( - db.String(40), db.ForeignKey('client.client_id'), - nullable=False, + db.String(40), db.ForeignKey("client.client_id"), nullable=False ) - client = db.relationship('Client') + client = db.relationship("Client") - user = db.Column( - db.String(40), nullable=False - ) + user = db.Column(db.String(40), nullable=False) # currently only bearer is supported token_type = db.Column(db.String(40)) @@ -138,22 +145,22 @@ class Token(db.Model): def connect_to_ldap(): - conn = ldap.initialize(app.config['LDAP_URL']) + conn = ldap.initialize(app.config["LDAP_URL"]) conn.start_tls_s() - conn.simple_bind(app.config['LDAP_BIND_DN'], app.config['LDAP_BIND_PASSWORD']) + conn.simple_bind(app.config["LDAP_BIND_DN"], app.config["LDAP_BIND_PASSWORD"]) return conn + def check_credentials(username, password): - conn = ldap.initialize(app.config['LDAP_URL']) + conn = ldap.initialize(app.config["LDAP_URL"]) conn.start_tls_s() try: - conn.simple_bind_s(app.config['DN_STRING'] % username, password) + conn.simple_bind_s(app.config["DN_STRING"] % username, password) return True except ldap.LDAPError: return False - @oauth.clientgetter def load_client(client_id): return Client.query.filter_by(client_id=client_id).first() @@ -170,11 +177,11 @@ def save_grant(client_id, code, request, *args, **kwargs): expires = datetime.utcnow() + timedelta(seconds=100) grant = Grant( client_id=client_id, - code=code['code'], + code=code["code"], redirect_uri=request.redirect_uri, - _scopes=' '.join(request.scopes), + _scopes=" ".join(request.scopes), user=current_user.username, - expires=expires + expires=expires, ) db.session.add(grant) db.session.commit() @@ -191,20 +198,19 @@ def load_token(access_token=None, refresh_token=None): @oauth.tokensetter def save_token(token, request, *args, **kwargs): - toks = Token.query.filter_by(client_id=request.client.client_id, - user=request.user) + toks = Token.query.filter_by(client_id=request.client.client_id, user=request.user) # make sure that every client has only one token connected to a user for t in toks: db.session.delete(t) - expires_in = token.get('expires_in') + expires_in = token.get("expires_in") expires = datetime.utcnow() + timedelta(seconds=expires_in) tok = Token( - access_token=token['access_token'], - refresh_token=token.get('refresh_token'), - token_type=token['token_type'], - _scopes=token['scope'], + access_token=token["access_token"], + refresh_token=token.get("refresh_token"), + token_type=token["token_type"], + _scopes=token["scope"], expires=expires, client_id=request.client.client_id, user=request.user, @@ -214,31 +220,31 @@ def save_token(token, request, *args, **kwargs): return tok -@app.route('/oauth/authorize', methods=['GET', 'POST']) +@app.route("/oauth/authorize", methods=["GET", "POST"]) @login_required @oauth.authorize_handler def authorize(*args, **kwargs): form = FlaskForm() if Token.query.filter( - Token.client_id == kwargs.get('client_id'), - Token.user == current_user.username).count(): + Token.client_id == kwargs.get("client_id"), Token.user == current_user.username + ).count(): # User has unrevoked token already - grant by default return True if not form.validate_on_submit(): - client_id = kwargs.get('client_id') + client_id = kwargs.get("client_id") client = Client.query.filter_by(client_id=client_id).first() - kwargs['client'] = client - kwargs['user'] = current_user - kwargs['form'] = form - return render_template('oauthorize.html', **kwargs) + kwargs["client"] = client + kwargs["user"] = current_user + kwargs["form"] = form + return render_template("oauthorize.html", **kwargs) - confirm = flask.request.form.get('confirm', 'no') - return confirm == 'yes' + confirm = flask.request.form.get("confirm", "no") + return confirm == "yes" -@app.route('/oauth/token', methods=['GET', 'POST']) +@app.route("/oauth/token", methods=["GET", "POST"]) @oauth.token_handler def access_token(): return None @@ -246,37 +252,39 @@ def access_token(): class LDAPUserProxy(object): def __init__(self, username): - self.username = re.sub(app.config['STRIP_RE'], '', username) + self.username = re.sub(app.config["STRIP_RE"], "", username) self.is_authenticated = True self.is_anonymous = False conn = connect_to_ldap() - res = conn.search_s(app.config['PEOPLE_BASEDN'], ldap.SCOPE_SUBTREE, - app.config['UID_LDAP_FILTER'] % self.username) + res = conn.search_s( + app.config["PEOPLE_BASEDN"], + ldap.SCOPE_SUBTREE, + app.config["UID_LDAP_FILTER"] % self.username, + ) if len(res) != 1: - raise Exception('No such username.') + raise Exception("No such username.") dn, data = res[0] - self.username = data.get('uid', [None,])[0] - self.gecos = data.get('gecos', [None, ])[0] - self.mifare_hashes = data.get('mifareIDHash', []) - self.phone = data.get('mobile', [None, ])[0] - self.personal_email = data.get('mailRoutingAddress', []) + self.username = data.get("uid", [None])[0] + self.gecos = data.get("gecos", [None])[0] + self.mifare_hashes = data.get("mifareIDHash", []) + self.phone = data.get("mobile", [None])[0] + self.personal_email = data.get("mailRoutingAddress", []) def __repr__(self): - active = 'active' if self.is_active else 'inactive' - return ''.format(self.username, active) - + active = "active" if self.is_active else "inactive" + return "".format(self.username, active) @property def email(self): - return self.username + '@hackerspace.pl' + return self.username + "@hackerspace.pl" @cached_property def is_active(self): - url = 'https://kasownik.hackerspace.pl/api/judgement/{}.json' + url = "https://kasownik.hackerspace.pl/api/judgement/{}.json" try: r = requests.get(url.format(self.username)) - return bool(r.json()['content']) + return bool(r.json()["content"]) except Exception as e: logging.error("When getting data from Kasownik: {}".format(e)) # Fail-safe. @@ -284,10 +292,10 @@ class LDAPUserProxy(object): @cached_property def is_staff(self): - url = 'https://capacifier.hackerspace.pl/staff/{}' + url = "https://capacifier.hackerspace.pl/staff/{}" try: r = requests.get(url.format(self.username)) - return 'YES' in r.text + return "YES" in r.text except Exception as e: logging.error("When getting data from Capacifier: {}".format(e)) return False @@ -297,53 +305,58 @@ class LDAPUserProxy(object): class LoginForm(FlaskForm): - username = StringField('username', validators=[DataRequired()]) - password = PasswordField('password', validators=[DataRequired()]) - remember = BooleanField('remember me') + username = StringField("username", validators=[DataRequired()]) + password = PasswordField("password", validators=[DataRequired()]) + remember = BooleanField("remember me") -@app.route('/') -@app.route('/profile') +@app.route("/") +@app.route("/profile") @login_required def profile(): - return render_template('profile.html', tokens=Token.query.filter(Token.user == current_user.username)) + return render_template( + "profile.html", tokens=Token.query.filter(Token.user == current_user.username) + ) -@app.route('/token//revoke', methods=['POST']) +@app.route("/token//revoke", methods=["POST"]) @login_required def token_revoke(id): - token = Token.query.filter(Token.user == current_user.username, Token.id == id).first() + token = Token.query.filter( + Token.user == current_user.username, Token.id == id + ).first() if not token: flask.abort(404) token.delete() - return redirect('/') + return redirect("/") -@app.route('/login', methods=['GET', 'POST']) +@app.route("/login", methods=["GET", "POST"]) def login(): form = LoginForm() - next = flask.request.args.get('next') + next = flask.request.args.get("next") if form.validate_on_submit(): - username, password = form.data['username'], form.data['password'] + username, password = form.data["username"], form.data["password"] if not check_credentials(username, password): - flash('Invalid username or password') - return render_template('login_oauth.html', form=form, next=next) + flash("Invalid username or password") + return render_template("login_oauth.html", form=form, next=next) user = LDAPUserProxy(username) if not user.is_active: - flash('User inactive - have you paid your membership fees?') - return render_template('login_oauth.html', form=form, next=next) + flash("User inactive - have you paid your membership fees?") + return render_template("login_oauth.html", form=form, next=next) - login_user(user, form.data['remember']) - flash('Logged in successfully.') + login_user(user, form.data["remember"]) + flash("Logged in successfully.") - return redirect(next or url_for('profile')) + return redirect(next or url_for("profile")) - return render_template('login_oauth.html', form=form, next=next) + return render_template("login_oauth.html", form=form, next=next) -@app.route('/logout') + +@app.route("/logout") def logout(): logout_user() - return redirect('/') + return redirect("/") @login_manager.user_loader @@ -352,29 +365,39 @@ def load_user(user_id): # HSWAW specific endpoint -@app.route('/api/profile') -@app.route('/api/1/profile') -@oauth.require_oauth('profile:read') +@app.route("/api/profile") +@app.route("/api/1/profile") +@oauth.require_oauth("profile:read") def api_profile(): user = LDAPUserProxy(flask.request.oauth.user) return flask.jsonify( - email=user.email, username=user.username, - gecos=user.gecos, phone=user.phone, - personal_email=user.personal_email) + email=user.email, + username=user.username, + gecos=user.gecos, + phone=user.phone, + personal_email=user.personal_email, + ) # OpenIDConnect userinfo -@app.route('/api/1/userinfo') -@oauth.require_oauth('profile:read') +@app.route("/api/1/userinfo") +@oauth.require_oauth("profile:read") def api_userinfo(): user = LDAPUserProxy(flask.request.oauth.user) groups = [] if user.is_staff: - groups.append('staff') - return flask.jsonify(sub=user.username, name=user.gecos, email=user.email, - preferred_username=user.username, nickname=user.username, - user_name=user.username, user_id=user.username, groups=groups) + groups.append("staff") + return flask.jsonify( + sub=user.username, + name=user.gecos, + email=user.email, + preferred_username=user.username, + nickname=user.username, + user_name=user.username, + user_id=user.username, + groups=groups, + ) -if __name__ == '__main__': - app.run('0.0.0.0', 8082, debug=True) +if __name__ == "__main__": + app.run("0.0.0.0", 8082, debug=True)