ldapweb/webapp/avatar.py

303 lines
9.4 KiB
Python

"""
Module which serves users' avatars.
Based on a simple cache which keeps all avatars in memory.
"""
import base64
import binascii
import colorsys
import time
import io
import logging
import random
import hashlib
from PIL import Image, ImageDraw
import flask
import ldap
from webapp import context, ldaputils, config
# Flask blueprint on which this module's avatar routes are registered.
bp = flask.Blueprint('avatar', __name__)
# Module-level logger used for avatar decoding/caching messages.
log = logging.getLogger('ldap-web.avatar')
# Stolen from https://stackoverflow.com/questions/43512615/reshaping-rectangular-image-to-square
def resize_image(image: "Image.Image", length: int) -> "Image.Image":
    """
    Resize an image to a square. Can make an image bigger to make it fit or
    smaller if it doesn't fit. It also crops part of the image.

    Resizing strategy:
    1) The smallest side is scaled to ``length`` while keeping the aspect
       ratio.
    2) The other side is cropped around its center down to ``length``.

    All arithmetic is done on integers so that the result is always exactly
    ``length`` x ``length`` pixels, including for odd values of ``length`` and
    odd amounts of cropped pixels.

    :param image: Image to resize.
    :param length: Width and height of the output image.
    :return: Return the resized image.
    """
    width, height = image.size

    if width < height:
        # The image is in portrait mode. Height is bigger than width.
        # Make the width fit `length` pixels while conserving the ratio.
        # Integer math guarantees scaled_height >= length.
        scaled_height = height * length // width
        resized_image = image.resize((length, scaled_height))
        # Crop the height of the image so as to keep the center part.
        top = (scaled_height - length) // 2
        return resized_image.crop(box=(0, top, length, top + length))

    # The image is in landscape mode or already squared. The width is bigger
    # than (or equal to) the height.
    # Make the height fit `length` pixels while conserving the ratio.
    scaled_width = width * length // height
    resized_image = image.resize((scaled_width, length))
    # Crop the width of the image so as to keep the center part.
    left = (scaled_width - length) // 2
    return resized_image.crop(box=(left, 0, left + length, length))
def process_upload(data: bytes) -> bytes:
    """
    Turn an uploaded image into the representation stored for avatars.

    The upload is decoded, squared to 256x256 with resize_image, re-encoded
    as PNG and finally base64-encoded.

    :param data: Raw bytes of the uploaded image file.
    :return: Base64-encoded PNG bytes.
    :raises Exception: Whatever Pillow raises when the data is not a readable
        image.
    """
    # Modes Pillow's PNG writer accepts. Anything else (e.g. CMYK or YCbCr
    # JPEGs) would make img.save() fail, so it is converted to RGB first.
    png_modes = ('1', 'L', 'LA', 'I', 'I;16', 'P', 'RGB', 'RGBA')

    img = Image.open(io.BytesIO(data))
    if img.mode not in png_modes:
        img = img.convert('RGB')
    img = resize_image(img, 256)

    res = io.BytesIO()
    img.save(res, 'PNG')
    return base64.b64encode(res.getvalue())
# Logo image used as the overlay of generated default avatars. It is opened
# once at import time; the relative path is resolved against the current
# working directory of the process.
syrenka = Image.open("syrenka.png")
def default_avatar(uid: str) -> bytes:
    """
    Create little generative avatar for people who don't have a custom one
    configured.

    The output only depends on ``uid`` (it seeds the random generator), so the
    same user always gets the same avatar.

    :param uid: Value used to seed the random generator.
    :return: PNG-encoded bytes of a 256x256 RGBA image.
    """
    # Deterministic rng for stable output. The order of the rng.random() calls
    # below determines the look of the avatar and must not change.
    rng = random.Random(uid)

    def to_rgb(hue, lightness, saturation):
        # Clamp so a channel of exactly 1.0 can never turn into 256.
        return tuple(
            min(255, int(256 * c))
            for c in colorsys.hls_to_rgb(hue, lightness, saturation)
        )

    # Pick a nice random neon color.
    n_h, n_s, n_l = rng.random(), 0.5 + rng.random()/2.0, 0.4 + rng.random()/5.0

    # Use muted version for background.
    background = to_rgb(n_h, n_l + 0.3, n_s - 0.1)
    img = Image.new('RGBA', (256, 256), background + (255,))

    # Scale logo by randomized factor.
    factor = 0.7 + 0.1 * rng.random()
    w, h = int(syrenka.size[0] * factor), int(syrenka.size[1] * factor)
    overlay = syrenka.resize((w, h))
    # Crop to headshot.
    overlay = overlay.crop(box=(0, 0, w, w))
    # Give it a little nudge.
    overlay = overlay.rotate((rng.random() - 0.5) * 100)

    # Colorize with full neon color: keep only the logo's alpha channel and
    # put it on top of a solid neon fill. This recolors the whole overlay in
    # one native operation and works whatever the overlay size is.
    # NOTE(review): relies on the logo having an alpha band (RGBA) - confirm.
    neon = to_rgb(n_h, n_l, n_s)
    tinted = Image.new('RGBA', overlay.size, neon + (0,))
    tinted.putalpha(overlay.getchannel('A'))
    img.alpha_composite(tinted)

    res = io.BytesIO()
    img.save(res, 'PNG')
    return res.getvalue()
