# sso/auth.py
#
# 333 lines
# 9.2 KiB
# Python

import ldap
import logging
import re
from datetime import datetime, timedelta
from cached_property import cached_property
import flask
from flask import Flask, render_template, make_response, flash, redirect, url_for
from flask_oauthlib.provider import OAuth2Provider
from flask_login import LoginManager, login_user, login_required, current_user
from flask_sqlalchemy import SQLAlchemy
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField
from wtforms.validators import DataRequired
import requests
# Module-level application wiring. The statements below run at import time
# and their order matters: configuration is loaded before the extensions
# that receive the app are constructed.

# Root logger: DEBUG level, timestamped "[time] name level: message" lines.
logging.basicConfig(level=logging.DEBUG, format='[%(asctime)-15s] %(name)-10s %(levelname)7s: %(message)s')
app = Flask('auth')
# Configuration is read from this module first, then from auth.cfg, so
# values in auth.cfg override anything defined here.
app.config.from_object(__name__)
app.config.from_pyfile('auth.cfg')
# OAuth2 provider; the @oauth.* getter/setter hooks further down attach to it.
oauth = OAuth2Provider(app)
login_manager = LoginManager()
login_manager.init_app(app)
# Target that @login_required views redirect unauthenticated users to.
login_manager.login_view = '/login'
# SQLAlchemy handle backing the Client, Grant and Token models.
db = SQLAlchemy(app)
class Client(db.Model):
    """OAuth2 client application known to this provider.

    Redirect URIs and default scopes are stored as whitespace-separated
    text and exposed as lists through the properties below.
    """

    # Every scope this provider is willing to grant.
    _VALID_SCOPES = frozenset([
        'profile:read', 'profile:write', 'password:write', 'users:read',
    ])

    # human readable name
    name = db.Column(db.String(40))
    # human readable description
    description = db.Column(db.String(400))
    client_id = db.Column(db.String(40), primary_key=True)
    client_secret = db.Column(db.String(55), unique=True, index=True,
                              nullable=False)
    # public or confidential
    is_confidential = db.Column(db.Boolean)
    redirect_uris_ = db.Column(db.Text)
    default_scopes_ = db.Column(db.Text)

    @property
    def client_type(self):
        """Return 'confidential' or 'public' depending on is_confidential."""
        if self.is_confidential:
            return 'confidential'
        return 'public'

    @property
    def redirect_uris(self):
        """Return the registered redirect URIs as a list (possibly empty)."""
        if self.redirect_uris_:
            return self.redirect_uris_.split()
        return []

    @property
    def default_redirect_uri(self):
        """Return the first registered redirect URI, or None if there is none.

        Returning None instead of raising IndexError lets the caller report
        a missing redirect URI rather than failing with an unhandled error.
        """
        uris = self.redirect_uris
        return uris[0] if uris else None

    @property
    def default_scopes(self):
        """Return the client's default scopes as a list (possibly empty)."""
        if self.default_scopes_:
            return self.default_scopes_.split()
        return []

    def validate_scopes(self, scopes):
        """Return True if every requested scope is one this provider offers.

        An empty or missing scope collection is accepted.
        """
        return self._VALID_SCOPES.issuperset(scopes or ())
class Grant(db.Model):
    """Authorization grant row: a code issued to a client for a user.

    Holds the code, the owning client, the user identifier string, the
    redirect URI, an expiry timestamp and the space-separated scopes.
    """

    id = db.Column(db.Integer, primary_key=True)
    user = db.Column(db.String(40), nullable=False)
    client_id = db.Column(
        db.String(40),
        db.ForeignKey('client.client_id'),
        nullable=False,
    )
    client = db.relationship('Client')
    code = db.Column(db.String(255), index=True, nullable=False)
    redirect_uri = db.Column(db.String(255))
    expires = db.Column(db.DateTime)
    _scopes = db.Column(db.Text)

    def delete(self):
        """Remove this grant from the database, commit, and return it."""
        session = db.session
        session.delete(self)
        session.commit()
        return self

    @property
    def scopes(self):
        """Return the granted scopes as a list (empty when none stored)."""
        stored = self._scopes
        return stored.split() if stored else []
