fetch/banking-pekaobiznes: WIP

This commit is contained in:
informatic 2021-12-09 22:13:49 +01:00
parent d07d696e99
commit d81adaeadb
2 changed files with 408 additions and 0 deletions

348
fetch/banking-pekaobiznes.py Executable file
View file

@ -0,0 +1,348 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p python3 python3Packages.requests python3Packages.requests-cache python3Packages.beautifulsoup4
import requests
import sys
import hashlib
import json
import re
import pprint
import atexit
import logging
import datetime
import http.cookiejar
import xml.etree.ElementTree as ET
from decimal import Decimal
from urllib.parse import urljoin, urlparse, parse_qs
from bs4 import BeautifulSoup
from binascii import unhexlify
from models import RawTransfer
# Notes on the bank's client-side password masking:
# loginPass = string built from the displayed inputs, with "*" in the disabled ones
# loginMaskArray = hex2bytes of the "loginMask" variable
# createPassMaskedBis ( username, loginPass, loginMaskArray )
# aliasArray → string to bytes
# mixing (ANDarray) loginPass with loginMaskArray → every \xff byte in loginMaskArray is replaced with the byte/character from loginPass
# sha1 ( alias1 ++ mixed )
def mask_password(password, login_mask, login_alias):
    """Return the SHA-1 hex digest of the alias followed by the masked password.

    The hashed data is the encoded alias followed by one chunk per mask byte:
    a mask byte equal to 0xff is replaced with the encoded password character
    found at the same index, any other mask byte is copied unchanged.

    :param password: password string, indexed by mask position.
    :param login_mask: hex string decoded with ``unhexlify`` into mask bytes.
    :param login_alias: login alias string, hashed first.
    :return: hexadecimal SHA-1 digest as a string.
    """
    digest = hashlib.sha1(login_alias.encode())
    for position, mask_byte in enumerate(unhexlify(login_mask)):
        if mask_byte == 255:
            chunk = password[position].encode()
        else:
            chunk = bytes([mask_byte])
        # Feeding chunks one by one hashes the same byte stream as hashing
        # their concatenation.
        digest.update(chunk)
    return digest.hexdigest()
