IdeaBank fetcher/parser implemented - initial version, still a bit fugly, does not add transfers to the database yet

master
Remigiusz Marcinkiewicz 2017-01-12 06:53:36 +01:00
parent e444918aee
commit f780d8024f
1 changed files with 400 additions and 0 deletions

400
fetch/banking-ib.py Normal file
View File

@ -0,0 +1,400 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# Copyright (c) 2017, Remigiusz Marcinkiewicz <remigiusz@marcinkiewicz.me>
# Based on iBRE/mBank CompanyNet crawler by Sergiusz Bazanski <q3k@q3k.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import csv
from getopt import getopt
import datetime
import re
import hashlib
import requests
import bs4
import time
import random
import enum
from datetime import date
# Choose where ``app`` comes from: a minimal stand-in when this file is run
# directly as a script, the real web application object when it is imported.
if __name__ == "__main__":
    # Anonymous object exposing only the ``config`` mapping that IBRow reads
    # (the ``SECRET`` and ``OWN_ACCOUNTS`` keys).
    app = type("", (object,), {
        "config": {
            "SECRET": "foobar",
            "OWN_ACCOUNTS": [
                "PL48195000012006000648890002",
                "PL21195000012006000648890003",
                "PL91195000012006000648890004",
                "PL64195000012006000648890005",
                "PL45114010100000541244001003",
            ],
        },
    })()
else:
    from webapp import app
class IBParseError(Exception):
    """Raised when account history data cannot be interpreted.

    Raisers pass a human-readable message first and, optionally, the
    offending row or underlying exception as a second argument.
    """
class IBRow(object):
    """One transfer parsed from a single row of an account history export.

    Monetary values are stored as integers in hundredths of the currency
    unit (``"12.34 PLN"`` becomes ``1234`` and ``"PLN"``).  ``type`` is one
    of ``"BANK_FEE"``, ``"OUT_FROM_OWN"``, ``"OUT_TO_OWN"``, ``"OUT"`` or
    ``"IN"``.  ``uid`` is the SHA-256 hex digest of ``SECRET`` followed by
    the row's string form, so the text layout produced by ``__unicode__``
    must stay stable for uids to stay stable.
    """

    # Both values come from the application configuration.
    SECRET = app.config["SECRET"]
    OWN_ACCOUNTS = app.config["OWN_ACCOUNTS"]

    # "<units>.<two digits> <CURRENCY>", e.g. "12.34 PLN".  Compiled once for
    # the class instead of once per parsed row.
    _MONEY_RE = re.compile(r"([0-9]+)\.([0-9]{2}) ([A-Z]+)")

    def __unicode__(self):
        """Return the one-line textual form of the row (also hashed into ``uid``)."""
        return u"{} *{} #{} @{} -\"{}\" -#{} => +\"{}\" +#{} [{}.{:02d} {}] ({}.{:02d} {}) ~\"{}\"".format(
            self.type, self.index, self.current_account, self.time,
            self.from_name, self.from_account, self.to, self.account,
            # Explicit floor division: same result as "/" on Python 2 ints.
            self.amount // 100, self.amount % 100, self.currency,
            self.balance // 100, self.balance % 100, self.balance_currency,
            self.title)

    def __str__(self):
        """Return the UTF-8 encoded form of ``__unicode__``."""
        return unicode(self).encode("utf-8")

    def __repr__(self):
        return str(self)

    def __init__(self, row, current_account):
        """Build a row from a parsed CSV record.

        Args:
            row: mapping from ``IBField`` members to the decoded cell text.
            current_account: number of the account whose history is being
                parsed, in the form returned by
                ``IBParser.parse_account_number``.

        Raises:
            IBParseError: if the amount or balance cell does not match the
                expected money format, or the transfer type cannot be
                determined.
            ValueError: if the completion date is not ``DD.MM.YYYY``
                (raised by ``strptime``).
        """
        self.raw = row
        self.index = 1
        self.current_account = current_account
        self.time = datetime.datetime.strptime(row[IBField.date_completed], "%d.%m.%Y").date()

        # Recipient side.
        self.account = IBParser.parse_account_number(row[IBField.to_account])
        self.to = row[IBField.to_name]

        # Sender side.
        self.from_account = IBParser.parse_account_number(row[IBField.from_account])
        self.from_name = row[IBField.from_name]

        self.title = row[IBField.title]

        self.amount, self.currency = self._parse_money(row, IBField.amount, "amount")
        self.balance, self.balance_currency = self._parse_money(row, IBField.balance, "balance")

        self.type = self._classify(row)

        self.uid = hashlib.sha256(self.SECRET + str(self)).hexdigest()
        # NOTE(review): this print looks like a debugging aid; it is kept so
        # the script's output stays unchanged.
        print(self.uid)

    @classmethod
    def _parse_money(cls, row, field, label):
        """Parse ``row[field]`` into ``(hundredths, currency)``.

        ``label`` names the value ("amount" / "balance") in the error message.
        """
        m = cls._MONEY_RE.match(row[field])
        if m is None:
            # Unicode literal: the cell text is unicode and may contain
            # non-ASCII characters, which a byte-string .format() would
            # reject with UnicodeEncodeError on Python 2.
            raise IBParseError(u"Can't parse {} value \"{}\"".format(label, row[field]), row)
        units, hundredths, currency = m.groups()
        return int(units) * 100 + int(hundredths), currency

    def _classify(self, row):
        """Return the transfer type label for this row.

        Rules, first match wins:

        * sender account equals recipient account -> ``"BANK_FEE"`` (this
          includes rows where neither account number could be parsed, since
          both are then ``None``);
        * both accounts are in ``OWN_ACCOUNTS`` -> ``"OUT_FROM_OWN"`` when the
          recipient is the current account, otherwise ``"OUT_TO_OWN"``;
        * sender is the current account -> ``"OUT"``;
        * recipient is the current account -> ``"IN"``.

        Raises:
            IBParseError: if none of the rules apply.
        """
        if self.from_account == self.account:
            return "BANK_FEE"
        if self.from_account in self.OWN_ACCOUNTS and self.account in self.OWN_ACCOUNTS:
            if self.account == self.current_account:
                return "OUT_FROM_OWN"
            return "OUT_TO_OWN"
        if self.from_account == self.current_account:
            return "OUT"
        if self.account == self.current_account:
            return "IN"
        raise IBParseError("Can't figure out transfer type for current row", row)