class Token(db.Model):
    """Issued token row tying an access/refresh token pair to a client and user."""

    id = db.Column(db.Integer, primary_key=True)
    client_id = db.Column(
        db.String(40),
        db.ForeignKey('client.client_id'),
        nullable=False,
    )
    client = db.relationship('Client')
    user = db.Column(db.String(40), nullable=False)
    # currently only bearer is supported
    token_type = db.Column(db.String(40))
    access_token = db.Column(db.String(255), unique=True)
    refresh_token = db.Column(db.String(255), unique=True)
    expires = db.Column(db.DateTime)
    _scopes = db.Column(db.Text)

    def delete(self):
        """Remove this token from the database, commit, and return it."""
        session = db.session
        session.delete(self)
        session.commit()
        return self

    @property
    def scopes(self):
        """Return the token's scopes as a list (empty when none stored)."""
        stored = self._scopes
        return stored.split() if stored else []
def connect_to_ldap():
    """Return an LDAP connection bound as the configured service account.

    Connects to LDAP_URL, upgrades the connection with StartTLS and binds
    with LDAP_BIND_DN / LDAP_BIND_PASSWORD.

    Raises:
        ldap.LDAPError: if the connection, TLS upgrade or bind fails.
    """
    conn = ldap.initialize(app.config['LDAP_URL'])
    conn.start_tls_s()
    # Use the synchronous bind: simple_bind() only queues the request and
    # returns a message id, so bad service credentials would go unnoticed
    # until some later operation failed in a confusing way.
    conn.simple_bind_s(app.config['LDAP_BIND_DN'],
                       app.config['LDAP_BIND_PASSWORD'])
    return conn
def check_credentials(username, password):
    """Return True if *username* / *password* can bind to the LDAP server.

    The user's DN is built from the DN_STRING config template. Connection
    or TLS failures propagate as ldap.LDAPError; only a failed bind is
    reported as False.
    """
    from ldap.dn import escape_dn_chars

    # A simple bind with an empty password is an "unauthenticated bind"
    # (RFC 4513 section 5.1.2) that servers may accept, so never attempt it.
    if not username or not password:
        return False

    conn = ldap.initialize(app.config['LDAP_URL'])
    try:
        conn.start_tls_s()
        # Escape the user-supplied name so it cannot alter the DN structure.
        user_dn = app.config['DN_STRING'] % escape_dn_chars(username)
        try:
            conn.simple_bind_s(user_dn, password)
        except ldap.LDAPError:
            return False
        return True
    finally:
        # Always release the connection; a failure while closing must not
        # change the outcome of the credential check.
        try:
            conn.unbind_s()
        except ldap.LDAPError:
            pass
@oauth.clientgetter
def load_client(client_id):
    """Return the Client with the given client_id, or None if not found."""
    matches = Client.query.filter_by(client_id=client_id)
    return matches.first()
@oauth.grantgetter
def load_grant(client_id, code):
    """Return the Grant matching both client_id and code, or None."""
    lookup = {'client_id': client_id, 'code': code}
    return Grant.query.filter_by(**lookup).first()
@oauth.grantsetter
def save_grant(client_id, code, request, *args, **kwargs):
    """Persist a new authorization grant for the logged-in user.

    The grant records the code, the request's redirect URI and scopes, and
    the current user's username, and expires 100 seconds from now (UTC).
    Returns the stored Grant.
    """
    # decide the expires time yourself
    valid_until = datetime.utcnow() + timedelta(seconds=100)
    new_grant = Grant(
        client_id=client_id,
        code=code['code'],
        redirect_uri=request.redirect_uri,
        _scopes=' '.join(request.scopes),
        user=current_user.username,
        expires=valid_until,
    )
    session = db.session
    session.add(new_grant)
    session.commit()
    return new_grant
@oauth.tokengetter
def load_token(access_token=None, refresh_token=None):
    """Look up a Token by access token, or failing that by refresh token.

    Returns None when neither value is given or nothing matches.
    """
    if access_token:
        criteria = {'access_token': access_token}
    elif refresh_token:
        criteria = {'refresh_token': refresh_token}
    else:
        return None
    return Token.query.filter_by(**criteria).first()
