hs-admin/hsadmin/cmd.py

206 lines
6.5 KiB
Python

import os
from pkg_resources import resource_filename, cleanup_resources, resource_string
import atexit
from pathlib import Path
# Ask pkg_resources to delete any resource files it extracted once the
# interpreter exits.
atexit.register(cleanup_resources, force=True)
# Point Kerberos at the krb5.conf that ships inside this package.
# NOTE(review): this assignment sits before `import kadmin`, presumably because
# the Kerberos bindings read KRB5_CONFIG when they are loaded/initialised --
# confirm before moving the import above this line.
os.environ["KRB5_CONFIG"] = str(
Path(resource_filename(__name__, "krb5.conf")).resolve()
)
import kadmin
import getpass
import argparse
import secrets
import string
import shutil
import logging
from ldap3 import Server, Connection, LEVEL, MODIFY_REPLACE
from ldap3.utils.conv import escape_filter_chars
from ldap3.utils.dn import escape_rdn
import subprocess
from contextlib import contextmanager
import smtplib
from jinja2 import Template
from email.message import EmailMessage
# Sentinel used as the default of the optional ``email_address`` argument.
# Seeing this exact object means no address was given on the command line.
FROM_LDAP = object()

# Top-level command line interface; the chosen sub-command ends up in
# ``args.cmd`` (``None`` when no sub-command was given).
parser = argparse.ArgumentParser()
parser.add_argument("--admin", default=getpass.getuser())
parser.add_argument("--verbose", action="store_true")
subparsers = parser.add_subparsers(dest="cmd", help="command")

# Usage: reset_password [--show-password] user [email_address]
reset_password = subparsers.add_parser(
    "reset_password",
    help=(
        "change user password to newly generated one "
        "and send it to his email address from LDAP"
    ),
)
reset_password.add_argument("user")
reset_password.add_argument(
    "--show-password",
    help="print generated password",
    action="store_true",
)
reset_password.add_argument("email_address", nargs="?", default=FROM_LDAP)
# Absolute path of the ``apg`` password generator, or None when it is not
# installed (a built-in fallback generator is used in that case).
APG_CMD = shutil.which("apg")


def generage_password(length=15):
    """Generate a random alphanumeric password of exactly ``length`` characters.

    When the ``apg`` executable is available it is used to produce the
    password; otherwise a warning is logged and the password is drawn from
    ASCII letters and digits with the ``secrets`` module.

    Args:
        length: Required number of characters in the password.

    Returns:
        The generated password as a ``str``.

    Raises:
        subprocess.CalledProcessError: If ``apg`` exits with a non-zero status.
        RuntimeError: If the generated password does not have the requested
            length.
    """
    if APG_CMD is None:
        logging.warning("apg command not found. Using built in password generator")
        pool = string.ascii_letters + string.digits
        password = "".join(secrets.choice(pool) for _ in range(length))
    else:
        wanted = str(length)
        completed = subprocess.run(
            # -m is only the *minimum* length; -x pins the maximum as well so
            # apg cannot return a longer password when ``length`` is below its
            # default maximum.
            [APG_CMD, "-m", wanted, "-x", wanted, "-n", "1", "-M", "NCL"],
            check=True,
            capture_output=True,
        )
        password = completed.stdout.decode().strip()

    # Guard against a misbehaving generator rather than handing out a
    # password of unexpected size.
    if len(password) != length:
        raise RuntimeError("Password generation failed")
    return password
def main():
    """Command line entry point.

    Parses the arguments and, for the ``reset_password`` command:

    1. asks for the admin password and opens a kadmin session with it,
    2. binds to LDAP with the same credentials,
    3. generates a new password and determines the recipient address
       (command line argument, or looked up in LDAP),
    4. after an interactive ``yes`` confirmation creates the Kerberos
       principal or changes its password,
    5. emails the password (preceded by a welcome message when the principal
       was newly created) and switches the LDAP entry to SASL.

    Any other invocation prints the help text.

    Raises:
        TypeError: If the resolved email address is not a string.
    """
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    if args.cmd != "reset_password":
        parser.print_help()
        return

    admin_pass = getpass.getpass(f"{args.admin}@HACKERSPACE.PL password: ")
    logging.debug("initializing kadmin")
    k = kadmin.init_with_password(f"{args.admin}/admin", admin_pass)
    logging.debug("kadmin initialized")

    user = args.user
    if user is None:
        # Defensive only: the parser declares ``user`` as a required positional.
        user = input("User: ")
    p = k.get_principal(user)

    with HsLdap.connect(args.admin, admin_pass) as ldap:
        password = generage_password()

        if args.email_address is not FROM_LDAP:
            address = args.email_address
        else:
            address = ldap.get_email_address(user)
        if isinstance(address, bytes):
            address = address.decode()
        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        if not isinstance(address, str):
            raise TypeError(f"unexpected email address value: {address!r}")

        if args.show_password:
            print(f'password: "{password}"')

        # A missing principal means the account has no password yet.
        action = "reset" if p is not None else "create"
        # TODO use separate command for password/welcome combo
        if action == "create":
            logging.info("No existing pasword entry. Welcome email will be sent")

        i = input(
            f"Type yes to {action} {user}'s password and send email to {address!r}\n"
        ).strip()
        if i != "yes":
            print("Aborted")
            return

        if p is None:
            k.add_principal(user, password)
            print("password created")
        else:
            p.change_password(password)
            print("password changed")

        messages = []
        if action == "create":
            messages.append(prepare_welcome_msg(args.admin, user, address))
        messages.append(prepare_passwd_msg(args.admin, password, user, address))
        send_mail(args.admin, admin_pass, messages)
        print("email sent")

        ldap.force_sasl(user)
        print("LDAP password scheme set to SASL")