class IBField(enum.Enum):
    """Columns of the history CSV, keyed by the name used in the code.

    Each value is the literal (Polish) header text found in the first CSV
    line, so ``IBField(header_text)`` maps a header cell to its member and
    raises ``ValueError`` for an unknown header.
    """

    # Sender name.
    from_name = u"Nadawca"
    # Sender account number.
    from_account = u"Rachunek nadawcy"
    # Transfer title.
    title = u"Tytułem"
    # Recipient name.
    to_name = u"Odbiorca"
    # Recipient account number.
    to_account = u"Rachunek odbiorcy"
    # Date the order was submitted.
    date_issued = u"Data złożenia dyspozycji"
    # Value date.
    date_completed = u"Data waluty"
    # Amount of the operation.
    amount = u"Kwota operacji"
    # Balance after the operation.
    balance = u"Saldo po operacji"
class IBParser(object):
    """Parses a semicolon-separated account history export into ``IBRow`` objects.

    ``rows`` collects every parsed row (across ``parse`` calls); ``fields``
    holds the ``IBField`` members of the most recently parsed header, in
    column order.
    """

    # Recognised account number layouts.  The groups of a matching pattern
    # are concatenated to form the bare account number.
    _ACCOUNT_FORMATS = [
        # 26 digits, optional country code - Poland
        "((?:[A-Za-z]{2})?[0-9]{2}) ([0-9]{4}) ([0-9]{4}) ([0-9]{4}) ([0-9]{4}) ([0-9]{4}) ([0-9]{4})",
    ]

    def __init__(self, account_number):
        """Create a parser for the history of ``account_number``."""
        self.account_number = account_number
        self.rows = []
        self.fields = []

    def parse(self, snapshot):
        """Parse a UTF-8 encoded CSV export and append its rows to ``self.rows``.

        The first line must be a header made only of known ``IBField``
        values; every following line must have as many cells as the header.

        Raises:
            IBParseError: if the header is missing, contains an unknown
                field name, or a row has the wrong number of cells (and
                whatever ``IBRow`` raises for an individual row).
        """
        reader = csv.reader(snapshot.splitlines(), delimiter=";")

        # Check for a missing header BEFORE iterating over it; iterating
        # over None would raise TypeError and hide the real problem.
        raw_header = next(reader, None)
        if raw_header is None:
            raise IBParseError("No header in history for {}".format(self.account_number))
        header = [cell.decode("utf-8") for cell in raw_header]

        # Build the field list locally so a second parse() call starts from
        # a clean header instead of appending to the previous one.
        fields = []
        for name in header:
            try:
                fields.append(IBField(name))
            except ValueError as e:
                # Unicode literal: ``name`` may contain non-ASCII characters,
                # which a byte-string .format() would reject on Python 2.
                raise IBParseError(u"Unexpected field name \"{}\"".format(name), e)
        self.fields = fields

        for row in reader:
            if len(row) != len(fields):
                raise IBParseError("Row has {} fields, {} expected after parsing the header: \"{}\"".format(len(row), len(fields), ';'.join(row)))
            values = [cell.decode("utf-8") for cell in row]
            # Use this parser's own account number rather than relying on a
            # module-level variable that only exists when run as a script.
            self.rows.append(IBRow(dict(zip(fields, values)), self.account_number))

    def get_by_type(self, y):
        """Return the parsed rows whose ``type`` equals ``y``."""
        return [row for row in self.rows if row.type == y]

    @staticmethod
    def parse_account_number(s):
        """Extract an account number from ``s`` in compact form.

        The first substring matching one of the known layouts is taken and
        its space-separated groups are joined.  When the match carries no
        two-letter country code, ``"PL"`` is prepended.

        Returns:
            The compact account number, or ``None`` when ``s`` is empty,
            ``None``, or contains no recognisable account number.
        """
        if not s:
            return None
        for pattern in IBParser._ACCOUNT_FORMATS:
            m = re.search(pattern, s)
            if m is not None:
                break
        else:
            return None
        account = "".join(m.groups())
        # A two-character first group means check digits only, no country code.
        if len(m.group(1)) == 2:
            account = "PL" + account
        return account