class CAMT052Parser:
    """Turn a camt.052.001.02 account report into ``RawTransfer`` objects.

    The XML is canonicalized with ``strip_text=True`` before it is parsed, so
    element text is read without surrounding whitespace.
    """

    # Namespace map passed to every ElementTree lookup.
    _NS = {"ns": "urn:iso:std:iso:20022:tech:xsd:camt.052.001.02"}

    # Account number layouts, tried in order; the first one that matches wins.
    _ACCOUNT_FORMATS = (
        # 26 digits, optional country code - Poland
        re.compile(
            "((?:[A-Za-z]{2})?[0-9]{2})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})"
        ),
        # 22 characters including BIC bank code - Ireland
        re.compile(
            "((?:[A-Za-z]{2})?[0-9]{2})[ ]?([A-Z]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{2})"
        ),
        # 18 characters including BIC bank code - Netherlands
        re.compile(
            "((?:[A-Za-z]{2})?[0-9]{2})[ ]?([A-Z]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{2})"
        ),
        # 22 digits - Germany
        re.compile(
            "((?:[A-Za-z]{2})?[0-9]{2})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{2})"
        ),
    )

    def __init__(self, xmldata):
        """Parse *xmldata* (the report's XML text) into an element tree."""
        self.xml = ET.fromstring(ET.canonicalize(xmldata, strip_text=True))

    @staticmethod
    def parse_account_number(s):
        """Extract a normalized account number from *s*.

        The known layouts are searched in order. The matched groups are joined
        without separators, and "PL" is prepended when the match carries no
        two-letter country prefix.

        :param s: text that may contain an account number.
        :return: the normalized account number, or ``None`` if nothing matches.
        """
        for pattern in CAMT052Parser._ACCOUNT_FORMATS:
            m = pattern.search(s)
            if m is not None:
                break
        else:
            return None
        account = "".join(m.groups())
        # A two-character first group means check digits only, no country code.
        if len(m.group(1)) == 2:
            account = "PL" + account
        return account

    @staticmethod
    def _remote_account_text(acct_elem):
        """Return the raw account identifier stored under an account element.

        :param acct_elem: the ``CdtrAcct``/``DbtrAcct`` element, or ``None``
            when the entry has no such element.
        :return: text of ``Id/IBAN`` or, failing that, ``Id/Othr/Id``;
            ``None`` when *acct_elem* is ``None``.
        :raises Exception: when the element holds neither identifier.
        """
        ns = CAMT052Parser._NS
        # Compare against None explicitly: truth-testing an Element checks for
        # children and is deprecated, so it must not be used as a presence test.
        if acct_elem is None:
            return None
        idelm = acct_elem.find("ns:Id", ns)
        if idelm is not None:
            iban = idelm.find("ns:IBAN", ns)
            if iban is not None:
                return iban.text
            other = idelm.find("ns:Othr", ns)
            if other is not None:
                return other.find("ns:Id", ns).text
        # Dump the offending element to stdout to help debugging.
        print(ET.tostring(idelm if idelm is not None else acct_elem).decode())
        raise Exception("No remote party account found %r" % (idelm,))

    @staticmethod
    def _remote_party_info(remote_party):
        """Return the remote party's name, followed by its address line if any."""
        ns = CAMT052Parser._NS
        info = remote_party.find("ns:Nm", ns).text
        if info == "NOTPROVIDED":
            info = ""
        address = remote_party.find("ns:PstlAdr", ns)
        address_line = None if address is None else address.find("ns:AdrLine", ns)
        if address_line is not None:
            # Empty elements have ``text`` of None; treat them as "".
            info = ((info or "") + " " + (address_line.text or "")).strip()
        return info

    def parse(self):
        """Yield one ``RawTransfer`` per ``Ntry`` element of the report.

        Each entry is also summarized on stdout via ``print``.

        :raises Exception: for an entry whose ``CdtDbtInd`` is neither
            ``DBIT`` nor ``CRDT``, or whose remote account element holds no
            usable identifier.
        """
        ns = self._NS
        report = self.xml.find("ns:BkToCstmrAcctRpt", ns).find("ns:Rpt", ns)
        on_account = self.parse_account_number(
            report.find("ns:Acct", ns).find("ns:Id", ns).find("ns:IBAN", ns).text
        )
        for entry in report.findall("ns:Ntry", ns):
            txdtls = entry.find("ns:NtryDtls", ns).find("ns:TxDtls", ns)
            tx_type = entry.find("ns:CdtDbtInd", ns).text
            # The remote side is the creditor for debits, the debtor for credits.
            if tx_type == "DBIT":
                party_tag, acct_tag = "ns:Cdtr", "ns:CdtrAcct"
            elif tx_type == "CRDT":
                party_tag, acct_tag = "ns:Dbtr", "ns:DbtrAcct"
            else:
                raise Exception("Unknown transaction type %r" % tx_type)

            related = txdtls.find("ns:RltdPties", ns)
            remote_party = related.find(party_tag, ns)
            remote_party_acct = self._remote_account_text(related.find(acct_tag, ns))
            remote_party_info = self._remote_party_info(remote_party)
            remote_account = (
                None
                if remote_party_acct is None
                else self.parse_account_number(remote_party_acct)
            )

            amt = entry.find("ns:Amt", ns)
            # Amount is stored as an integer number of hundredths.
            amount = int(Decimal(amt.text) * 100)
            booked_at = entry.find("ns:BookgDt", ns).find("ns:DtTm", ns).text
            title = txdtls.find("ns:RmtInf", ns).find("ns:Ustrd", ns).text

            transfer = RawTransfer()
            # NOTE(review): index is a constant here (commit is marked WIP).
            transfer.index = 1
            transfer.on_account = on_account
            transfer.raw = ET.tostring(entry).decode()
            transfer.amount = amount
            transfer.currency = amt.attrib["Ccy"]
            transfer.date = datetime.datetime.strptime(
                booked_at, "%Y-%m-%dT%H:%M:%S"
            ).date()
            transfer.title = title
            # Entries without a remote account are classified as bank fees.
            if tx_type == "DBIT":
                transfer.type = "BANK_FEE" if remote_party_acct is None else "OUT"
                transfer.to_account = remote_account
                transfer.to_name = remote_party_info
                transfer.from_account = on_account
            else:
                transfer.type = "BANK_FEE" if remote_party_acct is None else "IN"
                transfer.to_account = on_account
                transfer.from_account = remote_account
                transfer.from_name = remote_party_info
            print(
                tx_type,
                amt.attrib["Ccy"],
                amount,
                booked_at,
                txdtls.find("ns:Refs", ns).find("ns:TxId", ns).text,
                remote_party_acct,
                title,
                "|||",
                remote_party_info,
            )
            yield transfer