class HsLdap:
    """Thin helper around an ldap3 connection to ``ldap.hackerspace.pl``."""

    def __init__(self, connection):
        """Wrap an already established ldap3 ``connection``."""
        self._c = connection

    @classmethod
    @contextmanager
    def connect(cls, admin, admin_pass):
        """Context manager yielding an ``HsLdap`` bound as ``admin``.

        The connection is opened over SSL with ``raise_exceptions=True`` and
        is closed when the ``with`` block is left.
        """
        server = Server("ldap.hackerspace.pl", use_ssl=True)
        bind_dn = f"uid={escape_rdn(admin)},ou=People,dc=hackerspace,dc=pl"
        with Connection(
            server,
            user=bind_dn,
            password=admin_pass,
            raise_exceptions=True,
        ) as conn:
            logging.debug("connected to LDAP server")
            yield cls(conn)

    def force_sasl(self, uid: str):
        """Replace the ``userPassword`` values of the entry for ``uid``.

        The attribute is set to the two values ``{crypt}x`` and
        ``<uid>@HACKERSPACE.PL``.
        """
        logging.debug("setting LDAP password scheme to SASL")
        conn = self._c
        # NOTE(review): this DN is built with ``cn=`` while connect() binds
        # with ``uid=`` -- confirm it matches the directory layout.
        entry_dn = f"cn={escape_rdn(uid)},ou=People,dc=hackerspace,dc=pl"
        replacement = ["{crypt}x", f"{uid}@HACKERSPACE.PL"]
        conn.modify(entry_dn, {"userPassword": [(MODIFY_REPLACE, replacement)]})

    def get_email_address(self, uid):
        """Return the ``mailRoutingAddress`` value of the entry matching ``uid``.

        Raises:
            Exception: If the search returns no entry or more than one entry.
        """
        logging.debug("fetching email address from LDAP")
        conn = self._c
        conn.search(
            search_base="ou=People,dc=hackerspace,dc=pl",
            search_filter=f"(uid={escape_filter_chars(uid)})",
            search_scope=LEVEL,
            attributes=["mailRoutingAddress"],
        )
        found = conn.entries
        if not found:
            raise Exception("empty response")
        if len(found) > 1:
            raise Exception("too many responses")
        address = found[0]["mailRoutingAddress"]
        logging.debug(f"got mail address from LDAP: {address}")
        return address.value
def prepare_passwd_msg(admin, password, user, address):
    """Build the email that delivers a freshly generated password.

    The body is rendered from the packaged ``password_reset.jinja2`` template
    with the variables ``password``, ``user`` and ``admin``.

    Args:
        admin: Admin login; used as the local part of the sender address.
        password: The newly generated password to include in the body.
        user: Login of the account whose password was set.
        address: Recipient address placed in the ``To`` header.

    Returns:
        An ``EmailMessage`` ready to be sent.
    """
    template_source = resource_string(__name__, "password_reset.jinja2").decode()
    body = Template(template_source).render(
        {"password": password, "user": user, "admin": admin}
    )

    msg = EmailMessage()
    msg.set_content(body)
    msg["Subject"] = f"Password reset for {user}@hackerspace.pl"
    msg["From"] = f"{admin}@hackerspace.pl"
    msg["To"] = address
    return msg
def prepare_welcome_msg(admin, user, address):
    """Build the welcome email sent when an account gets its first password.

    The body is the packaged ``welcome.txt`` file. The message is addressed
    to both ``address`` and ``<user>@hackerspace.pl`` (listed once if they are
    the same), with the admin as sender and as ``Bcc`` recipient.

    Args:
        admin: Admin login; used as the local part of the From/Bcc address.
        user: Login of the new account.
        address: External email address of the new user.

    Returns:
        An ``EmailMessage`` ready to be sent.
    """
    admin_address = f"{admin}@hackerspace.pl"

    msg = EmailMessage()
    msg.set_content(resource_string(__name__, "welcome.txt").decode())
    msg["Subject"] = "Witamy! Welcome to the Warsaw Hackerspace!"
    msg["From"] = admin_address
    msg["Bcc"] = admin_address
    # dict.fromkeys de-duplicates while keeping a stable order, so the To
    # header no longer depends on set iteration order.
    recipients = dict.fromkeys([address, f"{user}@hackerspace.pl"])
    msg["To"] = ", ".join(recipients)
    return msg
def send_mail(admin, admin_password, messages):
    """Send ``messages`` through ``mail.hackerspace.pl`` over SMTP-over-SSL.

    Args:
        admin: Login used to authenticate against the SMTP server.
        admin_password: Password for ``admin``.
        messages: Sequence of email messages; each one is sent in order over
            a single authenticated session.
    """
    with smtplib.SMTP_SSL("mail.hackerspace.pl") as smtp:
        smtp.login(admin, admin_password)
        logging.debug(f"Sending {len(messages)} messages via SMTP")
        for message in messages:
            smtp.send_message(message)