216 lines
6.8 KiB
Python
216 lines
6.8 KiB
Python
"""
|
|
Module which serves users' avatars.
|
|
|
|
Based on a simple cache which keeps all avatars in memory.
|
|
"""
|
|
|
|
import base64
|
|
import binascii
|
|
import colorsys
|
|
import time
|
|
import io
|
|
import logging
|
|
import random
|
|
|
|
from PIL import Image, ImageDraw
|
|
import flask
|
|
import ldap
|
|
|
|
from webapp import context, ldaputils
|
|
|
|
bp = flask.Blueprint('avatar', __name__)
|
|
log = logging.getLogger('ldap-web.avatar')
|
|
|
|
|
|
# Stolen from https://stackoverflow.com/questions/43512615/reshaping-rectangular-image-to-square
|
|
def resize_image(image: Image, length: int) -> Image:
|
|
"""
|
|
Resize an image to a square. Can make an image bigger to make it fit or smaller if it doesn't fit. It also crops
|
|
part of the image.
|
|
|
|
:param self:
|
|
:param image: Image to resize.
|
|
:param length: Width and height of the output image.
|
|
:return: Return the resized image.
|
|
"""
|
|
|
|
"""
|
|
Resizing strategy :
|
|
1) We resize the smallest side to the desired dimension (e.g. 1080)
|
|
2) We crop the other side so as to make it fit with the same length as the smallest side (e.g. 1080)
|
|
"""
|
|
if image.size[0] < image.size[1]:
|
|
# The image is in portrait mode. Height is bigger than width.
|
|
|
|
# This makes the width fit the LENGTH in pixels while conserving the ration.
|
|
resized_image = image.resize((length, int(image.size[1] * (length / image.size[0]))))
|
|
|
|
# Amount of pixel to lose in total on the height of the image.
|
|
required_loss = (resized_image.size[1] - length)
|
|
|
|
# Crop the height of the image so as to keep the center part.
|
|
resized_image = resized_image.crop(
|
|
box=(0, required_loss / 2, length, resized_image.size[1] - required_loss / 2))
|
|
|
|
# We now have a length*length pixels image.
|
|
return resized_image
|
|
else:
|
|
# This image is in landscape mode or already squared. The width is bigger than the heihgt.
|
|
|
|
# This makes the height fit the LENGTH in pixels while conserving the ration.
|
|
resized_image = image.resize((int(image.size[0] * (length / image.size[1])), length))
|
|
|
|
# Amount of pixel to lose in total on the width of the image.
|
|
required_loss = resized_image.size[0] - length
|
|
|
|
# Crop the width of the image so as to keep 1080 pixels of the center part.
|
|
resized_image = resized_image.crop(
|
|
box=(required_loss / 2, 0, resized_image.size[0] - required_loss / 2, length))
|
|
|
|
# We now have a length*length pixels image.
|
|
return resized_image
|
|
|
|
|
|
syrenka = Image.open("syrenka.png")
|
|
|
|
|
|
def default_avatar(uid: str) -> Image:
|
|
"""
|
|
Create little generative avatar for people who don't have a custom one
|
|
configured.
|
|
"""
|
|
img = Image.new('RGBA', (256, 256), (255, 255, 255, 0))
|
|
draw = ImageDraw.Draw(img)
|
|
|
|
# Deterministic rng for stable output.
|
|
rng = random.Random(uid)
|
|
|
|
# Pick a nice random neon color.
|
|
n_h, n_s, n_l = rng.random(), 0.5 + rng.random()/2.0, 0.4 + rng.random()/5.0
|
|
|
|
# Use muted version for background.
|
|
r, g, b = [int(256*i) for i in colorsys.hls_to_rgb(n_h, n_l+0.3, n_s-0.1)]
|
|
draw.rectangle([(0, 0), (256, 256)], fill=(r,g,b,255))
|
|
|
|
# Scale logo by randomized factor.
|
|
factor = 0.7 + 0.1 * rng.random()
|
|
w, h = int(syrenka.size[0] * factor), int(syrenka.size[1] * factor)
|
|
overlay = syrenka.resize((w, h))
|
|
|
|
# Crop to headshot.
|
|
overlay = overlay.crop(box=(0, 0, w, w))
|
|
|
|
# Give it a little nudge.
|
|
overlay = overlay.rotate((rng.random() - 0.5) * 100)
|
|
|
|
# Colorize with full neon color.
|
|
r, g, b = [int(256*i) for i in colorsys.hls_to_rgb(n_h,n_l,n_s)]
|
|
pixels = overlay.load()
|
|
for x in range(img.size[0]):
|
|
for y in range(img.size[1]):
|
|
alpha = pixels[x, y][3]
|
|
pixels[x, y] = (r, g, b, alpha)
|
|
|
|
img.alpha_composite(overlay)
|
|
|
|
res = io.BytesIO()
|
|
img.save(res, 'PNG')
|
|
return res.getvalue()
|
|
|
|
|
|
class AvatarCacheEntry:
|
|
# UID of the avatar's user.
|
|
uid: str
|
|
# Deadline when this entry expires.
|
|
deadline: float
|
|
# Cached source bytes
|
|
data: bytes
|
|
# Cached converted bytes
|
|
_converted: bytes
|
|
|
|
def __init__(self, uid: str, data: bytes):
|
|
self.uid = uid
|
|
self.deadline = time.time() + 3600
|
|
self.data = data
|
|
self._converted = b""
|
|
|
|
def serve(self):
|
|
"""
|
|
Serve sanitized image. Always re-encode to PNG 256x256.
|
|
"""
|
|
# Re-encode to PNG if we haven't yet.
|
|
if self._converted == b"":
|
|
try:
|
|
img = Image.open(io.BytesIO(self.data))
|
|
except Exception as e:
|
|
log.warning("Could not parse avatar for {}: {}".format(self.uid, e))
|
|
self.data = default_avatar(self.uid)
|
|
img = Image.open(io.BytesIO(self.data))
|
|
|
|
res = io.BytesIO()
|
|
img = resize_image(img, 256)
|
|
img.save(res, 'PNG')
|
|
self._converted = res.getvalue()
|
|
|
|
return flask.Response(self._converted, mimetype='image/png')
|
|
|
|
|
|
class AvatarCache:
|
|
# keyed by uid
|
|
entries: dict[str, AvatarCacheEntry]
|
|
|
|
def __init__(self):
|
|
self.entries = {}
|
|
|
|
def get(self, uid: str, bust: bool = False) -> AvatarCacheEntry:
|
|
"""
|
|
Get avatar, either from cache or from LDAP on cache miss. If 'bust' is
|
|
set to True, the cache will not be consulted and the newest result from
|
|
LDAP will always be served.
|
|
"""
|
|
now = time.time()
|
|
# Try to get a cache entry to serve, if possible.
|
|
if not bust and uid in self.entries:
|
|
entry = self.entries[uid]
|
|
if entry.deadline > now:
|
|
return entry.serve()
|
|
|
|
# Otherwise, retrieve from LDAP.
|
|
conn = context.get_admin_connection()
|
|
try:
|
|
dn = ldaputils.user_dn(uid)
|
|
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
|
except ldap.NO_SUCH_OBJECT:
|
|
res = []
|
|
|
|
avatar = None
|
|
if len(res) == 1:
|
|
for attr, vs in res[0][1].items():
|
|
if attr == 'jpegPhoto':
|
|
for v in vs:
|
|
try:
|
|
avatar = base64.b64decode(v)
|
|
except binascii.Error as e:
|
|
log.warning("Could not b64decode avatar for {}".format(uid))
|
|
avatar = None
|
|
else:
|
|
break
|
|
break
|
|
# If nothing was found in LDAP (either uid doesn't exist or uid doesn't
|
|
# have an avatar attached), serve default avatar.
|
|
if avatar is None:
|
|
avatar = default_avatar(uid)
|
|
|
|
# Save avatar in cache.
|
|
entry = AvatarCacheEntry(uid, avatar)
|
|
self.entries[uid] = entry
|
|
|
|
# And serve the entry.
|
|
return entry.serve()
|
|
|
|
cache = AvatarCache()
|
|
|
|
@bp.route('/avatar/<uid>', methods=['GET'])
|
|
def avatar_serve(uid):
|
|
return cache.get(uid)
|