class PekaoClient:
    """HTTP client that scrapes the pekaobiznes24.pl web banking interface.

    Configuration keys read from *config*: ``alias``, ``password`` and ``tdid``
    (required by ``login``), plus the optional ``use-cache``, ``cookies-file``
    and ``user-agent``.
    """

    def __init__(self, config):
        """Create the HTTP session described by *config* (a dict)."""
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)
        if config.get("use-cache"):
            # Imported lazily so the dependency is only needed when enabled.
            import requests_cache

            self.session = requests_cache.CachedSession(
                "pekao_cache", allowable_methods=["GET", "POST"]
            )
        else:
            self.session = requests.Session()
        if config.get("cookies-file"):
            self.session.cookies = http.cookiejar.LWPCookieJar(
                filename=config.get("cookies-file")
            )
            try:
                self.session.cookies.load(ignore_discard=True, ignore_expires=True)
            except OSError as exc:
                # Best effort: a missing or unreadable cookie file (LoadError
                # is an OSError subclass) just means starting with no cookies.
                self.logger.debug("Could not load cookies: %r", exc)
            # Persist cookies when the interpreter exits.
            atexit.register(
                lambda: self.session.cookies.save(
                    ignore_discard=True, ignore_expires=True
                )
            )
        self.session.headers["User-Agent"] = config.get(
            "user-agent",
            "Mozilla/5.0 (X11; Linux x86_64; rv:94.0) Gecko/20100101 Firefox/94.0",
        )

    def login(self):
        """Log in and store the Struts token in ``self.taglib_token``.

        Submits the alias form, then the masked-password form, follows the
        JavaScript redirect, optionally confirms on-login messages, and finally
        opens the ``main`` frame whose URL carries the token.
        """
        self._go("https://www.pekaobiznes24.pl/do/login")
        self._submit_form(
            "LoginAliasForm",
            {
                "p_alias": self.config["alias"],
                "deviceFingerprint": self.config["tdid"],
            },
        )
        self._go("https://www.pekaobiznes24.pl/do/Authorization")
        # The mask is embedded in the page's JavaScript.
        login_mask = re.findall("var loginMask = '([0-9a-f]*)';", self.resp.text)[0]
        self._submit_form(
            "MaskLoginForm",
            {
                "p_passmasked_bis": mask_password(
                    self.config["password"], login_mask, self.config["alias"]
                )
            },
        )
        redirect_url = urljoin(
            self.resp.url, re.findall("top.location='(.*)'", self.resp.text)[0]
        )
        self._go(redirect_url)
        if self.bs.find("form", {"name": "messagesOnLoginForm"}):
            self.logger.info("Confirming messages on login...")
            self._submit_form("messagesOnLoginForm", {"task": "SAVE_AS_READED"})
            # NOTE(review): the source's indentation was lost; this redirect
            # step is assumed to belong to the messages branch - confirm.
            jsredir = re.findall('this.location = "(.*)"', self.resp.text)
            if jsredir:
                self._go(urljoin(self.resp.url, jsredir[0]))
        self._go(urljoin(self.resp.url, self.bs.find("frame", {"name": "main"})["src"]))
        url = urlparse(self.resp.url)
        self.taglib_token = parse_qs(url.query)["org.apache.struts.taglib.html.TOKEN"][
            0
        ]
        self.logger.debug("taglib token: %r", self.taglib_token)

    def list_accounts(self):
        """Return a dict mapping ``p_acc_id`` to the account's info dict.

        The accounts are read from the ``LB_ALL_PAGE_ACCOUNTS`` JSON array
        embedded in the response page.
        """
        resp = self.session.post(
            "https://www.pekaobiznes24.pl/webcorpo/do/allAccountsSelect?remChckdAcc=",
            data={"org.apache.struts.taglib.html.TOKEN": self.taglib_token},
        )
        resp.raise_for_status()
        accounts = json.loads(
            re.findall(r"LB_ALL_PAGE_ACCOUNTS = (\[.*\]);", resp.text)[0]
        )
        return {acc["p_acc_id"]: acc for acc in accounts}

    def fetch_transfers_camt052(self, account_id, date_from=None, date_to=None):
        """Export an account's transactions and return the response text.

        :param account_id: value sent as ``p_acc_id``.
        :param date_from: first day of the range (anything with ``strftime``);
            defaults to 30 days before *date_to*.
        :param date_to: last day of the range; defaults to now.
        :return: the exported document text, or ``None`` when the bank reports
            that there is no data to export.
        :raises Exception: when the response is not an XML attachment and the
            page does not carry the "no data" message.
        """
        # Open the export templates view before requesting the export itself.
        res = self.session.post(
            "https://www.pekaobiznes24.pl/webcorpo/do/desktop",
            data={
                "task": "NAV_REDIRECT#exportTransTemplatesList",
                "org.apache.struts.taglib.html.TOKEN": self.taglib_token,
            },
        )
        res.raise_for_status()
        # Only fill in the range when the caller did not provide one.
        if date_to is None:
            date_to = datetime.datetime.now()
        if date_from is None:
            date_from = date_to - datetime.timedelta(days=30)
        res = self.session.post(
            "https://www.pekaobiznes24.pl/webcorpo/do/exportTransactions",
            data={
                "org.apache.struts.taglib.html.TOKEN": self.taglib_token,
                "clearP_text": "N",
                "createZip": "false",
                "exportType": "EO",
                "p_acc_id": account_id,
                # "p_acc_no": "...",
                "p_itt_code": "EO",
                "saveAsReport": "false",
                "showInform": "0",
                "synchConfirmed": "N",
                "task": "EXPORT",
                # NOTE(review): hard-coded template id; presumably specific to
                # one bank profile - confirm before reusing elsewhere.
                "templateId": "219697",
                "p_date_type": "1",
                # "p_last_cnt": last_days,
                "p_date_from": date_from.strftime("%d.%m.%Y"),
                "p_date_fromDAY": date_from.strftime("%d"),
                "p_date_fromMON": date_from.strftime("%m"),
                "p_date_fromYEAR": date_from.strftime("%Y"),
                "minDate": "01/01/1945",
                "p_date_to": date_to.strftime("%d.%m.%Y"),
                "p_date_toDAY": date_to.strftime("%d"),
                "p_date_toMON": date_to.strftime("%m"),
                "p_date_toYEAR": date_to.strftime("%Y"),
            },
        )
        res.raise_for_status()
        if "xml" not in res.headers.get("content-disposition", ""):
            # Not a file download: look for the error toast in the page.
            errormsg = re.findall(
                r"""var toast = toastr\['error'\]\(\s*"(.*)",\s*""\);""",
                res.text,
                re.MULTILINE,
            )
            # "Brak danych do eksportu." is the only error treated as benign.
            if errormsg != ["Brak danych do eksportu."]:
                raise Exception("Unknown error: %r" % errormsg)
            return None
        return res.text

    def _go(self, url, method="GET", **args):
        """Request *url* and keep the response in ``self.resp`` / ``self.bs``."""
        self.resp = self.session.request(method, url, **args)
        self.resp.raise_for_status()
        self.logger.debug("=> %s %s", method, self.resp.url)
        # NOTE(review): no parser is named, so bs4 picks one itself.
        self.bs = BeautifulSoup(self.resp.text)

    def _submit_form(self, name, values):
        """Submit the form called *name* from the current page.

        Values of the form's named, non-disabled inputs are sent, overridden
        by *values*.
        """
        form = self.bs.find("form", {"name": name})
        form_data = {
            inp.get("name"): inp.get("value")
            for inp in form.find_all("input")
            if inp.get("name") and not inp.get("disabled")
        }
        data = {**form_data, **values}
        target = urljoin(self.resp.url, form.get("action"))
        # A form without a method attribute is submitted with GET.
        self._go(target, (form.get("method") or "GET").upper(), data=data)
        # NOTE(review): debug output; includes the submitted form values.
        print(target, data)
