from flask import redirect, flash, request, url_for import flask_admin from formity.extensions import admin, db, ModelView, ModelViewHighSecurity, AdminSecurityMixin from wtforms import TextAreaField from formity.models import FaceshieldRequest, RequestChange, Status, PostalCode, ExternalUser from spaceauth import current_user import enum import decimal from flask_weasyprint import HTML, render_pdf from sqlalchemy import inspect, func from sqlalchemy.orm import class_mapper from sqlalchemy.orm.attributes import get_history diff_sanitization_rules = [ (enum.Enum, lambda v: v.name), (decimal.Decimal, float), ] def get_diff(target, blacklist=['created', 'updated']): state_before = {} state_after = {} inspr = inspect(target) attrs = class_mapper(target.__class__).column_attrs for attr in attrs: if attr.key in blacklist: continue hist = getattr(inspr.attrs, attr.key).history if hist.has_changes(): state_before[attr.key] = get_history(target, attr.key)[2].pop() state_after[attr.key] = getattr(target, attr.key) for data in [state_before, state_after]: for t, c in diff_sanitization_rules: if isinstance(data[attr.key], t): data[attr.key] = c(data[attr.key]) if state_after[attr.key] == state_before[attr.key] or (state_after[attr.key] in ['', None] and state_before[attr.key] in ['', None]): state_after.pop(attr.key) state_before.pop(attr.key) return state_before, state_after class IndexView(AdminSecurityMixin, flask_admin.AdminIndexView): @flask_admin.expose('/') def index(self): stats = FaceshieldRequest.query.with_entities( FaceshieldRequest.status.label('status'), FaceshieldRequest.handling_orga.label('handling_orga'), func.count().label('count'), func.sum(FaceshieldRequest.faceshield_full_required).label('faceshield_full_required'), func.sum(FaceshieldRequest.faceshield_full_delivered).label('faceshield_full_delivered'), func.sum(FaceshieldRequest.faceshield_front_required).label('faceshield_front_required'), func.sum(FaceshieldRequest.faceshield_front_delivered).label('faceshield_front_delivered'), ).filter(FaceshieldRequest.status != Status.rejected, FaceshieldRequest.status != Status.spam).group_by(FaceshieldRequest.status, FaceshieldRequest.handling_orga).order_by(FaceshieldRequest.status, FaceshieldRequest.handling_orga).all() vstats = FaceshieldRequest.query.select_from(FaceshieldRequest).with_entities( func.coalesce(PostalCode.voivodeship, 'unknown').label('voivodeship'), func.count().label('count'), func.sum(FaceshieldRequest.faceshield_full_required).label('faceshield_full_required'), func.sum(FaceshieldRequest.faceshield_full_delivered).label('faceshield_full_delivered'), func.sum(FaceshieldRequest.faceshield_front_required).label('faceshield_front_required'), func.sum(FaceshieldRequest.faceshield_front_delivered).label('faceshield_front_delivered'), ).filter( FaceshieldRequest.status != Status.rejected, FaceshieldRequest.status != Status.spam, ).join(FaceshieldRequest.postalcode_info, isouter=True).group_by('voivodeship').order_by(func.count().desc()).all() dstats = FaceshieldRequest.query.select_from(FaceshieldRequest).with_entities( func.date_trunc('day', FaceshieldRequest.created).label('date'), func.count().label('count'), ).filter(FaceshieldRequest.status != Status.rejected, FaceshieldRequest.status != Status.spam).group_by(func.date_trunc('day', FaceshieldRequest.created)).order_by(func.date_trunc('day', FaceshieldRequest.created)).all() return self.render('admin_index.html', stats=stats, vstats=vstats, dstats=dstats) class MapView(AdminSecurityMixin, flask_admin.BaseView): @flask_admin.expose('/') def index(self): mapdata = [ { key: getattr(request, key).name if isinstance(getattr(request, key), enum.Enum) else getattr(request, key) for key in ['id', 'entity_info', 'shipping_latitude', 'shipping_longitude', 'status', 'handling_orga'] } for request in FaceshieldRequest.query.filter(FaceshieldRequest.shipping_latitude != None, FaceshieldRequest.status != Status.spam, FaceshieldRequest.status != Status.rejected) ] return self.render('admin_map.html', mapdata=mapdata, focus=request.args.get('id', None)) class FaceshieldRequestAdmin(ModelView): column_default_sort = 'created' details_modal_template = 'changelog_details_modal.html' edit_template = 'changelog_edit.html' list_template = 'faceshieldrequest_list.html' column_searchable_list = ('id', 'email', 'remarks', 'extra', 'entity_info', 'full_name') column_filters = ( 'id', 'entity_info', 'full_name', 'phone_number', 'email', 'extra', 'faceshield_front_required', 'faceshield_model', 'faceshield_full_required', 'faceshield_full_delivered', 'faceshield_front_delivered', 'handling_orga', 'created', 'ua', 'ip', 'status', 'remarks', 'shipping_name', 'shipping_street', 'shipping_postalcode', 'shipping_city', 'shipping_latitude', 'shipping_longitude', 'postalcode_info', ) form_overrides = {'entity_info': TextAreaField, 'extra': TextAreaField, 'remarks': TextAreaField} form_excluded_columns = ('changelog', 'postalcode_info') column_export_list = column_filters column_labels = { 'faceshield_front_required': 'Front required', 'faceshield_full_required': 'Full required', 'faceshield_front_delivered': 'Front delivered', 'faceshield_full_delivered': 'Full delivered', } column_list = ('id', 'entity_info', 'full_name', 'faceshield_full_required', 'faceshield_full_delivered', 'faceshield_front_required', 'faceshield_front_delivered', 'handling_orga', 'created', 'status') column_editable_list = ('status', 'remarks', 'handling_orga') form_choices = { 'handling_orga': [ ('hswaw', 'hswaw'), ('hskrk', 'hskrk'), ('hswro', 'hswro'), ], } can_delete = False can_view_details = True details_modal = True can_set_page_size = True @property def can_export(self): return not current_user.external def on_model_change(self, form, model, is_created): if is_created: return try: before, after = get_diff(model) except Exception: # xD before, after = {'diff': 'failed'}, {'diff': 'failed'} if before or after: db.session.add(RequestChange( user_id=current_user.get_id() or 'nobody', request_id=model.id, state_before=before, state_after=after, )) @flask_admin.expose('/label/') def label(self): return render_pdf(HTML(string=self.label_html())) @flask_admin.expose('/label/html') def label_html(self): model = self.get_one(request.args.get('id')) if model.faceshield_full_delivered % 150 == 0 and model.faceshield_full_delivered // 150 > 1: label_count = model.faceshield_full_delivered // 150 else: label_count = 1 return self.render('label.html', model=model, label_count=label_count) def bulk_status_change(self, ids, status): try: query = self.get_query().filter(FaceshieldRequest.id.in_(ids)) count = 0 for req in query.all(): if req.status != status: req.status = status count += 1 self.on_model_change(None, req, False) db.session.commit() flash('{} requests were successfully marked as {}.'.format(count, status.name)) except Exception as ex: if not self.handle_view_exception(ex): raise flash('Failed to mark request as {}: {}'.format(status.name, str(ex)), 'error') @flask_admin.actions.action('new', 'Mark as new', 'Are you sure you want to mark as new?') def action_new(self, ids): self.bulk_status_change(ids, Status.new) @flask_admin.actions.action('fulfilled', 'Mark as fulfilled', 'Are you sure you want to mark as fulfilled?') def action_fulfilled(self, ids): self.bulk_status_change(ids, Status.fulfilled) @flask_admin.actions.action('intransit', 'Mark as intransit', 'Are you sure you want to mark as intransit?') def action_intransit(self, ids): self.bulk_status_change(ids, Status.intransit) @flask_admin.actions.action('shippingpending', 'Mark as shippingpending', 'Are you sure you want to mark as shippingpending?') def action_shippingpending(self, ids): self.bulk_status_change(ids, Status.shippingpending) @flask_admin.actions.action('pickuppending', 'Mark as pickuppending', 'Are you sure you want to mark as pickuppending?') def action_pickuppending(self, ids): self.bulk_status_change(ids, Status.pickuppending) class FilteredFaceshieldRequestAdmin(FaceshieldRequestAdmin): def get_query(self): return super(FilteredFaceshieldRequestAdmin, self).get_query().filter(~FaceshieldRequest.status.in_([Status.rejected, Status.spam, Status.fulfilled, Status.delegated])) def get_count_query(self): return super(FilteredFaceshieldRequestAdmin, self).get_count_query().filter(~FaceshieldRequest.status.in_([Status.rejected, Status.spam, Status.fulfilled, Status.delegated])) class ShippingFaceshieldRequestAdmin(FilteredFaceshieldRequestAdmin): column_editable_list = ('shipping_name', 'shipping_street', 'shipping_postalcode', 'shipping_city', 'shipping_latitude', 'shipping_longitude', 'status') column_list = ['id', 'entity_info', 'full_name', *column_editable_list] class ExternalUserAdmin(ModelViewHighSecurity): column_default_sort = 'id' column_list = ('id', 'email', 'password', 'remarks') form_columns = ('email', 'password', 'remarks') can_delete = True class ChangelogAdmin(ModelView): can_delete = False can_edit = False can_set_page_size = True can_create = False column_list = ('user_id', 'request', 'state_after', 'created') column_filters = ('request_id', 'request', 'user_id', 'created') admin.add_view(FilteredFaceshieldRequestAdmin(FaceshieldRequest, db.session)) admin.add_view(FaceshieldRequestAdmin(FaceshieldRequest, db.session, name='FaceshieldRequest (Unfiltered)', endpoint='request_unfiltered')) admin.add_view(ShippingFaceshieldRequestAdmin(FaceshieldRequest, db.session, name='FaceshieldRequest (Shipping)', endpoint='request_shipping')) admin.add_view(ExternalUserAdmin(ExternalUser, db.session, name='External Users', endpoint='external_user')) admin.add_view(MapView(name='Map', endpoint='map')) admin.add_view(ChangelogAdmin(RequestChange, db.session))