kasownik/fetch/banking-ib.py

606 lines
23 KiB
Python

#!/usr/bin/env/python2
# -*- coding: utf-8 -*-
# Copyright (c) 2017, Remigiusz Marcinkiewicz <remigiusz@marcinkiewicz.me>
# Based on iBRE/mBank CompanyNet crawler by Sergiusz Bazanski <q3k@q3k.org>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from config import CurrentConfig
from datetime import date, datetime
from getopt import getopt, GetoptError
from sqlalchemy import Column, Integer, String, Boolean, Date, create_engine, MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import CreateTable, CreateIndex
from time import sleep, time
import bs4
import csv
import enum
import hashlib
import os
import random
import re
import requests
import sys
config = {key: getattr(CurrentConfig,key) for key in dir(CurrentConfig) if key.isupper()}
Base = declarative_base()
class RawTransfer(Base):
__tablename__ = 'raw_transfer'
id = Column(Integer, primary_key=True)
raw = Column(String(512))
uid = Column(String(128), index = True)
on_account = Column(String(32), index = True)
amount = Column(Integer)
currency = Column(String(8))
date = Column(Date)
type = Column(String(16))
index = Column(Integer)
title = Column(String(256))
balance = Column(Integer)
balance_currency = Column(String(8))
from_account = Column(String(32))
to_account = Column(String(32))
from_name = Column(String(256))
to_name = Column(String(256))
class IBParseError(Exception):
pass
class IBRow(RawTransfer):
SECRET = config["SECRET"]
OWN_ACCOUNTS = config["OWN_ACCOUNTS"]
def __unicode__(self):
return u"{} *{} #{} @{} -\"{}\" -#{} => +\"{}\" +#{} [{}.{:02d} {}] ({}.{:02d} {}) ~\"{}\"".format(self.type, self.index, self.on_account, self.date, self.from_name, self.from_account, self.to_name, self.to_account, self.amount/100, self.amount%100, self.currency, self.balance/100, self.balance%100, self.balance_currency, self.title)
def __str__(self):
return unicode(self).encode("utf-8")
def __repr__(self):
return str(self)
def __init__(self, row, on_account, raw):
self.raw = raw
self.uid = row[IBField.uid]
self.index = 1
self.date = datetime.strptime(row[IBField.date_completed], "%Y%m%d").date()
self.title = row[IBField.title]
af = re.compile(r"([0-9]+)\.([0-9]{2})")
m = af.match(row[IBField.amount])
if m is None:
raise IBParseError("Can't parse amount value \"{}\"".format(row[IBField.amount]), row)
a,b = m.groups()
self.amount = int(a)*100+int(b)
self.currency = row[IBField.currency]
own_account = IBParser.parse_account_number(row[IBField.own_account])
own_name = "Stowarzyszenie \"Warszawski Hackerspace\""
if own_account not in self.OWN_ACCOUNTS:
raise IBParseError("own_account {} not in OWN_ACCOUNTS - format change?".format(own_account))
self.on_account = own_account
other_account = IBParser.parse_account_number(row[IBField.other_account])
if other_account is None:
raise IBParseError("other_account {} could not be parsed".format(row[IBField.other_account]))
other_name = row[IBField.other_name]
direction = row[IBField.direction]
if direction == "uznanie":
direction = "IN"
self.type = "IN"
elif direction == u"Obiciążenie": # sic!
direction = "OUT"
self.type = "OUT"
else:
raise IBParseError("Can't parse direction specifier \"{}\"", direction)
if own_account == other_account:
self.type = "BANK_FEE"
self.from_account = self.to_account = own_account
self.from_name = self.to_name = own_name
elif own_account in self.OWN_ACCOUNTS and other_account in self.OWN_ACCOUNTS:
self.from_name = self.to_name = own_name
if direction == "IN":
self.type = "IN_FROM_OWN"
self.from_account = other_account
self.to_account = own_account
elif direction == "OUT":
self.type = "OUT_TO_OWN"
self.from_account = own_account
self.to_account = other_account
else:
raise IBParseError("Can't figure out details of an own-to-own transfer")
elif direction == "IN":
self.type = "IN"
self.from_account = other_account
self.to_account = own_account
self.from_name = other_name
self.to_name = own_name
elif direction == "OUT":
self.type = "OUT"
self.from_account = own_account
self.to_account = other_account
self.from_name = own_name
self.to_name = other_name
else:
raise IBParseError("Can't figure out transfer type for current row", row)
if None in (self.type, self.to_account, self.from_account, self.to_name, self.from_name):
raise IBParseError("Something went wrong - one of the mandatory values empty",self.type, self.to_account, self.from_account, self.to_name, self.from_name)
class IBField(enum.Enum):
#Data waluty;Data zlecenia;Numer rachunku nadawcy;Numer banku nadawcy;Kwota w walucie rachunku;Waluta;Kurs;Kwota w walucie zlecenia;Numer rachunku odbiorcy;Odbiorca;Numer banku odbiorcy;Tytuł;Obciążenie/uznanie;Numer transakcji w systemie centralnym;
date_completed = u"Data waluty"
date_issued = u"Data zlecenia"
own_account = u"Numer rachunku nadawcy"
own_bank = u"Numer banku nadawcy"
amount = u"Kwota w walucie rachunku"
currency = u"Waluta"
rate = u"Kurs"
transfer_amount = "Kwota w walucie zlecenia"
other_account = u"Numer rachunku odbiorcy"
other_name = u"Odbiorca"
other_bank = u"Numer banku odbiorcy"
title = u"Tytuł"
direction = u"Obciążenie/uznanie"
uid = u"Numer transakcji w systemie centralnym"
class IBParser(object):
def __init__(self, account_number):
self.account_number = account_number
self.rows = []
self.fields = []
def parse(self, snapshot):
kek = u"IMPLR - STARVING - SKŁADKA ;".encode("utf-8")
#if snapshot.find(kek) == -1:
# raise IBParseError("double ; not found, format changed?")
snapshot = snapshot.replace(kek, kek[:-1])
lines = snapshot.splitlines()
header = lines.pop(0).decode("utf-8").split(";")
if not header[-1] == "":
raise IBParseError("Last column no longer empty?")
header = header[:-1]
for hf in header:
try:
self.fields.append(IBField(hf))
except ValueError as e:
raise IBParseError("Unexpected field name \"{}\"".format(hf),e)
c = csv.reader(reversed(lines), delimiter=";")
for row in c:
row = row[:-1]
if not len(row) == len(self.fields):
raise IBParseError("Row has {} fields, {} expected after parsing the header: \"{}\"".format(len(row), len(self.fields), ';'.join(row)))
d = dict(zip(self.fields, [r.decode("utf-8") for r in row]))
r = IBRow(d, self.account_number,";".join(row))
self.rows.append(r)
def get(self, type = None, on_account = None):
return [row for row in self.rows if (row.type == type or type is None) and (row.on_account == on_account or on_account is None)]
@staticmethod
def parse_account_number(s):
formats = [
"((?:[A-Za-z]{2})?[0-9]{2})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})", # 26 digits, optional country code - Poland
"((?:[A-Za-z]{2})?[0-9]{2})[ ]?([A-Z]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{2})", # 22 characters including BIC bank code - Ireland
"((?:[A-Za-z]{2})?[0-9]{2})[ ]?([A-Z]{4})[ ]?([0-9]{4})[ ]?([0-9]{4})[ ]?([0-9]{2})", # 18 characters including BIC bank code - Netherlands
]
for f in formats:
m = re.search(f, s)
if m is not None:
break
if m is None:
return None
account = "".join(m.groups())
if len(m.group(1)) == 2:
account = "PL" + account
return account
class IBFetcher(object):
BASE = "https://secure.ideabank.pl/"
START_DATE = "01.11.2016"
def __init__(self):
self._soup = None
self.token = None
self.s = requests.Session()
self.s.headers.update(
{"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate",
"DNT": "1",
"Upgrade-Insecure-Requests": "1",
"Pragma": "no-cache",
"Cache-Control": "no-cache"
})
def _makesoup(self, data):
self._soup = bs4.BeautifulSoup(data)
return self._soup
def _dump(self):
fn = config["DUMP_FILE"]
print "[w] Dumping the last page to {}".format(fn)
open(fn, 'w').write(unicode(self._soup).encode('utf-8'))
def _getraw(self, page, params = {}):
url = self.BASE + page
r = self.s.get(url, params=params)
print "[i] GET {}?{} -> {}".format(page, "&".join([str(k)+"="+str(v) for k,v in params.items()]), r.status_code)
if r.status_code != 200:
raise Exception("return code %i" % r.status_code)
return r
def _get(self, page):
r = self._getraw(page)
self.s.headers.update({"Referer": r.url})
soup = self._makesoup(r.text)
self._gettoken(soup)
self._hitjstoken(soup)
return soup
def _postraw(self, page, data):
url = self.BASE + page
h = self.s.headers.copy()
h.update({
"Content-Type": "application/x-www-form-urlencoded",
"X-Requested-With": "XMLHttpRequest",
})
r = self.s.post(url, data)
print "[i] POST {} -> {}".format(page, r.status_code)
if r.status_code != 200:
self._dump()
raise Exception("return code %i" % r.status_code)
return r
def _post(self, page, data):
mdata = {}
mdata["banking"] = self.token
mdata.update(data)
r = self._postraw(page, mdata)
if re.search("forbidden",r.text) is not None:
self._dump()
raise Exception("Received \"forbidden3\" response. Bad token?")
self.s.headers.update({"Referer": r.url})
soup = self._makesoup(r.text)
self._gettoken(soup)
self._hitjstoken(soup)
return soup
def _wait(self, seconds):
print "[i] Waiting {} seconds".format(seconds)
sleep(seconds)
def _gettoken(self, soup):
i = soup.find("input", type="hidden", attrs={"name": "banking"})
m = re.search("changeBanking\(\'([0-9a-fA-F]+)\'\)", str(soup))
if i is not None and i["value"] is not None:
t = i["value"]
elif m is not None:
t = m.group(1)
else:
t = None
if t is not None:
self.token = t
print "[i] Token: {}".format(self.token)
else:
print "[i] No new token found"
def _hitjstoken(self, soup):
m = re.search("\/main\/index\/token\/([0-9]+)\/time\/", str(soup.head))
if m is not None:
t = m.group(1)
r = self._getraw("main/index/token/{}/time/{:.0f}.js".format(t, time()*1000), params={"t": "{:.16f}".format(random.random())})
print "[i] Fetched JS timestamp token: \"{}\"".format(r.text)
def process_wallet_page(self, soup):
wallet = {"accounts": {}}
account_ids = []
for button in soup.find_all("button", class_="historia1"):
account_ids.append(re.search("\/accounts\/index\/([0-9]+)\/2", str(button["onclick"])).group(1))
accounts = []
for dt in soup.find_all("table", id="data"):
account = {}
cell = dt.find("td", class_="cell1")
if cell is None or cell.string is None:
continue
account["number"] = IBParser.parse_account_number((cell.string.strip()))
if account["number"] is None:
continue
cells = cell.find_next_siblings("td")
account["currency"] = cells[0].string.strip()
account["balance"] = cells[1].string.strip()
account["available_balance"] = cells[2].string.strip()
account["pln_balance"] = cells[3].string.strip()
accounts.append(account)
for account_id, account in zip(account_ids, accounts):
account["id"] = account_id
wallet["accounts"][account["number"]] = account
if len(wallet["accounts"]) == 0:
print "[e] Empty accounts list. Undetected failed login? Aborting."
self._dump()
sys.exit(4)
return wallet
def login(self, username, password):
login1_page = self._get("main/index")
self._wait(3)
data = {}
data["js"] = "true"
data["login"] = username
login2_page = self._post("main/index", data)
self._wait(3)
data = {}
password2_input = login2_page.find("input", attrs={"name": "password2"})
if password2_input is None:
print "[e] Masked password screen encountered - aborting"
sys.exit(4)
#data["log"] = username
#data["log2"] = ""
#part_inputs = login2_page.find_all("input", class_="password_parts_inputs")
#print "[i] Filling out {} characters".format(len(part_inputs))
#for input in part_inputs:
#_,pos = input["name"].split("_")
#data["pass_"+str(pos)] = password[int(pos)]
else:
print "[i] Regular password screen encountered"
data["log2"] = username
data["password2"] = password2_input["value"]
data["password"] = password
wallet_page = self._post("main/index", data)
if wallet_page.find("div", class_="login_form"):
print "[e] Login failed, aborting"
self._dump()
try:
print "[e] Possible reason: {}".format(','.join(wallet_page.find("ul", class_="error_list").stripped_strings))
except:
pass # screw it, we're fucked anyway
sys.exit(4)
self._wait(2)
return self.process_wallet_page(wallet_page)
def fetch_account_history(self, account_id):
#account_page = self._get("accounts/index/{}/2".format(account_id))
#self._wait(4)
#data = {
#"code": account_id,
#"basic": 1,
#"date_from": self.START_DATE,
#"date_to": '{:02d}.{:02d}.{:04d}'.format(date.today().day, date.today().month, date.today().year),
#"interval_time": "",
#"interval_type": "",
#"last": "",
#"advanced[0]": "0",
#"advanced[1]": "1",
#"operation_type": "3",
#"amount_from": "",
#"amount_to": "",
#"transaction_type": "",
#"from": "",
#"title": "",
#"send": "send",
#"ajaxSend": "true"
#}
#history_page = self._post("accounts/history/{}".format(account_id), data)
#self._wait(2)
#r = self._getraw("accounts/printHistoryFile")
data = {
"code": account_id,
"report_type": "csv_dr",
"start_date": self.START_DATE,
"end_date": '{:02d}.{:02d}.{:04d}'.format(date.today().day, date.today().month, date.today().year),
"banking": self.token
}
r = self._postraw("accounts/getHistoryDailyReportsFile", data)
return r.content.decode("utf-8-sig").encode("utf-8")
def usage():
pass
def lock():
fn = config["LOCK_FILE"]
if os.path.isfile(fn):
print "[e] Lock file {} exists, aborting".format(fn)
sys.exit(3)
print "[i] Setting up lock file {}".format(fn)
open(fn,'w').close()
if not os.path.isfile(fn):
print "[e] Lock file {} somehow does not exist, aborting".format(fn)
sys.exit(3)
def release():
fn = config["LOCK_FILE"]
print "[i] Removing lock file {}".format(fn)
if not os.path.isfile(fn):
print "[e] Lock file {} somehow does not exist, WTF?".format(fn)
sys.exit(3)
os.remove(fn)
if os.path.isfile(fn):
print "[e] Lock file {} somehow still exists, WTF?".format(fn)
sys.exit(3)
if __name__ == "__main__":
try:
opts, args = getopt(sys.argv[1:], "hcl:n", ["help", "cached", "load=", "no-action", "print-schema"])
except GetoptError as err:
# print help information and exit:
print str(err) # will print something like "option -a not recognized"
usage()
sys.exit(2)
CACHE_DIR = config["CACHE_DIR"]
engine = create_engine(config["SQLALCHEMY_DATABASE_URI"])
session = sessionmaker(bind=engine)()
cached = False
noaction = False
load_files = {}
for o, a in opts:
if o in ("-h", "--help"):
usage()
sys.exit()
elif o in ("--print-schema"):
print "[i] Called with --print-schema, will print the create statement and quit."
m = MetaData()
print CreateTable(IBRow.__table__).compile(engine),";"
for index in IBRow.__table__.indexes:
print CreateIndex(index).compile(engine),";"
sys.exit()
elif o in ("-c", "--cached"):
cached = True
elif o in ("-l", "--load"):
an, f = a.split(":")
if an is None or f is None:
print "[e] --load argument \"{}\" appears malformed, could not split account number and file name".format(a)
sys.exit(2)
account_number = IBParser.parse_account_number(an)
if account_number is None:
print "[e] Account number \"{}\" unparseable".format(an)
sys.exit(2)
history = open(f,'r').read()
load_files[account_number] = history
cached = True
print "[i] Loading \"{}\" as \"{}\"".format(f, account_number)
elif o in ("-n", "--no-action"):
print "[i] Called with --no-action, will not do any database operations."
noaction = True
else:
assert False, "unhandled option"
lock()
balances = {}
if cached:
print "[i] Cached run - will not connect to the bank"
if len(load_files) > 0:
print "[i] Using manually supplied files"
history_logs = load_files
else:
print "[i] Loading cached files from {}".format(CACHE_DIR)
for f in os.listdir(CACHE_DIR):
account_number = IBParser.parse_account_number(f)
if account_number is None:
print "[e] File name number \"{}\" unparseable".format(f)
continue
history = open(CACHE_DIR + "/" + f,'r').read()
load_files[account_number] = history
print "[i] Loading \"{}\" as \"{}\"".format(f, account_number)
if len(load_files) == 0:
print "[e] No cached files to process"
sys.exit(2)
history_logs = load_files
else:
print "[i] Normal run - will connect to the bank"
fetcher = IBFetcher()
history_logs = {}
if "IB_LOGIN" not in config.keys() or "IB_PASSWORD" not in config.keys():
wallet = fetcher.login(raw_input("[?] ID: "), raw_input("[?] Password: "))
else:
print "[i] Using saved credentials"
wallet = fetcher.login(config["IB_LOGIN"], config["IB_PASSWORD"])
for account_number, account in wallet["accounts"].items():
print "[i] Fetching history for account {} ({})".format(account_number, account["id"])
history = fetcher.fetch_account_history(account["id"])
cachefile = open(CACHE_DIR+"/"+account_number,'w')
cachefile.write(history)
cachefile.close()
history_logs[account_number] = history
balances[account_number] = (account["available_balance"],account["currency"])
balancefile = open(CACHE_DIR+"/balance-"+account_number,'w')
balancefile.write("{} {}\n".format(account["available_balance"],account["currency"]))
balancefile.close()
parsed = {}
stats = {}
for account_number, history in history_logs.items():
print "[i] Parsing history for account {}".format(account_number)
parser = IBParser(account_number)
parser.parse(history)
stats[account_number] = {}
stats[account_number]["added"] = 0
stats[account_number]["skipped"] = 0
for row in parser.get():
if not session.query(IBRow).filter_by(uid=row.uid).first() and not noaction:
session.add(row)
stats[account_number]["added"] += 1
else:
stats[account_number]["skipped"] += 1
if noaction:
print "[i] --no-action set, skipping row {}".format(row)
session.commit()
if balances:
print "[i] Account balances:"
for account_number,v in balances.items():
balance,currency = v
print "\t{}: {} {}".format(account_number, balance, currency)
print "[i] Done: ", stats
release()
#print f.create_report().read()