"""Client for the Kurjerzy courier GraphQL API."""

import time

import requests

from formity.extensions import cache


class Kurjerzy:
    """Thin client for the GraphQL endpoint at ``www.kurjerzy.pl``.

    Credentials are read from ``app.config`` (``SHIPPING_KURJERZY_EMAIL`` and
    ``SHIPPING_KURJERZY_PASSWORD``) when :meth:`authenticate` is called; the
    resulting token is kept in the shared ``cache`` between instances.
    """

    # Timeout (in seconds) passed to every HTTP request so that a stalled
    # connection cannot block the caller forever.  Override on an instance or
    # subclass if a different limit is needed.
    REQUEST_TIMEOUT = 30

    def __init__(self, app):
        """Create a client bound to ``app`` (only ``app.config`` is read)."""
        self.base_url = 'https://www.kurjerzy.pl/graphql'
        self.session = requests.Session()
        self.app = app
        # Set by authenticate() once a token has been installed.
        self.email = None

    def __repr__(self):
        return '<Kurjerzy %s>' % (self.email,)

    def send_query(self, query, variables=None):
        """POST a GraphQL ``query`` and return the decoded JSON response.

        Args:
            query: GraphQL document (query or mutation) as a string.
            variables: Optional mapping of GraphQL variables.  An empty
                mapping is sent when omitted.

        Returns:
            The parsed JSON body of the response.

        Raises:
            requests.RequestException: On transport errors, timeouts, or a
                non-2xx HTTP status (via ``raise_for_status``).
        """
        payload = {
            'query': query,
            # A fresh dict per call; never share a mutable default.
            'variables': {} if variables is None else variables,
        }
        resp = self.session.post(
            self.base_url,
            json=payload,
            timeout=self.REQUEST_TIMEOUT,
        )
        resp.raise_for_status()
        return resp.json()

    def authenticate(self):
        """Install a bearer token on the session, logging in if necessary.

        A token cached under ``kurjerzy_token_<email>`` is reused while its
        ``expire`` value is still in the future; otherwise a new one is
        requested and cached.

        Raises:
            Exception: If the login request does not yield a token.
        """
        email = self.app.config['SHIPPING_KURJERZY_EMAIL']
        password = self.app.config['SHIPPING_KURJERZY_PASSWORD']

        cache_key = 'kurjerzy_token_%s' % (email,)
        token = cache.get(cache_key)

        if not token or token.get('expire', 0) < time.time():
            token = self.login(email, password)
            if not token:
                raise Exception('Kurjerzy login failed!')
            # Only cache a token that was actually issued.
            cache.set(cache_key, token)

        self.email = email
        self.session.headers.update({
            "Authorization": "Bearer %s" % (token['token'],),
        })

    def login(self, email, password):
        """Request a new token via the ``createToken`` mutation.

        Returns:
            The ``createToken`` payload (``token``, ``expire``,
            ``expireHuman``), or ``None`` when the API returned no token.
        """
        resp = self.send_query(
            (
                ' mutation ($email: String, $password: String) {'
                ' createToken( email: $email, password: $password )'
                ' { token, expire, expireHuman } } '
            ),
            {'email': email, 'password': password},
        )
        # ``data`` may be null when the server reports GraphQL errors.
        token = (resp.get('data') or {}).get('createToken')
        return token if token else None

    @staticmethod
    def _split_street(address):
        """Split a one-line address into ``(street, number)`` at the last space.

        When there is no space the whole value is returned as the street and
        the number is empty.
        """
        street, _, street_number = address.strip().rpartition(' ')
        if not street:
            return street_number, ''
        return street.rstrip(), street_number

    def create_shipment(self, model):
        """Create a shipment for ``model`` and record its id on the model.

        The recipient is taken from the model's shipping/contact attributes;
        the sender, parcel parameters and services are fixed values.  On
        success ``model.shipping_provider`` is set to ``'kurjerzy'`` and
        ``model.shipping_id`` to the id returned by the API.

        Raises:
            Exception: If the model has no usable ``shipping_street`` or the
                API reports a failure.
        """
        street_address = (model.shipping_street or '').strip()
        if not street_address:
            raise Exception('Invalid shipping_street')

        street, street_number = self._split_street(street_address)

        result = self.send_query(
            (
                ' mutation ($recipient: AddressInput, $sender: AddressInput,'
                ' $refer: String, $params: ParamsInput, $services: ServicesInput) {'
                ' createShipment( recipient: $recipient, sender: $sender,'
                ' refer: $refer, params: $params, services: $services,'
                ' productId: 3, ) { status, shipment { id, status },'
                ' error { code, group, description } } } '
            ),
            {
                'recipient': {
                    'name': model.shipping_name,
                    'name2': model.full_name,
                    'email': model.email,
                    'street': street,
                    'number': street_number,
                    'zip': model.shipping_postalcode,
                    'city': model.shipping_city,
                    'phone': model.phone_number,
                    'country': 'PL',
                },
                'sender': {
                    'name': 'Warszawski Hackerspace',
                    'name2': 'Marek Adamusiński',
                    'email': 'qrde@hackerspace.pl',
                    'street': 'Wolność',
                    'number': '2a',
                    'zip': '01-018',
                    'city': 'Warszawa',
                    'phone': '601233014',
                    'country': 'PL',
                },
                'refer': str(model.id),
                'params': {
                    'packageType': 'TYPE_PARCEL',
                    'weight': 4,
                    'width': 40,
                    'height': 30,
                    'length': 20,
                    'declareValue': 100,
                    # Polish: "face shields for medics".
                    'content': 'przyłbice dla medyków',
                    'packingType': 'TYPE_STANDARD',
                    'count': 1,
                },
                'services': {
                    'cod': 0,
                    'codValue': 0,
                    'codAccountNumber': '',
                    'codOneDay': 0,
                    'smsSenderNotification': 0,
                    'smsRecipientNotification': 1,
                    'warrantyDeliveryNextDay': 1,
                    'rod': 0,
                },
            },
        )

        outcome = (result.get('data') or {}).get('createShipment') or {}
        error = outcome.get('error')
        if not outcome.get('status') or error:
            # The API may report a falsy status without an error object;
            # still raise a readable message instead of a TypeError.
            if error:
                description = error.get('description')
            else:
                description = 'no error details returned'
            raise Exception(
                'Kurjerzy createShipment error occured: %s' % description
            )

        model.shipping_provider = 'kurjerzy'
        model.shipping_id = outcome['shipment']['id']

    def shipment_info(self, id):
        """Fetch details of a single shipment.

        Args:
            id: Shipment id as stored by :meth:`create_shipment`.  (The name
                shadows the builtin but is kept for keyword-argument callers.)

        Returns:
            A dict with ``id``, ``labelRef``, ``labelSource``, ``status`` and
            ``tracking``, or ``None`` when the API returns no such shipment.
        """
        print('fetching shipment_info(%r)' % (id,))
        result = self.send_query(
            (
                r'query($id: Int) { shipments(ids:[$id]) '
                r'{ id, labelRef, labelSource, status, tracking { statuses } } }'
            ),
            {'id': id},
        )
        # Treat a null ``data``/``shipments`` the same as an empty list.
        shipments = (result.get('data') or {}).get('shipments') or []
        return shipments[0] if shipments else None


if __name__ == '__main__':
    # Manual smoke test; fill in real credentials before running.
    k = Kurjerzy({})
    print(k.login('...', '...'))
    print(k.send_query(r'query {shipments{id,status,number,labelRef,labelSource,params{content}}}'))
    # Alternative query kept for reference:
    # query { shipments(limit: 100) {id, number, labelSource, numberParcel,status,tracking{statuses},recipientAddress{name}} }