from datetime import datetime from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() class Client(db.Model): # human readable name name = db.Column(db.String(40)) # human readable description description = db.Column(db.String(400)) client_id = db.Column(db.String(40), primary_key=True) client_secret = db.Column(db.String(55), unique=True, index=True, nullable=False) # public or confidential is_confidential = db.Column(db.Boolean) redirect_uris_ = db.Column(db.Text) default_scopes_ = db.Column(db.Text) # TODO # approved = db.Column(db.Boolean, default=False) approved = True @property def client_type(self): if self.is_confidential: return "confidential" return "public" @property def redirect_uris(self): if self.redirect_uris_: return self.redirect_uris_.split() return [] @property def default_redirect_uri(self): return self.redirect_uris[0] @property def default_scopes(self): if self.default_scopes_: return self.default_scopes_.split() return [] def check_response_type(self, response_type): return True def check_redirect_uri(self, redirect_uri): return redirect_uri in self.redirect_uris def check_requested_scopes(self, scopes): return { "profile:read", "profile:write", "password:write", "users:read", "openid", }.issuperset(scopes) def check_token_endpoint_auth_method(self, method): allowed = ['client_secret_post', 'client_secret_basic'] if not self.is_confidential: allowed.append('none') return method in allowed def check_client_secret(self, secret): return self.client_secret == secret def check_grant_type(self, grant_type): return grant_type in ['authorization_code'] def check_client_type(self, client_type): return client_type == self.client_type class Grant(db.Model): id = db.Column(db.Integer, primary_key=True) user = db.Column(db.String(40), nullable=False) client_id = db.Column( db.String(40), db.ForeignKey("client.client_id"), nullable=False ) client = db.relationship("Client") code = db.Column(db.String(255), index=True, nullable=False) redirect_uri = db.Column(db.String(255)) expires = db.Column(db.DateTime) _scopes = db.Column(db.Text) def is_expired(self): return self.expires < datetime.utcnow() def get_redirect_uri(self): return self.redirect_uri def get_scope(self): return self._scopes class Token(db.Model): id = db.Column(db.Integer, primary_key=True) client_id = db.Column( db.String(40), db.ForeignKey("client.client_id"), nullable=False ) client = db.relationship("Client") user = db.Column(db.String(40), nullable=False) # currently only bearer is supported token_type = db.Column(db.String(40)) access_token = db.Column(db.String(255), unique=True, nullable=False) refresh_token = db.Column(db.String(255)) expires = db.Column(db.DateTime) _scopes = db.Column(db.Text) def delete(self): db.session.delete(self) db.session.commit() return self @property def scopes(self): if self._scopes: return self._scopes.split() return [] def is_expired(self): return self.expires < datetime.utcnow() def delete(self): db.session.delete(self) db.session.commit()