class IBFetcher(object):
    """Scripted HTTP session against the online banking site at ``BASE``.

    Keeps a ``requests`` session with browser-like headers, tracks the
    ``banking`` form token found in each returned page, and exposes
    ``login`` and ``fetch_account_history`` as the two high-level steps.
    Progress is reported with ``print``.
    """

    BASE = "https://secure.ideabank.pl/"
    # Earliest date (DD.MM.YYYY) requested when exporting history.
    START_DATE = "01.11.2016"

    def __init__(self):
        """Create the HTTP session and install the default request headers."""
        self.token = None
        self.s = requests.Session()
        self.s.headers.update({
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate",
            "DNT": "1",
            "Upgrade-Insecure-Requests": "1",
            "Pragma": "no-cache",
            "Cache-Control": "no-cache",
        })

    def _getraw(self, page, params=None):
        """GET ``BASE + page`` and return the response.

        Raises:
            Exception: if the status code is not 200.
        """
        # None instead of a shared mutable ``{}`` default.
        if params is None:
            params = {}
        url = self.BASE + page
        r = self.s.get(url, params=params)
        print("[i] GET {}?{} -> {}".format(page, "&".join([str(k) + "=" + str(v) for k, v in params.items()]), r.status_code))
        if r.status_code != 200:
            raise Exception("return code %i" % r.status_code)
        return r

    def _process_response(self, r):
        """Shared tail of ``_get``/``_post``: update Referer, parse, refresh tokens."""
        self.s.headers.update({"Referer": r.url})
        # NOTE(review): no parser is named, so BeautifulSoup picks whichever
        # one is installed; left as in the original to avoid changing parsing.
        soup = bs4.BeautifulSoup(r.text)
        self._gettoken(soup)
        self._hitjstoken(soup)
        return soup

    def _get(self, page):
        """GET a page and return it parsed, updating Referer and tokens."""
        return self._process_response(self._getraw(page))

    def _postraw(self, page, data):
        """POST form ``data`` to ``BASE + page`` and return the response.

        Raises:
            Exception: if the status code is not 200.
        """
        url = self.BASE + page
        # NOTE(review): the original built a header dict with
        # "Content-Type: application/x-www-form-urlencoded" and
        # "X-Requested-With: XMLHttpRequest" here but never passed it to the
        # request.  The dead code is removed and the request is sent exactly
        # as before; confirm against the site before sending those headers.
        r = self.s.post(url, data)
        print("[i] POST {} -> {}".format(page, r.status_code))
        if r.status_code != 200:
            raise Exception("return code %i" % r.status_code)
        return r

    def _post(self, page, data):
        """POST ``data`` plus the current ``banking`` token; return the parsed page.

        Keys in ``data`` override the token entry if they collide.

        Raises:
            Exception: if the response text contains "forbidden" or the
                status code is not 200.
        """
        mdata = {"banking": self.token}
        mdata.update(data)
        r = self._postraw(page, mdata)
        if re.search("forbidden", r.text) is not None:
            raise Exception("Received \"forbidden3\" response. Bad token?")
        return self._process_response(r)

    def _wait(self, seconds):
        """Announce a pause of ``seconds`` seconds.

        NOTE(review): the original had the actual ``time.sleep(seconds)``
        call disabled, so only the message is printed and no delay happens.
        That behaviour is preserved here.
        """
        print("[i] Waiting {} seconds".format(seconds))

    def _gettoken(self, soup):
        """Update ``self.token`` from a page, if the page carries a token.

        The hidden ``banking`` input is preferred; otherwise the argument of
        a ``changeBanking('...')`` call found in the page text is used.
        """
        i = soup.find("input", type="hidden", attrs={"name": "banking"})
        m = re.search(r"changeBanking\('([0-9a-fA-F]+)'\)", str(soup))
        # .get() so an input without a value attribute falls through to the
        # regex match instead of raising KeyError.
        if i is not None and i.get("value") is not None:
            t = i["value"]
        elif m is not None:
            t = m.group(1)
        else:
            t = None
        if t is not None:
            self.token = t
            print("[i] Token: {}".format(self.token))
        else:
            print("[i] No new token found")

    def _hitjstoken(self, soup):
        """Fetch the timestamp script referenced in the page head, if any.

        Looks for ``/main/index/token/<digits>/time/`` in ``soup.head`` and,
        when found, requests the corresponding ``.js`` URL with the current
        time in milliseconds and a random ``t`` query parameter.
        """
        m = re.search(r"/main/index/token/([0-9]+)/time/", str(soup.head))
        if m is not None:
            t = m.group(1)
            r = self._getraw("main/index/token/{}/time/{:.0f}.js".format(t, time.time() * 1000), params={"t": "{:.16f}".format(random.random())})
            print("[i] Fetched JS timestamp token: \"{}\"".format(r.text))

    def process_wallet_page(self, soup):
        """Extract the account list from the page shown after login.

        Returns:
            ``{"accounts": {number: info}}`` where ``info`` has the keys
            ``number``, ``currency``, ``balance``, ``available_balance``,
            ``pln_balance`` and ``id``.

        Raises:
            Exception: if a history button carries no account id.
        """
        wallet = {"accounts": {}}

        # Account ids come from the onclick targets of the history buttons.
        account_ids = []
        for button in soup.find_all("button", class_="historia1"):
            m = re.search(r"/accounts/index/([0-9]+)/2", str(button.get("onclick", "")))
            if m is None:
                # Skipping silently would shift the id/account pairing below.
                raise Exception("Could not find account id in history button")
            account_ids.append(m.group(1))

        # Account details come from the tables whose first cell holds a
        # parseable account number; other tables are ignored.
        accounts = []
        for dt in soup.find_all("table", id="data"):
            cell = dt.find("td", class_="cell1")
            if cell is None or cell.string is None:
                continue
            number = IBParser.parse_account_number(cell.string.strip())
            if number is None:
                continue
            cells = cell.find_next_siblings("td")
            accounts.append({
                "number": number,
                "currency": cells[0].string.strip(),
                "balance": cells[1].string.strip(),
                "available_balance": cells[2].string.strip(),
                "pln_balance": cells[3].string.strip(),
            })

        # Ids and accounts are paired purely by their order on the page.
        for account_id, account in zip(account_ids, accounts):
            account["id"] = account_id
            wallet["accounts"][account["number"]] = account
        return wallet

    def login(self, username, password):
        """Log in with the two-step form and return the parsed wallet.

        Raises:
            Exception: if the second login step does not contain the
                ``password2`` input, or any request fails.
        """
        self._get("main/index")
        self._wait(3)

        login2_page = self._post("main/index", {"js": "true", "login": username})
        self._wait(3)

        password2_input = login2_page.find("input", attrs={"name": "password2"})
        if password2_input is None:
            # Fail with a clear message instead of "NoneType is not
            # subscriptable" when the expected form is not returned.
            raise Exception("Login page does not contain the password2 field")

        data = {
            "log2": username,
            "password": password,
            "password2": password2_input["value"],
        }
        wallet_page = self._post("main/index", data)
        self._wait(2)
        return self.process_wallet_page(wallet_page)

    def fetch_account_history(self, account_id):
        """Download the history export of ``account_id``.

        Requests history from ``START_DATE`` up to today and returns the
        exported file as UTF-8 encoded bytes with any BOM removed.
        """
        # Visited for its side effects (Referer and token refresh).
        self._get("accounts/index/{}/2".format(account_id))
        self._wait(4)
        data = {
            "code": account_id,
            "basic": 1,
            "date_from": self.START_DATE,
            # Single today() call so the date cannot change mid-format.
            "date_to": date.today().strftime("%d.%m.%Y"),
            "interval_time": "",
            "interval_type": "",
            "last": "",
            "advanced[0]": "0",
            "advanced[1]": "1",
            "operation_type": "3",
            "amount_from": "",
            "amount_to": "",
            "transaction_type": "",
            "from": "",
            "title": "",
            "send": "send",
            "ajaxSend": "true",
        }
        self._post("accounts/history/{}".format(account_id), data)
        self._wait(2)
        r = self._getraw("accounts/printHistoryFile")
        return r.content.decode("utf-8-sig").encode("utf-8")