@oauth.tokensetter
def save_token(token, request, *args, **kwargs):
    """Store a newly issued token, replacing any previous one for the pair.

    Args:
        token: dict produced by the OAuth library; 'access_token',
            'token_type', 'scope' and 'expires_in' are read, and
            'refresh_token' when present.
        request: the OAuth request; its client identifies the application.

    Returns:
        The persisted Token.
    """
    # The token endpoint is called by the client application, not by the
    # user's browser, so there is normally no logged-in session here.
    # Flask-OAuthlib sets request.user from the grant (or from the token
    # being refreshed) during validation; fall back to the session user
    # only if that is missing.
    username = getattr(request, 'user', None)
    if not username:
        username = current_user.username
    client_id = request.client.client_id
    logging.debug('Saving token for client %s, user %s', client_id, username)

    # make sure that every client has only one token connected to a user
    for old in Token.query.filter_by(client_id=client_id, user=username):
        db.session.delete(old)

    expires_in = token.get('expires_in')
    expires = datetime.utcnow() + timedelta(seconds=expires_in)
    tok = Token(
        access_token=token['access_token'],
        refresh_token=token.get('refresh_token'),
        token_type=token['token_type'],
        _scopes=token['scope'],
        expires=expires,
        client_id=client_id,
        user=username,
    )
    db.session.add(tok)
    db.session.commit()
    return tok
@app.route('/oauth/authorize', methods=['GET', 'POST'])
@login_required
@oauth.authorize_handler
def authorize(*args, **kwargs):
    """Show the consent page on GET; report the user's decision otherwise.

    For GET the 'oauthorize.html' template is rendered with the requesting
    client and the current user added to the keyword arguments. For any
    other method the return value is True only when the submitted form
    field 'confirm' equals 'yes'.
    """
    if flask.request.method != 'GET':
        answer = flask.request.form.get('confirm', 'no')
        return answer == 'yes'

    requested_id = kwargs.get('client_id')
    kwargs['client'] = Client.query.filter_by(client_id=requested_id).first()
    kwargs['user'] = current_user
    return render_template('oauthorize.html', **kwargs)
# OAuth2 clients exchange codes and refresh tokens by POSTing to the token
# endpoint (RFC 6749 section 3.2). Without an explicit methods list Flask
# registers the route for GET only and answers those POSTs with 405.
@app.route('/oauth/token', methods=['GET', 'POST'])
@oauth.token_handler
def access_token():
    """Token endpoint; the response is built by the token_handler decorator.

    Returning None means no extra fields are added to the response.
    """
    return None
class LDAPUserProxy(object):
    """User object populated from the LDAP directory.

    Instances carry the attributes used for login sessions
    (is_authenticated, is_anonymous, is_active, get_id) plus profile data
    read from the user's LDAP entry.

    Raises:
        Exception: from the constructor, when the directory search does not
            return exactly one entry for the username.
    """

    # Seconds to wait for the Kasownik API before treating it as unavailable.
    KASOWNIK_TIMEOUT = 5

    def __init__(self, username):
        from ldap.filter import escape_filter_chars

        self.username = re.sub(app.config['STRIP_RE'], '', username)
        self.is_authenticated = True
        self.is_anonymous = False

        conn = connect_to_ldap()
        try:
            # Escape the name so it cannot change the meaning of the filter.
            res = conn.search_s(
                app.config['PEOPLE_BASEDN'], ldap.SCOPE_SUBTREE,
                app.config['UID_LDAP_FILTER']
                % escape_filter_chars(self.username))
        finally:
            # One connection is opened per instance; release it right away.
            conn.unbind_s()
        if len(res) != 1:
            raise Exception('No such username.')
        dn, data = res[0]

        self.gecos = None
        if 'gecos' in data and data['gecos']:
            self.gecos = data['gecos'][0]

        self.mifare_hashes = []
        if 'mifareIDHash' in data:
            self.mifare_hashes = data['mifareIDHash']

        self.phone = None
        if 'mobile' in data and data['mobile']:
            self.phone = data['mobile'][0]

        # NOTE(review): unlike gecos and phone, this keeps the whole value
        # list rather than its first element. Left unchanged because API
        # consumers may depend on it - confirm which is intended.
        self.personal_email = None
        if 'mailRoutingAddress' in data and data['mailRoutingAddress']:
            self.personal_email = data['mailRoutingAddress']

    def __repr__(self):
        active = 'active' if self.is_active else 'inactive'
        return '<LDAPUserProxy {}, {}>'.format(self.username, active)

    @property
    def email(self):
        """Return the user's hackerspace.pl address."""
        return self.username + '@hackerspace.pl'

    @cached_property
    def is_active(self):
        """Return whether Kasownik reports content for this user.

        Any failure (network error, timeout, unexpected payload) is logged
        and treated as active, so an outage does not lock everyone out.
        """
        url = 'https://kasownik.hackerspace.pl/api/judgement/{}.json'
        try:
            # Bound the wait: without a timeout a stalled API would hang
            # the request that triggered this lookup indefinitely.
            r = requests.get(url.format(self.username),
                             timeout=self.KASOWNIK_TIMEOUT)
            return bool(r.json()['content'])
        except Exception as e:
            logging.error("When getting data from Kasownik: {}".format(e))
            # Fail-safe.
            return True

    def get_id(self):
        """Return the identifier stored in the login session (the username)."""
        return self.username
