covid-formity/formity/scripts.py

137 lines
5.4 KiB
Python

import csv
import re
import time
import click
import requests
from flask.cli import AppGroup
from formity.models import FaceshieldRequest, PostalCode, Status
from formity.extensions import db
cli = AppGroup('formity')
models = {
'FaceshieldRequest': FaceshieldRequest,
'PostalCode': PostalCode,
}
@cli.command('import')
@click.argument('input_file', type=click.File('r'))
@click.option('--model', type=click.Choice(models.keys()), default='FaceshieldRequest')
@click.option('--lookup', help='On which fields to lookup existing records', multiple=True)
@click.option('--columns', help='Override column names in CSV file')
@click.option('--map', help='CSV column -> model field mapping (ex. ID:id)', multiple=True)
@click.option('--nullable', help='If specified field is empty, assume it is None', multiple=True)
@click.option('--delimiter')
@click.option('--ignore-header/--no-ignore-header', help='Ignore first line of output if specifying custom fields', default=False)
@click.option('--commit-chunk', default=0)
def import_file(input_file, model, lookup, columns=None, delimiter=',', ignore_header=False, map=[], commit_chunk=0, nullable=[]):
m = models.get(model)
csv_reader = csv.reader(input_file, delimiter=delimiter)
if columns:
columns = columns.split(',')
if ignore_header:
next(csv_reader, None)
else:
columns = next(csv_reader, None)
mapping = dict([m.split(':') for m in map])
for idx, row in enumerate(csv_reader):
row_data = {mapping.get(k, k): (None if v == '' and mapping.get(k,k) in nullable else v) for k, v in zip(columns, row)}
lookup_query = {f: row_data[f] for f in lookup}
try:
obj = m.query.filter_by(**lookup_query).one()
print('Updating object', lookup_query)
except Exception as exc:
obj = m()
print('Creating new object', lookup_query, exc)
for k, v in row_data.items():
setattr(obj, k, v)
db.session.add(obj)
if commit_chunk and idx % commit_chunk == commit_chunk - 1:
print('Commiting')
db.session.commit()
db.session.commit()
@cli.command('extract-postalcodes')
def extract_postalcodes():
for r in FaceshieldRequest.query.all():
code = re.findall(r'(\d{2})\s*-\s*(\d{3})', str(r.entity_info))
if code and not r.shipping_postalcode:
r.shipping_postalcode = '-'.join(code[0])
r.shipping_city = PostalCode.query.get(r.shipping_postalcode).city
print(r.id, r.shipping_postalcode)
db.session.commit()
@cli.command('extract-places')
@click.option('--apikey')
def extract_places(apikey):
places = FaceshieldRequest.query.filter(FaceshieldRequest.shipping_postalcode == None, FaceshieldRequest.status != Status.rejected, FaceshieldRequest.status != Status.spam).all()
for data in places:
print(' * [%d] %r' % (data.id, data.entity_info,))
try:
results = requests.get('https://maps.googleapis.com/maps/api/place/findplacefromtext/json', params={
'input': data.entity_info,
'inputtype': 'textquery',
'fields': 'formatted_address,geometry',
'key': apikey,
}).json()
if results['status'] == 'OK':
place = results['candidates'][0]
address = place['formatted_address']
re_result = re.findall(r'^(.*), (\d{2}-\d{3}) (.*), Polska$', address)
if not re_result:
print(' -> unparseable', address)
else:
data.shipping_street, data.shipping_postalcode, data.shipping_city = re_result[0]
print(' -> saved', data.shipping_street, data.shipping_postalcode, data.shipping_city)
if place.get('geometry', {}).get('location'):
data.shipping_latitude = place['geometry']['location']['lat']
data.shipping_longitude = place['geometry']['location']['lng']
print(' -> got geolocation %r %r' % (data.shipping_latitude, data.shipping_longitude))
else:
print(' -> not found (%s)' % (results['status'],))
except Exception as exc:
print(' -> failed (%s)' % (exc,))
db.session.commit()
@cli.command('extract-geo')
def extract_geo():
places = FaceshieldRequest.query.filter(FaceshieldRequest.shipping_latitude == None, FaceshieldRequest.shipping_postalcode != None, FaceshieldRequest.status != Status.rejected, FaceshieldRequest.status != Status.spam).all()
for data in places:
try:
query = {
'postalcode': data.shipping_postalcode,
'format': 'json',
'country': 'Poland',
}
if data.shipping_street:
query['street'] = data.shipping_street
print(' * [%s] %r' % (data.id, query))
resp = requests.get('https://nominatim.openstreetmap.org/search', params=query)
resp_json = resp.json()
if not resp_json:
print(' -> not found')
else:
print(' -> %r' % (resp_json[0],))
data.shipping_latitude = resp_json[0]['lat']
data.shipping_longitude = resp_json[0]['lon']
db.session.commit()
except Exception as exc:
print(' -> failed (%s)' % (exc,))
time.sleep(1.0)