class AvatarCacheEntry:
    """
    A single cached avatar: the source bytes plus, once it has been served at
    least once, the sanitized PNG that is actually sent to clients.
    """
    # UID of the avatar's user.
    uid: str
    # Deadline when this entry expires.
    deadline: float
    # Cached source bytes
    data: bytes
    # Cached converted bytes
    _converted: bytes

    def __init__(self, uid: str, data: bytes):
        """
        :param uid: UID of the avatar's user.
        :param data: Source image bytes, in any format Pillow can read.
        """
        self.uid = uid
        self.deadline = time.time() + config.avatar_cache_timeout
        self.data = data
        self._converted = b""

    @staticmethod
    def _to_png(data: bytes) -> bytes:
        """Decode ``data``, square it to 256x256 and return it PNG-encoded."""
        img = resize_image(Image.open(io.BytesIO(data)), 256)
        res = io.BytesIO()
        img.save(res, 'PNG')
        return res.getvalue()

    def serve(self):
        """
        Serve sanitized image. Always re-encode to PNG 256x256.

        If the stored bytes cannot be turned into a PNG, the entry falls back
        to the generated default avatar for its uid.

        :return: flask.Response with mimetype image/png.
        """
        # Re-encode to PNG if we haven't yet.
        if self._converted == b"":
            try:
                # Image.open is lazy, so corrupt data may only fail while
                # resizing or encoding; guard the whole conversion.
                self._converted = self._to_png(self.data)
            except Exception as e:
                log.warning("Could not parse avatar for {}: {}".format(self.uid, e))
                self.data = default_avatar(self.uid)
                self._converted = self._to_png(self.data)
        return flask.Response(self._converted, mimetype='image/png')
class AvatarCache:
    """
    In-memory avatar cache keyed by uid, filled lazily from LDAP.
    """
    # keyed by uid
    entries: "dict[str, AvatarCacheEntry]"

    def __init__(self):
        self.entries = {}
        # Single entry shared by every uid that does not exist in LDAP, so
        # that requests for made-up uids neither render nor store a new image.
        self._placeholder = None

    def reset(self):
        """Drop every cached avatar."""
        self.entries = {}
        self._placeholder = None

    def reset_user(self, uid: str):
        """Drop the cached avatar of ``uid``, if there is one."""
        self.entries.pop(uid, None)

    def get(self, uid: str, bust: bool = False) -> "flask.Response":
        """
        Get avatar, either from cache or from LDAP on cache miss. If 'bust' is
        set to True, the cache will not be consulted and the newest result from
        LDAP will always be served.

        :param uid: UID of the user whose avatar is requested.
        :param bust: Skip the cache lookup and always query LDAP.
        :return: The response produced by the entry's serve() method.
        """
        now = time.time()

        # Try to get a cache entry to serve, if possible.
        if not bust:
            entry = self.entries.get(uid)
            if entry is not None:
                if entry.deadline > now:
                    return entry.serve()
                # Entry expired, remove it.
                del self.entries[uid]

        # Otherwise, retrieve from LDAP.
        conn = context.get_admin_connection()
        try:
            dn = ldaputils.user_dn(uid)
            res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
        except ldap.NO_SUCH_OBJECT:
            res = []

        is_user_found = len(res) == 1
        if is_user_found:
            avatar = self._decode_avatar(uid, res[0][1])
            # Existing user without a usable avatar: generate their own one.
            if avatar is None:
                avatar = default_avatar(uid)
            entry = AvatarCacheEntry(uid, avatar)
        else:
            # don't generate avatars for non-users to reduce DoS potential
            # (note: capacifier already leaks existence of users, so whatever)
            entry = self._placeholder_entry(now)

        # Save avatar in cache. For non-users this only stores a reference to
        # the shared placeholder, which still avoids re-querying LDAP.
        self.entries[uid] = entry
        # And serve the entry.
        return entry.serve()

    @staticmethod
    def _decode_avatar(uid, attrs):
        """Return the first decodable jpegPhoto value of ``attrs``, or None."""
        for value in attrs.get('jpegPhoto', []):
            # Temporary workaround: treat empty jpegPhoto as no avatar.
            if value == b'':
                return None
            try:
                return base64.b64decode(value)
            except binascii.Error:
                log.warning("Could not b64decode avatar for {}".format(uid))
        return None

    def _placeholder_entry(self, now):
        """Return the shared entry for non-users, (re)creating it if expired."""
        entry = self._placeholder
        if entry is None or entry.deadline <= now:
            entry = AvatarCacheEntry('default', default_avatar('default'))
            self._placeholder = entry
        return entry