class LoginForm(FlaskForm):
    """Login form with two required fields: username and password."""

    username = StringField(
        'username',
        validators=[DataRequired()],
    )
    password = PasswordField(
        'password',
        validators=[DataRequired()],
    )
@app.route('/profile')
@login_required
def profile():
    """Return a plain-text line naming the logged-in user's email address."""
    message = 'You are logged in as {}'
    return message.format(current_user.email)
def _is_safe_redirect_target(target):
    """Return True if *target* is a path on this site, not another host."""
    if not target or not target.startswith('/'):
        return False
    # '//host' and '/\host' are interpreted by browsers as another origin.
    return not (target.startswith('//') or target.startswith('/\\'))


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and, on valid submission, log the user in.

    Credentials are checked against LDAP. On success the user is redirected
    to the local path given in the 'next' query parameter, or to the
    profile page when 'next' is absent or points off-site.
    """
    form = LoginForm()
    if not form.validate_on_submit():
        return render_template('login_oauth.html', form=form)

    username, password = form.data['username'], form.data['password']
    if not check_credentials(username, password):
        flash('Invalid username or password')
        return render_template('login_oauth.html', form=form)

    login_user(LDAPUserProxy(username))
    flash('Logged in successfully.')

    # `request` is not imported at module level; go through the flask
    # module, as the other views in this file do.
    next_url = flask.request.args.get('next')
    if not _is_safe_redirect_target(next_url):
        # Ignore attacker-supplied absolute URLs (open redirect).
        next_url = None
    return redirect(next_url or url_for('profile'))
@login_manager.user_loader
def load_user(user_id):
    """Return the user for a session id, or None if it cannot be resolved.

    Flask-Login expects None, not an exception, for an id that is no
    longer valid. LDAPUserProxy raises a plain Exception when the directory
    has no single matching entry; that case is logged and turned into None.
    Directory errors (ldap.LDAPError) are re-raised so an outage is not
    mistaken for a logged-out user.
    """
    try:
        return LDAPUserProxy(user_id)
    except ldap.LDAPError:
        raise
    except Exception as e:
        logging.warning('Could not load user %r: %s', user_id, e)
        return None
@app.route('/api/profile')
@oauth.require_oauth('profile:read')
def api_profile():
    """Return the token owner's profile as JSON (needs 'profile:read').

    The response has the keys email, username, gecos, phone and
    personal_email.
    """
    owner = LDAPUserProxy(flask.request.oauth.user)
    payload = {
        'email': owner.email,
        'username': owner.username,
        'gecos': owner.gecos,
        'phone': owner.phone,
        'personal_email': owner.personal_email,
    }
    return flask.jsonify(**payload)
if __name__ == '__main__':
    # Development entry point. The server listens on all interfaces, so the
    # interactive debugger (which allows arbitrary code execution) must not
    # be enabled unconditionally; take the flag from the DEBUG config value,
    # which auth.cfg can set.
    app.run('0.0.0.0', 8082, debug=app.config.get('DEBUG', False))