initial commit
commit
bb4ff2d756
|
@ -0,0 +1,5 @@
|
|||
ROLES_MODULE='hs_roles'
|
||||
CONTEXT_MAKER='ldap_context.context_maker'
|
||||
ROLE_FILE='hackerspace.roles'
|
||||
|
||||
LDAP_URL = 'ldap://hackerspace.pl'
|
|
@ -0,0 +1,8 @@
|
|||
[uwsgi]
|
||||
plugins = python27
|
||||
master = 1
|
||||
threads = 10
|
||||
chdir = /var/www/capacifier
|
||||
module = capacifier
|
||||
callable = app
|
||||
debug = false
|
|
@ -0,0 +1,32 @@
|
|||
import imp
|
||||
import role
|
||||
from flask import Flask, request, make_response, g
|
||||
app = Flask('capacifier')
|
||||
app.config.from_pyfile('capacifier.cfg')
|
||||
|
||||
@app.before_first_request
|
||||
def load_roles():
|
||||
# heavily inspired by flask.config
|
||||
app.roles = {}
|
||||
rmodule = __import__(app.config['ROLES_MODULE'])
|
||||
execfile(app.config['ROLE_FILE'], rmodule.__dict__, app.roles)
|
||||
for name in app.roles.iterkeys():
|
||||
if name[0] == '_':
|
||||
app.roles.pop(name)
|
||||
cmodule, cfun = app.config['CONTEXT_MAKER'].rsplit('.', 1)
|
||||
app.maker = getattr(__import__(cmodule), cfun)
|
||||
|
||||
@app.before_request
|
||||
def run_maker():
|
||||
app.maker(app.config)
|
||||
|
||||
@app.route('/<cap_name>/<login>')
|
||||
def check_cap(cap_name, login):
|
||||
cap = app.roles.get(cap_name, role.Allow())
|
||||
code, res = 401, 'NO'
|
||||
if cap(login, g.context):
|
||||
code, res = 200, 'YES'
|
||||
return make_response(res, code, { 'Content-Type': 'text/plain' })
|
||||
|
||||
if __name__ == '__main__':
|
||||
app.run('0.0.0.0', 8083, debug=True)
|
|
@ -0,0 +1,3 @@
|
|||
xmpp = GroupOfNames('cn=xmpp-users,ou=Group,dc=hackerspace,dc=pl')
|
||||
itanic_shell = PosixGroup('cn=itanic-shell,ou=Group,dc=hackerspace,dc=pl')
|
||||
wiki_admin = GroupOfNames('cn=admin,dc=wiki,dc=hackerspace,dc=pl')
|
|
@ -0,0 +1,5 @@
|
|||
import role
|
||||
class GroupOfNames(role.GroupOfNames):
|
||||
member_form = 'uid=%s,ou=People,dc=hackerspace,dc=pl'
|
||||
|
||||
from role import Allow, Deny, PamGroup, PosixGroup
|
|
@ -0,0 +1,5 @@
|
|||
from flask import g
|
||||
from role import Context, LDAPRole
|
||||
def context_maker(config):
|
||||
g.context = Context(ldap_connection=
|
||||
LDAPRole.make_connection(config['LDAP_URL'], tls=False))
|
|
@ -0,0 +1,92 @@
|
|||
import grp
|
||||
import ldap
|
||||
|
||||
class Context(object):
|
||||
def __init__(self, **kwargs):
|
||||
for (k, v) in kwargs.iteritems():
|
||||
setattr(self, k, v)
|
||||
|
||||
class Role(object):
|
||||
def __call__(self, login, context=None):
|
||||
pass
|
||||
def __or__(self, other):
|
||||
if not isinstance(other, Role):
|
||||
raise TypeError
|
||||
return OrRole(self, other)
|
||||
def __and__(self, other):
|
||||
if not isinstance(other, Role):
|
||||
raise TypeError
|
||||
return AndRole(self, other)
|
||||
|
||||
class OrRole(Role):
|
||||
def __init__(self, *roles):
|
||||
self.roles = roles
|
||||
def __call__(self, login, context=None):
|
||||
return any(role(login, context) for role in self.roles)
|
||||
|
||||
class AndRole(Role):
|
||||
def __init__(self, *roles):
|
||||
self.roles = roles
|
||||
def __call__(self, login, context=None):
|
||||
return all(role(login, context) for role in self.roles)
|
||||
|
||||
class Allow(Role):
|
||||
def __init__(self, *logins):
|
||||
self.logins = logins
|
||||
def __call__(self, login, context=None):
|
||||
return login in self.logins
|
||||
|
||||
class Deny(Role):
|
||||
def __init__(self, *logins):
|
||||
self.logins = logins
|
||||
def __call__(self, login, context=None):
|
||||
return login not in self.logins
|
||||
|
||||
class PamGroup(Role):
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
def __call__(self, login, context=None):
|
||||
return login in grp.getgrnam(self.name).gr_mem
|
||||
|
||||
class LDAPRole(Role):
|
||||
def __init__(self, url=None, connection=None, tls=None, binddn=None, bindpw=None):
|
||||
self.conn = connection
|
||||
self.url = url
|
||||
self.tls = tls
|
||||
self.binddn = binddn
|
||||
self.bindpw = bindpw
|
||||
@staticmethod
|
||||
def make_connection(url, binddn=None, bindpw=None, tls=True):
|
||||
conn = ldap.initialize(url)
|
||||
if tls:
|
||||
conn.start_tls_s()
|
||||
if binddn:
|
||||
conn.bind_s(binddn, bindpw)
|
||||
return conn
|
||||
def get_connection(self, context):
|
||||
conn = (hasattr(context, 'ldap_connection') and context.ldap_connection) or \
|
||||
self.conn or LDAPRole.make_connection(self.url, self.binddn, self.bindpw, self.tls)
|
||||
return conn
|
||||
|
||||
class FilterLDAPRole(LDAPRole):
|
||||
def __init__(self, dn, scope=ldap.SCOPE_BASE, **kwargs):
|
||||
LDAPRole.__init__(self, **kwargs)
|
||||
self.dn = dn
|
||||
self.scope = scope
|
||||
def make_filter(self, login):
|
||||
return self.filter % login
|
||||
def __call__(self, login, context=None):
|
||||
conn = self.get_connection(context)
|
||||
res = conn.search_s(self.dn, self.scope, self.make_filter(login))
|
||||
return len(res) > 0
|
||||
|
||||
class GroupOfNames(FilterLDAPRole):
|
||||
member_form = '%s'
|
||||
def __init__(self, dn, member_form=None, **kwargs):
|
||||
FilterLDAPRole.__init__(self, dn, **kwargs)
|
||||
self.member_form = member_form or self.member_form
|
||||
def make_filter(self, login):
|
||||
return '(&(objectClass=groupOfNames)(member=%s))' % (self.member_form % login)
|
||||
|
||||
class PosixGroup(FilterLDAPRole):
|
||||
filter = '(&(objectClass=posixGroup)(memberuid=%s))'
|
|
@ -0,0 +1,23 @@
|
|||
import role
|
||||
import hs_roles
|
||||
import unittest
|
||||
|
||||
class TestBasicRoles(unittest.TestCase):
|
||||
def test_pam(self):
|
||||
tgrp = role.PamGroup('root')
|
||||
admgrp = role.PamGroup('adm')
|
||||
self.assertFalse(admgrp('tester'))
|
||||
self.assertTrue(admgrp('root'))
|
||||
def test_ldap(self):
|
||||
c = role.LDAPRole.make_connection('ldap://ldap.hackerspace.pl')
|
||||
l1 = role.GroupOfNames('cn=xmpp-users,ou=Group,dc=hackerspace,dc=pl')
|
||||
context = role.Context(ldap_connection=c)
|
||||
self.assertTrue(l1('uid=ola,ou=People,dc=hackerspace,dc=pl', context))
|
||||
l2 = hs_roles.GroupOfNames('cn=xmpp-users,ou=Group,dc=hackerspace,dc=pl')
|
||||
self.assertTrue(l2('ola', context))
|
||||
l3 = role.PosixGroup('cn=staff,ou=Group,dc=hackerspace,dc=pl')
|
||||
self.assertFalse(l3('tester', context))
|
||||
self.assertTrue((l1 | l2)('ola', context))
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue