# capacifier/role.py
#
# 93 lines
# 2.9 KiB
# Python

import grp

import ldap
import ldap.filter
class Context(object):
    """Plain attribute container built from keyword arguments.

    Every keyword passed to the constructor becomes an instance attribute
    of the same name, e.g. ``Context(ldap_connection=conn).ldap_connection``.
    """

    def __init__(self, **kwargs):
        # ``items()`` exists on both Python 2 and Python 3, whereas
        # ``iteritems()`` was removed in Python 3.
        for name, value in kwargs.items():
            setattr(self, name, value)
class Role(object):
    """Base class for callable login predicates.

    A role is called as ``role(login, context)``. The base implementation
    does nothing and returns ``None``; subclasses override ``__call__``.
    Roles can be combined with ``|`` (producing an ``OrRole``) and ``&``
    (producing an ``AndRole``).
    """

    def __call__(self, login, context=None):
        """Evaluate the role for ``login``; the base class returns ``None``."""
        return None

    def __or__(self, other):
        """Return ``OrRole(self, other)``; raise ``TypeError`` for non-roles."""
        if isinstance(other, Role):
            return OrRole(self, other)
        raise TypeError

    def __and__(self, other):
        """Return ``AndRole(self, other)``; raise ``TypeError`` for non-roles."""
        if isinstance(other, Role):
            return AndRole(self, other)
        raise TypeError
class OrRole(Role):
    """Role that is satisfied when at least one of its sub-roles is."""

    def __init__(self, *roles):
        # Sub-roles are kept as the tuple of positional arguments.
        self.roles = roles

    def __call__(self, login, context=None):
        """Return True as soon as one sub-role accepts ``login``.

        Sub-roles are evaluated in order and evaluation stops at the first
        truthy result. With no sub-roles the result is False.
        """
        for member in self.roles:
            if member(login, context):
                return True
        return False
class AndRole(Role):
    """Role that is satisfied only when every one of its sub-roles is."""

    def __init__(self, *roles):
        # Sub-roles are kept as the tuple of positional arguments.
        self.roles = roles

    def __call__(self, login, context=None):
        """Return False as soon as one sub-role rejects ``login``.

        Sub-roles are evaluated in order and evaluation stops at the first
        falsy result. With no sub-roles the result is True.
        """
        for member in self.roles:
            if not member(login, context):
                return False
        return True
class Allow(Role):
    """Role satisfied by an explicit list of logins."""

    def __init__(self, *logins):
        # The accepted logins, stored as the tuple of positional arguments.
        self.logins = logins

    def __call__(self, login, context=None):
        """Return True when ``login`` is one of the listed logins."""
        accepted = self.logins
        return login in accepted
class Deny(Role):
    """Role satisfied by every login except an explicit list."""

    def __init__(self, *logins):
        # The rejected logins, stored as the tuple of positional arguments.
        self.logins = logins

    def __call__(self, login, context=None):
        """Return True when ``login`` is not one of the listed logins."""
        rejected = self.logins
        return not (login in rejected)
class PamGroup(Role):
    """Role satisfied by members of a group from the system group database."""

    def __init__(self, name):
        # Name of the group to look up with ``grp.getgrnam``.
        self.name = name

    def __call__(self, login, context=None):
        """Return True when ``login`` is listed in the group's ``gr_mem``.

        The group entry is looked up on every call. ``grp.getgrnam`` raises
        ``KeyError`` when the group does not exist; that error is not
        caught here.
        """
        entry = grp.getgrnam(self.name)
        return login in entry.gr_mem
class LDAPRole(Role):
    """Base class for roles that need an LDAP connection.

    A connection can be supplied directly (``connection``), taken from the
    evaluation context (``context.ldap_connection``), or created on demand
    from ``url`` / ``binddn`` / ``bindpw`` / ``tls``.
    """

    def __init__(self, url=None, connection=None, tls=None, binddn=None, bindpw=None):
        """Store the connection settings; nothing is opened here.

        Note that ``tls`` defaults to ``None`` here and is passed through
        as-is to ``make_connection``, so StartTLS is only attempted when a
        truthy ``tls`` is given to the constructor.
        """
        self.url = url
        self.conn = connection
        self.binddn = binddn
        self.bindpw = bindpw
        self.tls = tls

    @staticmethod
    def make_connection(url, binddn=None, bindpw=None, tls=True):
        """Open a connection to ``url`` and return it.

        StartTLS is issued when ``tls`` is truthy, and a bind is performed
        with ``binddn`` / ``bindpw`` when ``binddn`` is truthy.
        """
        link = ldap.initialize(url)
        if tls:
            link.start_tls_s()
        if binddn:
            link.bind_s(binddn, bindpw)
        return link

    def get_connection(self, context):
        """Return the LDAP connection to use for one evaluation.

        Order of preference: a truthy ``context.ldap_connection``, then the
        connection given to the constructor, then a freshly made one. A
        freshly made connection is not stored on the instance.
        """
        from_context = None
        if hasattr(context, 'ldap_connection'):
            from_context = context.ldap_connection
        if from_context:
            return from_context
        if self.conn:
            return self.conn
        return LDAPRole.make_connection(self.url, self.binddn, self.bindpw, self.tls)
class FilterLDAPRole(LDAPRole):
    """LDAP role decided by whether a search returns any result.

    The search is run on ``dn`` with ``scope`` and a filter built from the
    login. This class does not define ``filter`` itself: subclasses must
    either set a ``filter`` attribute containing one ``%s`` placeholder or
    override ``make_filter``.
    """

    def __init__(self, dn, scope=ldap.SCOPE_BASE, **kwargs):
        """Store the search base and scope.

        Remaining keyword arguments are forwarded to ``LDAPRole.__init__``.
        """
        LDAPRole.__init__(self, **kwargs)
        self.dn = dn
        self.scope = scope

    def make_filter(self, login):
        """Return the search filter for ``login``.

        The login is escaped before substitution so that characters with a
        meaning in LDAP filters (``*``, ``(``, ``)``, ``\\``, NUL) cannot
        change the structure of the filter.
        """
        return self.filter % ldap.filter.escape_filter_chars(login)

    def __call__(self, login, context=None):
        """Return True when the search for ``login`` yields any result."""
        conn = self.get_connection(context)
        res = conn.search_s(self.dn, self.scope, self.make_filter(login))
        return bool(res)
class GroupOfNames(FilterLDAPRole):
    """Role satisfied when the login is a ``member`` of a groupOfNames entry.

    ``member_form`` is a ``%s`` template that turns a login into the value
    compared against the ``member`` attribute; the default ``'%s'`` uses
    the login unchanged.
    """

    member_form = '%s'

    def __init__(self, dn, member_form=None, **kwargs):
        """Store the group DN and an optional ``member_form`` override.

        Remaining keyword arguments are forwarded to
        ``FilterLDAPRole.__init__``.
        """
        FilterLDAPRole.__init__(self, dn, **kwargs)
        # A falsy member_form keeps the class-level default.
        self.member_form = member_form or self.member_form

    def make_filter(self, login):
        """Return the groupOfNames membership filter for ``login``.

        The login is escaped for use inside an LDAP filter before it is
        substituted, so filter metacharacters in it cannot alter the
        filter. ``member_form`` itself is inserted verbatim.
        """
        member = self.member_form % ldap.filter.escape_filter_chars(login)
        return '(&(objectClass=groupOfNames)(member=%s))' % member
class PosixGroup(FilterLDAPRole):
    """Role satisfied when the login is a ``memberuid`` of a posixGroup entry."""

    # Filter template; the single ``%s`` is filled with the login by the
    # inherited ``make_filter``.
    filter = '(&(objectClass=posixGroup)(memberuid=%s))'