commit bb4ff2d75672226841e18d798f0c61deb21ce18d Author: Tomek Dubrownik Date: Sat Sep 15 07:10:58 2012 +0200 initial commit diff --git a/capacifier.cfg.dist b/capacifier.cfg.dist new file mode 100644 index 0000000..6bbb159 --- /dev/null +++ b/capacifier.cfg.dist @@ -0,0 +1,5 @@ +ROLES_MODULE='hs_roles' +CONTEXT_MAKER='ldap_context.context_maker' +ROLE_FILE='hackerspace.roles' + +LDAP_URL = 'ldap://hackerspace.pl' diff --git a/capacifier.ini.dist b/capacifier.ini.dist new file mode 100644 index 0000000..28ec8d4 --- /dev/null +++ b/capacifier.ini.dist @@ -0,0 +1,8 @@ +[uwsgi] +plugins = python27 +master = 1 +threads = 10 +chdir = /var/www/capacifier +module = capacifier +callable = app +debug = false diff --git a/capacifier.py b/capacifier.py new file mode 100644 index 0000000..ba230fe --- /dev/null +++ b/capacifier.py @@ -0,0 +1,32 @@ +import imp +import role +from flask import Flask, request, make_response, g +app = Flask('capacifier') +app.config.from_pyfile('capacifier.cfg') + +@app.before_first_request +def load_roles(): +# heavily inspired by flask.config + app.roles = {} + rmodule = __import__(app.config['ROLES_MODULE']) + execfile(app.config['ROLE_FILE'], rmodule.__dict__, app.roles) + for name in app.roles.iterkeys(): + if name[0] == '_': + app.roles.pop(name) + cmodule, cfun = app.config['CONTEXT_MAKER'].rsplit('.', 1) + app.maker = getattr(__import__(cmodule), cfun) + +@app.before_request +def run_maker(): + app.maker(app.config) + +@app.route('//') +def check_cap(cap_name, login): + cap = app.roles.get(cap_name, role.Allow()) + code, res = 401, 'NO' + if cap(login, g.context): + code, res = 200, 'YES' + return make_response(res, code, { 'Content-Type': 'text/plain' }) + +if __name__ == '__main__': + app.run('0.0.0.0', 8083, debug=True) diff --git a/hackerspace.roles.dist b/hackerspace.roles.dist new file mode 100644 index 0000000..75c3360 --- /dev/null +++ b/hackerspace.roles.dist @@ -0,0 +1,3 @@ +xmpp = GroupOfNames('cn=xmpp-users,ou=Group,dc=hackerspace,dc=pl') +itanic_shell = PosixGroup('cn=itanic-shell,ou=Group,dc=hackerspace,dc=pl') +wiki_admin = GroupOfNames('cn=admin,dc=wiki,dc=hackerspace,dc=pl') diff --git a/hs_roles.py b/hs_roles.py new file mode 100644 index 0000000..cd7c7d5 --- /dev/null +++ b/hs_roles.py @@ -0,0 +1,5 @@ +import role +class GroupOfNames(role.GroupOfNames): + member_form = 'uid=%s,ou=People,dc=hackerspace,dc=pl' + +from role import Allow, Deny, PamGroup, PosixGroup diff --git a/ldap_context.py b/ldap_context.py new file mode 100644 index 0000000..f70f243 --- /dev/null +++ b/ldap_context.py @@ -0,0 +1,5 @@ +from flask import g +from role import Context, LDAPRole +def context_maker(config): + g.context = Context(ldap_connection= + LDAPRole.make_connection(config['LDAP_URL'], tls=False)) diff --git a/role.py b/role.py new file mode 100644 index 0000000..5c2e553 --- /dev/null +++ b/role.py @@ -0,0 +1,92 @@ +import grp +import ldap + +class Context(object): + def __init__(self, **kwargs): + for (k, v) in kwargs.iteritems(): + setattr(self, k, v) + +class Role(object): + def __call__(self, login, context=None): + pass + def __or__(self, other): + if not isinstance(other, Role): + raise TypeError + return OrRole(self, other) + def __and__(self, other): + if not isinstance(other, Role): + raise TypeError + return AndRole(self, other) + +class OrRole(Role): + def __init__(self, *roles): + self.roles = roles + def __call__(self, login, context=None): + return any(role(login, context) for role in self.roles) + +class AndRole(Role): + def __init__(self, *roles): + self.roles = roles + def __call__(self, login, context=None): + return all(role(login, context) for role in self.roles) + +class Allow(Role): + def __init__(self, *logins): + self.logins = logins + def __call__(self, login, context=None): + return login in self.logins + +class Deny(Role): + def __init__(self, *logins): + self.logins = logins + def __call__(self, login, context=None): + return login not in self.logins + +class PamGroup(Role): + def __init__(self, name): + self.name = name + def __call__(self, login, context=None): + return login in grp.getgrnam(self.name).gr_mem + +class LDAPRole(Role): + def __init__(self, url=None, connection=None, tls=None, binddn=None, bindpw=None): + self.conn = connection + self.url = url + self.tls = tls + self.binddn = binddn + self.bindpw = bindpw + @staticmethod + def make_connection(url, binddn=None, bindpw=None, tls=True): + conn = ldap.initialize(url) + if tls: + conn.start_tls_s() + if binddn: + conn.bind_s(binddn, bindpw) + return conn + def get_connection(self, context): + conn = (hasattr(context, 'ldap_connection') and context.ldap_connection) or \ + self.conn or LDAPRole.make_connection(self.url, self.binddn, self.bindpw, self.tls) + return conn + +class FilterLDAPRole(LDAPRole): + def __init__(self, dn, scope=ldap.SCOPE_BASE, **kwargs): + LDAPRole.__init__(self, **kwargs) + self.dn = dn + self.scope = scope + def make_filter(self, login): + return self.filter % login + def __call__(self, login, context=None): + conn = self.get_connection(context) + res = conn.search_s(self.dn, self.scope, self.make_filter(login)) + return len(res) > 0 + +class GroupOfNames(FilterLDAPRole): + member_form = '%s' + def __init__(self, dn, member_form=None, **kwargs): + FilterLDAPRole.__init__(self, dn, **kwargs) + self.member_form = member_form or self.member_form + def make_filter(self, login): + return '(&(objectClass=groupOfNames)(member=%s))' % (self.member_form % login) + +class PosixGroup(FilterLDAPRole): + filter = '(&(objectClass=posixGroup)(memberuid=%s))' diff --git a/test.py b/test.py new file mode 100644 index 0000000..459fb8f --- /dev/null +++ b/test.py @@ -0,0 +1,23 @@ +import role +import hs_roles +import unittest + +class TestBasicRoles(unittest.TestCase): + def test_pam(self): + tgrp = role.PamGroup('root') + admgrp = role.PamGroup('adm') + self.assertFalse(admgrp('tester')) + self.assertTrue(admgrp('root')) + def test_ldap(self): + c = role.LDAPRole.make_connection('ldap://ldap.hackerspace.pl') + l1 = role.GroupOfNames('cn=xmpp-users,ou=Group,dc=hackerspace,dc=pl') + context = role.Context(ldap_connection=c) + self.assertTrue(l1('uid=ola,ou=People,dc=hackerspace,dc=pl', context)) + l2 = hs_roles.GroupOfNames('cn=xmpp-users,ou=Group,dc=hackerspace,dc=pl') + self.assertTrue(l2('ola', context)) + l3 = role.PosixGroup('cn=staff,ou=Group,dc=hackerspace,dc=pl') + self.assertFalse(l3('tester', context)) + self.assertTrue((l1 | l2)('ola', context)) + +if __name__ == '__main__': + unittest.main()