if __name__ == "__main__":
    # The first command-line argument is the path to the JSON configuration.
    with open(sys.argv[1]) as config_file:
        client = PekaoClient(json.load(config_file))
    client.login()
    accounts = client.list_accounts()

    # First pass: one summary line per account.
    summary = r"[{p_acc_id}] {p_acc_no} {p_acc_avail_balance: >10} {p_acc_currency}"
    for details in accounts.values():
        print(summary.format(**details))

    # Second pass: export and print the transfers of every account.
    for account_id, details in accounts.items():
        print("*** Fetching transfers for", details["p_acc_no"], details["p_acc_alias"])
        exported = client.fetch_transfers_camt052(account_id)
        if exported is None:
            print("No transfers found")
            continue
        for transfer in CAMT052Parser(exported).parse():
            print(transfer)

60
fetch/models.py Normal file
View file

@ -0,0 +1,60 @@
import time
from sqlalchemy import (
Column,
Integer,
String,
Date,
BigInteger,
create_engine,
MetaData,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import CreateTable, CreateIndex
Base = declarative_base()


class RawTransfer(Base):
    """ORM model for one scraped bank transfer (table ``raw_transfer``)."""

    __tablename__ = "raw_transfer"

    id = Column(Integer, primary_key=True)
    raw = Column(String)
    uid = Column(String(128), index=True)
    on_account = Column(String(32), index=True)
    # Amount as an integer number of hundredths (see ``__str__``).
    amount = Column(Integer)
    currency = Column(String(8))
    date = Column(Date)
    type = Column(String(16))
    index = Column(Integer)
    title = Column(String(256))
    balance = Column(Integer)
    balance_currency = Column(String(8))
    from_account = Column(String(32))
    to_account = Column(String(32))
    from_name = Column(String(256))
    to_name = Column(String(256))
    # Defaults to the current time in microseconds since the epoch.
    scrape_timestamp = Column(BigInteger, default=lambda: round(time.time() * 1000000))

    def __str__(self):
        """Return a one-line human-readable summary of the transfer."""
        # Split the absolute value: floor division and modulo on a negative
        # amount would render -150 as "-2.50" instead of "-1.50".
        sign = "-" if self.amount < 0 else ""
        whole, hundredths = divmod(abs(self.amount), 100)
        return '{} *{} #{} @{} -"{}" -#{} => +"{}" +#{} [{}{}.{:02d} {}] ~"{}"'.format(
            self.type,
            self.index,
            self.on_account,
            self.date,
            self.from_name,
            self.from_account,
            self.to_name,
            self.to_account,
            sign,
            whole,
            hundredths,
            self.currency,
            self.title,
        )

    def __repr__(self):
        """Return the ``__str__`` summary wrapped in ``<Transfer ...>``."""
        return "<Transfer %s>" % (str(self),)