def usage():
    """Print a summary of the command-line options to standard output."""
    # Local import: the module header does not import sys.
    import sys

    print("\n".join([
        "Usage: {} [-h] [-c] [-l ACCOUNT:FILE]...".format(sys.argv[0]),
        "",
        "  -h, --help               show this help and exit",
        "  -c, --cached             do not connect to the bank",
        "  -l, --load ACCOUNT:FILE  parse the history of ACCOUNT from FILE",
        "                           (implies --cached; may be repeated).",
        "                           ACCOUNT uses the spaced form, e.g.",
        "                           \"48 1950 0001 2006 0006 4889 0002\"",
    ]))
if __name__ == "__main__":
    # Script entry point: either load previously saved history files
    # (--load / --cached) or log in and download the history of every
    # account, then parse everything and print the resulting rows.

    # Local imports: the module header imports only the ``getopt`` function
    # (so ``getopt.getopt`` / ``getopt.GetoptError`` do not exist) and does
    # not import ``sys`` at all.
    import sys
    from getopt import GetoptError

    try:
        # "h"/"help" must be declared here for the -h branch below to be
        # reachable.
        opts, args = getopt(sys.argv[1:], "hcl:", ["help", "cached", "load="])
    except GetoptError as err:
        # print help information and exit:
        print(str(err))  # will print something like "option -a not recognized"
        usage()
        sys.exit(2)

    cached = False
    load_files = {}
    for o, a in opts:
        if o in ("-h", "--help"):
            usage()
            sys.exit()
        elif o in ("-c", "--cached"):
            cached = True
        elif o in ("-l", "--load"):
            # Split on the first colon only, so file names may contain colons
            # and a missing colon is reported instead of raising ValueError.
            raw_number, sep, f = a.partition(":")
            if not sep or not raw_number or not f:
                print("[e] --load argument \"{}\" appears malformed, could not split account number and file name".format(a))
                sys.exit(2)
            account_number = IBParser.parse_account_number(raw_number)
            if account_number is None:
                # Report what the user typed and stop; continuing would file
                # the history under a None key.
                print("[e] Account number \"{}\" unparseable".format(raw_number))
                sys.exit(2)
            with open(f, 'r') as history_file:
                history = history_file.read()
            load_files[account_number] = history
            cached = True
            print("[i] Loading \"{}\" as \"{}\"".format(f, account_number))
        else:
            # Unreachable unless an option is declared above without a
            # handler; raised explicitly so it survives ``python -O``.
            raise AssertionError("unhandled option")

    accs = ["PL48195000012006000648890002", "PL21195000012006000648890003", "PL91195000012006000648890004", "PL64195000012006000648890005"]
    # The original switched the ``accs`` whitelist off with ``and False``;
    # it stays disabled, so every account in the wallet is fetched.
    filter_accounts = False

    if cached:
        print("[i] Cached run - will not connect to the bank")
        history_logs = load_files
    else:
        print("[i] Normal run - will connect to the bank")
        fetcher = IBFetcher()
        history_logs = {}
        # NOTE(review): the password is read with raw_input and therefore
        # echoed to the terminal.
        login_id = raw_input("[?] ID: ")
        password = raw_input("[?] Password: ")
        wallet = fetcher.login(login_id, password)
        for account_number, account in wallet["accounts"].items():
            if filter_accounts and account_number not in accs:
                print("[i] Skipping {} ({})".format(account_number, account["id"]))
                continue
            print("[i] Fetching history for account {} ({})".format(account_number, account["id"]))
            history = fetcher.fetch_account_history(account["id"])
            # Keep a copy on disk, in the current directory, named after the
            # account number.
            with open(account_number, 'w') as tmp:
                tmp.write(history)
            history_logs[account_number] = history

    parsed = {}
    for account_number, history in history_logs.items():
        print("[i] Parsing history for account {}".format(account_number))
        parser = IBParser(account_number)
        parser.parse(history)
        parsed[account_number] = parser.rows

    for a, p in parsed.items():
        print("")
        print("{}:".format(a))
        for e in p:
            print("\t{}".format(e))
        print("")