334 lines
10 KiB
Python
334 lines
10 KiB
Python
"""
|
|
Module which serves users' avatars.
|
|
|
|
Based on a simple cache which keeps all avatars in memory.
|
|
"""
|
|
|
|
import base64
|
|
import binascii
|
|
import colorsys
|
|
import time
|
|
import io
|
|
import logging
|
|
import random
|
|
import hashlib
|
|
|
|
from PIL import Image, ImageDraw
|
|
import flask
|
|
import ldap
|
|
|
|
from webapp import app, ldaputils, config
|
|
|
|
from typing import List
|
|
|
|
bp = flask.Blueprint("avatar", __name__)
|
|
log = logging.getLogger("ldap-web.avatar")
|
|
|
|
|
|
# Stolen from https://stackoverflow.com/questions/43512615/reshaping-rectangular-image-to-square
|
|
def resize_image(image: Image.Image, length: int) -> Image.Image:
|
|
"""
|
|
Resize an image to a square. Can make an image bigger to make it fit or smaller if it doesn't fit. It also crops
|
|
part of the image.
|
|
|
|
:param self:
|
|
:param image: Image to resize.
|
|
:param length: Width and height of the output image.
|
|
:return: Return the resized image.
|
|
"""
|
|
|
|
"""
|
|
Resizing strategy :
|
|
1) We resize the smallest side to the desired dimension (e.g. 1080)
|
|
2) We crop the other side so as to make it fit with the same length as the smallest side (e.g. 1080)
|
|
"""
|
|
if image.size[0] < image.size[1]:
|
|
# The image is in portrait mode. Height is bigger than width.
|
|
|
|
# This makes the width fit the LENGTH in pixels while conserving the ration.
|
|
resized_image = image.resize(
|
|
(length, int(image.size[1] * (length / image.size[0])))
|
|
)
|
|
|
|
# Amount of pixel to lose in total on the height of the image.
|
|
required_loss = resized_image.size[1] - length
|
|
|
|
# Crop the height of the image so as to keep the center part.
|
|
resized_image = resized_image.crop(
|
|
box=(
|
|
0,
|
|
required_loss // 2,
|
|
length,
|
|
resized_image.size[1] - required_loss // 2,
|
|
)
|
|
)
|
|
|
|
# We now have a length*length pixels image.
|
|
return resized_image
|
|
else:
|
|
# This image is in landscape mode or already squared. The width is bigger than the heihgt.
|
|
|
|
# This makes the height fit the LENGTH in pixels while conserving the ration.
|
|
resized_image = image.resize(
|
|
(int(image.size[0] * (length / image.size[1])), length)
|
|
)
|
|
|
|
# Amount of pixel to lose in total on the width of the image.
|
|
required_loss = resized_image.size[0] - length
|
|
|
|
# Crop the width of the image so as to keep 1080 pixels of the center part.
|
|
resized_image = resized_image.crop(
|
|
box=(
|
|
required_loss // 2,
|
|
0,
|
|
resized_image.size[0] - required_loss // 2,
|
|
length,
|
|
)
|
|
)
|
|
|
|
# We now have a length*length pixels image.
|
|
return resized_image
|
|
|
|
|
|
def process_upload(data: bytes) -> bytes:
|
|
img = Image.open(io.BytesIO(data))
|
|
img = resize_image(img, 256)
|
|
res = io.BytesIO()
|
|
img.save(res, "PNG")
|
|
return base64.b64encode(res.getvalue())
|
|
|
|
|
|
syrenka = Image.open("syrenka.png")
|
|
|
|
|
|
def default_avatar(uid: str) -> bytes:
|
|
"""
|
|
Create little generative avatar for people who don't have a custom one
|
|
configured.
|
|
"""
|
|
img = Image.new("RGBA", (256, 256), (255, 255, 255, 0))
|
|
draw = ImageDraw.Draw(img)
|
|
|
|
# Deterministic rng for stable output.
|
|
rng = random.Random(uid)
|
|
|
|
# Pick a nice random neon color.
|
|
n_h, n_s, n_l = rng.random(), 0.5 + rng.random() / 2.0, 0.4 + rng.random() / 5.0
|
|
|
|
# Use muted version for background.
|
|
r, g, b = [int(256 * i) for i in colorsys.hls_to_rgb(n_h, n_l + 0.3, n_s - 0.1)]
|
|
draw.rectangle([(0, 0), (256, 256)], fill=(r, g, b, 255))
|
|
|
|
# Scale logo by randomized factor.
|
|
factor = 0.7 + 0.1 * rng.random()
|
|
w, h = int(syrenka.size[0] * factor), int(syrenka.size[1] * factor)
|
|
overlay = syrenka.resize((w, h))
|
|
|
|
# Crop to headshot.
|
|
overlay = overlay.crop(box=(0, 0, w, w))
|
|
|
|
# Give it a little nudge.
|
|
overlay = overlay.rotate((rng.random() - 0.5) * 100) # type: ignore
|
|
|
|
# Colorize with full neon color.
|
|
r, g, b = [int(256 * i) for i in colorsys.hls_to_rgb(n_h, n_l, n_s)]
|
|
pixels = overlay.load() # type: ignore
|
|
for x in range(img.size[0]):
|
|
for y in range(img.size[1]):
|
|
alpha = pixels[x, y][3]
|
|
pixels[x, y] = (r, g, b, alpha)
|
|
|
|
img.alpha_composite(overlay) # type: ignore
|
|
|
|
res = io.BytesIO()
|
|
img.save(res, "PNG")
|
|
return res.getvalue()
|
|
|
|
|
|
class AvatarCacheEntry:
|
|
# UID of the avatar's user.
|
|
uid: str
|
|
# Deadline when this entry expires.
|
|
deadline: float
|
|
# Cached source bytes
|
|
data: bytes
|
|
# Cached converted bytes
|
|
_converted: bytes
|
|
|
|
def __init__(self, uid: str, data: bytes) -> None:
|
|
self.uid = uid
|
|
self.deadline = time.time() + config.avatar_cache_timeout
|
|
self.data = data
|
|
self._converted = b""
|
|
|
|
def serve(self) -> flask.Response:
|
|
"""
|
|
Serve sanitized image. Always re-encode to PNG 256x256.
|
|
"""
|
|
# Re-encode to PNG if we haven't yet.
|
|
if self._converted == b"":
|
|
try:
|
|
img = Image.open(io.BytesIO(self.data))
|
|
except Exception as e:
|
|
log.warning("Could not parse avatar for {}: {}".format(self.uid, e))
|
|
self.data = default_avatar(self.uid)
|
|
img = Image.open(io.BytesIO(self.data))
|
|
|
|
res = io.BytesIO()
|
|
img = resize_image(img, 256)
|
|
img.save(res, "PNG")
|
|
self._converted = res.getvalue()
|
|
|
|
return flask.Response(self._converted, mimetype="image/png")
|
|
|
|
|
|
class AvatarCache:
|
|
# keyed by uid
|
|
entries: dict[str, AvatarCacheEntry]
|
|
|
|
def __init__(self) -> None:
|
|
self.entries = {}
|
|
|
|
def reset(self) -> None:
|
|
self.entries = {}
|
|
|
|
def reset_user(self, uid: str) -> None:
|
|
if uid in self.entries:
|
|
del self.entries[uid]
|
|
|
|
def get(self, uid: str, bust: bool = False) -> flask.Response:
|
|
"""
|
|
Get avatar, either from cache or from LDAP on cache miss. If 'bust' is
|
|
set to True, the cache will not be consulted and the newest result from
|
|
LDAP will always be served.
|
|
"""
|
|
now = time.time()
|
|
# Try to get a cache entry to serve, if possible.
|
|
if not bust and uid in self.entries:
|
|
entry = self.entries[uid]
|
|
if entry.deadline > now:
|
|
return entry.serve()
|
|
else:
|
|
# Entry expired, remove it.
|
|
del self.entries[uid]
|
|
|
|
# Otherwise, retrieve from LDAP.
|
|
conn = app.get_connection()
|
|
if conn is None:
|
|
conn = app.get_admin_connection()
|
|
try:
|
|
dn = ldaputils.user_dn(uid)
|
|
res = conn.search_s(dn, ldap.SCOPE_SUBTREE)
|
|
except ldap.NO_SUCH_OBJECT:
|
|
res = []
|
|
|
|
avatar = None
|
|
is_user_found = len(res) == 1
|
|
if is_user_found:
|
|
for attr, vs in res[0][1].items():
|
|
if attr == "jpegPhoto":
|
|
for v in vs:
|
|
# Temporary workaround: treat empty jpegPhoto as no avatar.
|
|
if v == b"":
|
|
avatar = None
|
|
break
|
|
|
|
try:
|
|
avatar = base64.b64decode(v)
|
|
except binascii.Error as e:
|
|
log.warning("Could not b64decode avatar for {}".format(uid))
|
|
avatar = None
|
|
else:
|
|
break
|
|
break
|
|
# If nothing was found in LDAP (either uid doesn't exist or uid doesn't
|
|
# have an avatar attached), serve default avatar.
|
|
if avatar is None:
|
|
# don't generate avatars for non-users to reduce DoS potential
|
|
# (note: capacifier already leaks existence of users, so whatever)
|
|
avatar = default_avatar(uid if is_user_found else "default")
|
|
|
|
# Save avatar in cache.
|
|
entry = AvatarCacheEntry(uid, avatar)
|
|
self.entries[uid] = entry
|
|
|
|
# And serve the entry.
|
|
return entry.serve()
|
|
|
|
|
|
cache = AvatarCache()
|
|
|
|
|
|
def hash_for_uid(uid: str) -> str:
|
|
# NOTE: Gravatar documentation says to use SHA256, but everyone passes MD5 instead
|
|
email = f"{uid}@hackerspace.pl".strip().lower()
|
|
hasher = hashlib.md5()
|
|
hasher.update(email.encode())
|
|
return hasher.hexdigest()
|
|
|
|
|
|
def get_all_user_uids(conn: ldap.ldapobject) -> List[str]:
|
|
all_uids = []
|
|
|
|
results = conn.search_s(
|
|
config.ldap_people, ldap.SCOPE_SUBTREE, "uid=*", attrlist=["uid"]
|
|
)
|
|
for user, attrs in results:
|
|
uid = attrs["uid"][0].decode()
|
|
all_uids.append(uid)
|
|
|
|
return all_uids
|
|
|
|
|
|
class HashCache:
|
|
# email hash -> uid mapping
|
|
entries: dict[str, str] = {}
|
|
# deadline when this cache expires
|
|
deadline: float = 0
|
|
|
|
def get(self, email_hash: str) -> str:
|
|
self.rebuild_if_needed()
|
|
return self.entries.get(email_hash, "default")
|
|
|
|
def reset(self) -> None:
|
|
self.entries = {}
|
|
self.deadline = 0
|
|
|
|
def rebuild_if_needed(self) -> None:
|
|
now = time.time()
|
|
if now > self.deadline:
|
|
self.rebuild()
|
|
|
|
def rebuild(self) -> None:
|
|
log.info("Rebuilding email hash cache")
|
|
conn = app.get_admin_connection()
|
|
users = get_all_user_uids(conn)
|
|
self.deadline = time.time() + config.avatar_cache_timeout
|
|
self.entries = {hash_for_uid(uid): uid for uid in users}
|
|
|
|
|
|
hash_cache = HashCache()
|
|
|
|
|
|
def sanitize_email_hash(hash: str) -> str:
|
|
"""
|
|
lowercases, removes file extension (probably)
|
|
"""
|
|
hash = hash.lower()
|
|
if hash.endswith(".png") or hash.endswith(".jpg"):
|
|
hash = hash[:-4]
|
|
return hash
|
|
|
|
|
|
@bp.route("/avatar/<email_hash>", methods=["GET"])
|
|
def gravatar_serve(email_hash: str) -> flask.Response:
|
|
"""
|
|
Serves avatar in a Gravatar-compatible(ish) way, i.e. by email hash, not user name.
|
|
"""
|
|
uid = hash_cache.get(sanitize_email_hash(email_hash))
|
|
return cache.get(uid)
|
|
|
|
|
|
@bp.route("/avatar/user/<uid>", methods=["GET"])
|
|
def avatar_serve(uid: str) -> flask.Response:
|
|
return cache.get(uid)
|