# Module-wide avatar cache used by the route handlers of this module.
cache = AvatarCache()
def hash_for_uid(uid):
    """
    Return the hex MD5 digest of the user's hackerspace.pl e-mail address.

    The address is built from ``uid``, stripped of surrounding whitespace and
    lowercased before hashing.
    """
    # NOTE: Gravatar documentation says to use SHA256, but everyone passes MD5 instead
    address = f'{uid}@hackerspace.pl'.strip().lower()
    return hashlib.md5(address.encode()).hexdigest()
def get_all_user_uids(conn):
    """
    List the uid of every entry under config.ldap_people that has one.

    :param conn: LDAP connection object providing search_s().
    :return: List of uids as str, taken from the first 'uid' value of each
        search result.
    """
    found = conn.search_s(
        config.ldap_people, ldap.SCOPE_SUBTREE, 'uid=*', attrlist=['uid'])
    # Each result is a (dn, attributes) pair; only the attributes are needed.
    return [attributes['uid'][0].decode() for _dn, attributes in found]
class HashCache:
    """
    Cache mapping Gravatar-style e-mail hashes back to uids.

    The whole mapping is rebuilt from LDAP once its deadline has passed.
    """
    # email hash -> uid mapping (created per instance in __init__ so that
    # instances never share one mutable dict)
    entries: dict[str, str]
    # deadline when this cache expires
    deadline: float = 0

    def __init__(self):
        self.entries = {}
        self.deadline = 0

    def get(self, email_hash: str) -> str:
        """
        Return the uid for ``email_hash``, or 'default' if no user matches.

        Rebuilds the mapping first if it has expired.
        """
        self.rebuild_if_needed()
        return self.entries.get(email_hash, 'default')

    def reset(self):
        """Empty the cache and force a rebuild on the next lookup."""
        self.entries = {}
        self.deadline = 0

    def rebuild_if_needed(self):
        """Rebuild the mapping if its deadline has passed."""
        now = time.time()
        if now > self.deadline:
            self.rebuild()

    def rebuild(self):
        """Reload all uids from LDAP and recompute their e-mail hashes."""
        log.info("Rebuilding email hash cache")
        conn = context.get_admin_connection()
        # Fetch first: if LDAP fails, the deadline stays untouched and the
        # next lookup tries again.
        users = get_all_user_uids(conn)
        self.deadline = time.time() + config.avatar_cache_timeout
        self.entries = {hash_for_uid(uid): uid for uid in users}
# Module-wide e-mail hash -> uid cache used by the Gravatar-style route.
hash_cache = HashCache()
def sanitize_email_hash(hash: str):
    """
    Normalize an e-mail hash taken from a request path.

    The value is lowercased and a trailing '.png' or '.jpg' (probably a file
    extension) is removed.
    """
    lowered = hash.lower()
    if lowered.endswith(('.png', '.jpg')):
        return lowered[:-4]
    return lowered
@bp.route('/avatar/<email_hash>', methods=['GET'])
def gravatar_serve(email_hash):
    """
    Serves avatar in a Gravatar-compatible(ish) way, i.e. by email hash, not user name.

    The hash from the URL is normalized, resolved to a uid through the hash
    cache, and that uid's avatar is then served from the avatar cache.
    """
    normalized = sanitize_email_hash(email_hash)
    uid = hash_cache.get(normalized)
    return cache.get(uid)
@bp.route('/avatar/user/<uid>', methods=['GET'])
def avatar_serve(uid):
    """
    Serves the avatar of the user given by uid, going through the avatar cache.
    """
    response = cache.get(uid)
    return response