covid-formity/formity/admin.py

253 lines
11 KiB
Python

from flask import redirect, flash, request, url_for
import flask_admin
from formity.extensions import admin, db, ModelView, ModelViewHighSecurity, AdminSecurityMixin
from wtforms import TextAreaField
from formity.models import FaceshieldRequest, RequestChange, Status, PostalCode, ExternalUser
from spaceauth import current_user
import enum
import decimal
from flask_weasyprint import HTML, render_pdf
from sqlalchemy import inspect, func
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm.attributes import get_history
diff_sanitization_rules = [
(enum.Enum, lambda v: v.name),
(decimal.Decimal, float),
]
def get_diff(target, blacklist=['created', 'updated']):
state_before = {}
state_after = {}
inspr = inspect(target)
attrs = class_mapper(target.__class__).column_attrs
for attr in attrs:
if attr.key in blacklist:
continue
hist = getattr(inspr.attrs, attr.key).history
if hist.has_changes():
state_before[attr.key] = get_history(target, attr.key)[2].pop()
state_after[attr.key] = getattr(target, attr.key)
for data in [state_before, state_after]:
for t, c in diff_sanitization_rules:
if isinstance(data[attr.key], t):
data[attr.key] = c(data[attr.key])
if state_after[attr.key] == state_before[attr.key] or (state_after[attr.key] in ['', None] and state_before[attr.key] in ['', None]):
state_after.pop(attr.key)
state_before.pop(attr.key)
return state_before, state_after
class IndexView(AdminSecurityMixin, flask_admin.AdminIndexView):
@flask_admin.expose('/')
def index(self):
stats = FaceshieldRequest.query.with_entities(
FaceshieldRequest.status.label('status'),
FaceshieldRequest.handling_orga.label('handling_orga'),
func.count().label('count'),
func.sum(FaceshieldRequest.faceshield_full_required).label('faceshield_full_required'),
func.sum(FaceshieldRequest.faceshield_full_delivered).label('faceshield_full_delivered'),
func.sum(FaceshieldRequest.faceshield_front_required).label('faceshield_front_required'),
func.sum(FaceshieldRequest.faceshield_front_delivered).label('faceshield_front_delivered'),
).filter(FaceshieldRequest.status != Status.rejected, FaceshieldRequest.status != Status.spam).group_by(FaceshieldRequest.status, FaceshieldRequest.handling_orga).order_by(FaceshieldRequest.status, FaceshieldRequest.handling_orga).all()
vstats = FaceshieldRequest.query.select_from(FaceshieldRequest).with_entities(
func.coalesce(PostalCode.voivodeship, 'unknown').label('voivodeship'),
func.count().label('count'),
func.sum(FaceshieldRequest.faceshield_full_required).label('faceshield_full_required'),
func.sum(FaceshieldRequest.faceshield_full_delivered).label('faceshield_full_delivered'),
func.sum(FaceshieldRequest.faceshield_front_required).label('faceshield_front_required'),
func.sum(FaceshieldRequest.faceshield_front_delivered).label('faceshield_front_delivered'),
).filter(
FaceshieldRequest.status != Status.rejected,
FaceshieldRequest.status != Status.spam,
).join(FaceshieldRequest.postalcode_info, isouter=True).group_by('voivodeship').order_by(func.count().desc()).all()
dstats = FaceshieldRequest.query.select_from(FaceshieldRequest).with_entities(
func.date_trunc('day', FaceshieldRequest.created).label('date'),
func.count().label('count'),
).filter(FaceshieldRequest.status != Status.rejected, FaceshieldRequest.status != Status.spam).group_by(func.date_trunc('day', FaceshieldRequest.created)).order_by(func.date_trunc('day', FaceshieldRequest.created)).all()
return self.render('admin_index.html', stats=stats, vstats=vstats, dstats=dstats)
class MapView(AdminSecurityMixin, flask_admin.BaseView):
@flask_admin.expose('/')
def index(self):
mode = request.args.get('mode', 'all')
if mode == 'new':
query = FaceshieldRequest.query.filter(FaceshieldRequest.shipping_latitude != None, FaceshieldRequest.status == Status.new)
else:
query = FaceshieldRequest.query.filter(FaceshieldRequest.shipping_latitude != None, FaceshieldRequest.status != Status.spam, FaceshieldRequest.status != Status.rejected)
mapdata = [
{
key: getattr(request, key).name if isinstance(getattr(request, key), enum.Enum) else getattr(request, key)
for key in ['id', 'entity_info', 'shipping_latitude', 'shipping_longitude', 'status', 'handling_orga', 'extra', 'remarks', 'faceshield_full_required']
}
for request in query
]
return self.render('admin_map.html', mapdata=mapdata, focus=request.args.get('id', None))
class FaceshieldRequestAdmin(ModelView):
column_default_sort = 'created'
details_modal_template = 'changelog_details_modal.html'
edit_template = 'changelog_edit.html'
list_template = 'faceshieldrequest_list.html'
column_searchable_list = ('id', 'email', 'remarks', 'extra', 'entity_info', 'full_name')
column_filters = (
'id',
'entity_info', 'full_name', 'phone_number', 'email', 'extra',
'faceshield_front_required', 'faceshield_model',
'faceshield_full_required',
'faceshield_full_delivered', 'faceshield_front_delivered', 'handling_orga',
'created', 'ua', 'ip', 'status', 'remarks',
'shipping_name', 'shipping_street', 'shipping_postalcode', 'shipping_city',
'shipping_latitude', 'shipping_longitude',
'postalcode_info',
)
form_overrides = {'entity_info': TextAreaField, 'extra': TextAreaField, 'remarks': TextAreaField}
form_excluded_columns = ('changelog', 'postalcode_info')
column_export_list = column_filters
column_labels = {
'faceshield_front_required': 'Front required',
'faceshield_full_required': 'Full required',
'faceshield_front_delivered': 'Front delivered',
'faceshield_full_delivered': 'Full delivered',
}
column_list = ('id', 'entity_info', 'full_name', 'faceshield_full_required', 'faceshield_full_delivered', 'faceshield_front_required', 'faceshield_front_delivered', 'handling_orga', 'created', 'status')
column_editable_list = ('status', 'remarks', 'handling_orga')
form_choices = {
'handling_orga': [
('hswaw', 'hswaw'),
('hskrk', 'hskrk'),
('hswro', 'hswro'),
],
}
can_delete = False
can_view_details = True
details_modal = True
can_set_page_size = True
@property
def can_export(self):
return not current_user.external
def on_model_change(self, form, model, is_created):
if is_created:
return
try:
before, after = get_diff(model)
except Exception:
# xD
before, after = {'diff': 'failed'}, {'diff': 'failed'}
if before or after:
db.session.add(RequestChange(
user_id=current_user.get_id() or 'nobody',
request_id=model.id,
state_before=before,
state_after=after,
))
@flask_admin.expose('/label/')
def label(self):
return render_pdf(HTML(string=self.label_html()))
@flask_admin.expose('/label/html')
def label_html(self):
model = self.get_one(request.args.get('id'))
return self.render('label.html', models=[model])
def bulk_status_change(self, ids, status):
try:
query = self.get_query().filter(FaceshieldRequest.id.in_(ids))
count = 0
for req in query.all():
if req.status != status:
req.status = status
count += 1
self.on_model_change(None, req, False)
db.session.commit()
flash('{} requests were successfully marked as {}.'.format(count, status.name))
except Exception as ex:
if not self.handle_view_exception(ex):
raise
flash('Failed to mark request as {}: {}'.format(status.name, str(ex)), 'error')
@flask_admin.actions.action('new', 'Mark as new', 'Are you sure you want to mark as new?')
def action_new(self, ids):
self.bulk_status_change(ids, Status.new)
@flask_admin.actions.action('fulfilled', 'Mark as fulfilled', 'Are you sure you want to mark as fulfilled?')
def action_fulfilled(self, ids):
self.bulk_status_change(ids, Status.fulfilled)
@flask_admin.actions.action('intransit', 'Mark as intransit', 'Are you sure you want to mark as intransit?')
def action_intransit(self, ids):
self.bulk_status_change(ids, Status.intransit)
@flask_admin.actions.action('shippingpending', 'Mark as shippingpending', 'Are you sure you want to mark as shippingpending?')
def action_shippingpending(self, ids):
self.bulk_status_change(ids, Status.shippingpending)
@flask_admin.actions.action('pickuppending', 'Mark as pickuppending', 'Are you sure you want to mark as pickuppending?')
def action_pickuppending(self, ids):
self.bulk_status_change(ids, Status.pickuppending)
@flask_admin.actions.action('bulkprint', 'Print')
def action_bulkprint(self, ids):
models = self.get_query().filter(FaceshieldRequest.id.in_(ids)).all()
return render_pdf(HTML(string=self.render('label.html', models=models)))
class FilteredFaceshieldRequestAdmin(FaceshieldRequestAdmin):
def get_query(self):
return super(FilteredFaceshieldRequestAdmin, self).get_query().filter(~FaceshieldRequest.status.in_([Status.rejected, Status.spam, Status.fulfilled, Status.delegated]))
def get_count_query(self):
return super(FilteredFaceshieldRequestAdmin, self).get_count_query().filter(~FaceshieldRequest.status.in_([Status.rejected, Status.spam, Status.fulfilled, Status.delegated]))
class ShippingFaceshieldRequestAdmin(FilteredFaceshieldRequestAdmin):
column_editable_list = ('shipping_name', 'shipping_street', 'shipping_postalcode', 'shipping_city', 'shipping_latitude', 'shipping_longitude', 'status')
column_list = ['id', 'entity_info', 'full_name', *column_editable_list]
class ExternalUserAdmin(ModelViewHighSecurity):
column_default_sort = 'id'
column_list = ('id', 'email', 'password', 'remarks')
form_columns = ('email', 'password', 'remarks')
can_delete = True
class ChangelogAdmin(ModelView):
can_delete = False
can_edit = False
can_set_page_size = True
can_create = False
column_list = ('user_id', 'request', 'state_after', 'created')
column_filters = ('request_id', 'request', 'user_id', 'created')
column_default_sort = ('created', True)
admin.add_view(FilteredFaceshieldRequestAdmin(FaceshieldRequest, db.session))
admin.add_view(FaceshieldRequestAdmin(FaceshieldRequest, db.session, name='FaceshieldRequest (Unfiltered)', endpoint='request_unfiltered'))
admin.add_view(ShippingFaceshieldRequestAdmin(FaceshieldRequest, db.session, name='FaceshieldRequest (Shipping)', endpoint='request_shipping'))
admin.add_view(ExternalUserAdmin(ExternalUser, db.session, name='External Users', endpoint='external_user'))
admin.add_view(MapView(name='Map', endpoint='map'))
admin.add_view(ChangelogAdmin(RequestChange, db.session))