Skip to content
Snippets Groups Projects
Commit 98381791 authored by Daniele Nicolodi
Browse files

salaries: New utility to extract salaries paid from an account

parent c74d1b33
No related branches found
No related tags found
No related merge requests found
def distance(a, b):
    """Return the Levenshtein edit distance between ``a`` and ``b``.

    The distance is the minimum number of single-element insertions,
    deletions and substitutions needed to turn one sequence into the
    other.  Only two rows of the dynamic programming table are kept
    at any time.

    Args:
        a: First sequence (typically a string).
        b: Second sequence (typically a string).

    Returns:
        The edit distance as a non-negative integer.
    """
    # The distance is symmetric: arrange for ``a`` to be the sequence
    # that is not longer, so that the outer loop runs over it.
    if len(b) < len(a):
        a, b = b, a
    # r0 is the previous row of the table. It starts as the cost of
    # building each prefix of ``b`` from the empty string.
    r0 = list(range(len(b) + 1))
    r1 = [0] * (len(b) + 1)
    for i, ca in enumerate(a, start=1):
        # Turning the first ``i`` elements of ``a`` into the empty
        # prefix of ``b`` takes ``i`` deletions. Without this, leading
        # elements of ``a`` could be dropped at no cost.
        r1[0] = i
        for j, cb in enumerate(b):
            r1[j + 1] = min(
                r0[j + 1] + 1,      # deletion
                r1[j] + 1,          # insertion
                r0[j] + (ca != cb)  # substitution or match
            )
        # The row just computed becomes the previous row; the old one
        # is recycled as scratch space for the next iteration.
        r0, r1 = r1, r0
    return r0[-1]
def pdist(x, func):
    """Return the pairwise values of ``func`` over the elements of ``x``.

    The result is a flat list holding ``func(x[i], x[j])`` for every
    pair of indices ``i < j``, ordered by ``i`` first and then by
    ``j``.  Its length is ``n * (n - 1) / 2`` where ``n = len(x)``.

    Args:
        x: Indexable sequence of observations.
        func: Callable taking two observations.

    Returns:
        List with one entry per unordered pair of observations.
    """
    size = len(x)
    return [
        func(x[first], x[second])
        for first in range(size - 1)
        for second in range(first + 1, size)
    ]
import click
import datetime
import re
import sap
import petl
import petlutils
import levenshtein
import numpy as np
from scipy.cluster.hierarchy import linkage, fcluster
from beancount.core import inventory
from orders import setDbProfile, get_table_rows, parse_amount, parse_date
def cluster(names, distance):
    """Group similar spellings of names together.

    Pairwise Levenshtein distances between the names are computed and
    fed to hierarchical clustering; flat clusters are then cut with
    ``distance`` as threshold.

    Args:
        names: Sequence of strings, possibly containing duplicates.
        distance: Threshold handed to ``fcluster`` with
            ``criterion='distance'``.

    Returns:
        Dictionary with one entry per cluster.  The key is the
        spelling occurring most often in the cluster and the value is
        the list of the distinct spellings in the cluster, sorted by
        decreasing number of occurrences.
    """
    names = np.asarray(names, dtype='U')
    if names.size < 2:
        # With fewer than two observations there are no pairwise
        # distances and linkage() refuses an empty distance matrix:
        # the clustering is trivial, return it directly.
        return {name: [name] for name in names.ravel().tolist()}
    y = levenshtein.pdist(names, levenshtein.distance)
    z = linkage(y)
    # a[i] is the label of the flat cluster names[i] belongs to.
    a = fcluster(z, distance, criterion='distance')
    clusters = {}
    for c in np.unique(a):
        # Distinct spellings in this cluster with their frequencies.
        elements, counts = np.unique(names[np.nonzero(a == c)], return_counts=True)
        # Most frequent spelling first: it names the cluster.
        elements = elements[np.argsort(counts)[::-1]]
        clusters[elements[0]] = elements.tolist()
    return clusters
def categorize(value, clusters):
    """Return the name of the cluster ``value`` belongs to.

    When ``value`` ends with a slash followed by a capitalized word,
    that word replaces ``value`` before the lookup.  The clusters are
    then scanned in order and the key of the first cluster having an
    alias equal to, or contained in, ``value`` is returned.

    Args:
        value: Narration string to classify.
        clusters: Mapping from cluster name to list of aliases.

    Returns:
        The key of the first matching cluster.

    Raises:
        ValueError: If no alias matches.  The exception carries the
            (possibly substituted) value.
    """
    trailing = re.match(r'.*/([A-Z][a-z]+$)', value)
    if trailing is not None:
        value = trailing.group(1)
    for name, aliases in clusters.items():
        # After the substitution above the narration has been replaced
        # by the bare payee name, thus equality is checked first.
        # Otherwise look for the alias anywhere in the narration.
        if any(value == alias or alias in value for alias in aliases):
            return name
    raise ValueError(value)
def parse_int(s):
    """Convert ``s`` to ``int``, mapping any falsy input to ``None``."""
    if not s:
        return None
    return int(s)
def sum_amounts(values):
    """Sum ``values`` and return the units of the resulting position.

    Every value is added to a fresh ``inventory.Inventory`` and the
    ``units`` attribute of what ``get_only_position()`` returns is
    handed back.

    NOTE(review): presumably ``get_only_position()`` yields ``None``
    for an empty inventory, in which case the attribute access would
    fail -- confirm against the beancount documentation.
    """
    total = inventory.Inventory()
    for amount in values:
        total.add_amount(amount)
    position = total.get_only_position()
    return position.units
@click.command()
def main():
    # Print, for each name found in the narrations of the line items
    # of one account, the total of the matching amounts.
    #
    # This is deliberately a comment and not a docstring: click uses
    # the docstring of a command as its --help text.

    # Account queried and date range: from the beginning of 2020 up
    # to tomorrow.
    psp = '1K-43045'
    fromdate = datetime.date(2020, 1, 1)
    todate = datetime.date.today() + datetime.timedelta(days=1)

    with sap.session() as session:
        # NOTE(review): CJI3 is presumably the SAP transaction listing
        # the actual cost line items of a project -- confirm.
        session.StartTransaction('CJI3')
        setDbProfile(session)
        # Fill in the selection screen: no project, the account as
        # given above, and the date range.
        session.findById('wnd[0]/usr/ctxtCN_PROJN-LOW').text = ''
        session.findById('wnd[0]/usr/ctxtCN_PSPNR-LOW').text = psp
        session.findById('wnd[0]/usr/ctxtR_BUDAT-LOW').text = fromdate.strftime('%d.%m.%Y')
        session.findById('wnd[0]/usr/ctxtR_BUDAT-HIGH').text = todate.strftime('%d.%m.%Y')
        # NOTE(review): presumably selects a saved display variant
        # providing the columns listed in ``fields`` below -- confirm.
        session.findById('wnd[0]/usr/ctxtP_DISVAR').text = '/NICOLODI'
        # NOTE(review): virtual key 8 is presumably F8 (execute) --
        # confirm.
        session.findById('wnd[0]').sendVKey(8)

        table = session.findById('wnd[0]/usr/cntlGRID1/shellcont/shell/shellcont[1]/shell')
        # Each entry is (column in the SAP grid, conversion function,
        # name of the field in the resulting rows).
        fields = [
            ('GJAHR', parse_int, 'year'),
            ('PERIO', parse_int, 'period'),
            ('KSTAR', str, 'cost-element'),
            ('SGTXT', str, 'narration'),
            ('EBTXT', str, 'po-text'),
            ('BLTXT', str, 'header-text'),
            ('GKONT_KTXT', str, 'payee'),
            ('WKGBTR', parse_amount, 'amount'),
            ('BUDAT', parse_date, 'date'),
        ]

        def sieve(row):
            # Drop the rows without a year and those booked on the two
            # cost elements listed here.
            return row['year'] is not None and row['cost-element'] not in ('710001', 'UML-INDKO')

        # This could probably be done in the SAP query.
        rows = list(filter(sieve, get_table_rows(table, fields)))

    # Keep only the entries booked on cost element 5100900.
    table = petl.fromdicts(rows) \
        .convert('cost-element', int) \
        .selecteq('cost-element', 5100900)

    # There is no easy way to check whose salary each entry
    # corresponds to. The only way is to extract the name from the
    # narration string, which fortunately most of the time conforms
    # to a fixed structure. However, there may be typos in the
    # names as entered there. To get around this, we extract all
    # the possible names from the narration fields and run a
    # clustering algorithm to match the misspelled names to the
    # correct ones.
    names = [m.group(1) for m in [re.match(r'.*/([A-Z][a-z]+$)', v) for v in table.values('narration')] if m]
    clusters = cluster(names, 2.0)
    table = table.addfield('name', lambda x: categorize(x.narration, clusters))

    # One output row per cluster name with the sum of its amounts.
    data = []
    for name in sorted(clusters.keys()):
        total = sum_amounts(table.selecteq('name', name).values('amount'))
        data.append({'account': psp, 'name': name, 'total': total})

    res = petl.fromdicts(data)
    print(res)


if __name__ == '__main__':
    main()
import petl
import re
import unittest
from salaries import cluster, categorize
class TestCluster(unittest.TestCase):
    """Check that misspelled names are grouped with the correct one."""

    def test_cluster(self):
        # Three names, each with a one-letter misspelling.
        samples = ['Aaa', 'Aaa', 'Bbb', 'Ccc', 'Bbb', 'Ccz', 'Bbq', 'Aay', 'Ccc']
        expected = {
            'Aaa': ['Aaa', 'Aay'],
            'Bbb': ['Bbb', 'Bbq'],
            'Ccc': ['Ccc', 'Ccz'],
        }
        found = cluster(samples, 2.0)
        self.assertCountEqual(found.keys(), tuple(expected))
        for key, members in expected.items():
            self.assertEqual(found[key], members)
class TestCategorize(unittest.TestCase):
    """Check the assignment of narrations to name clusters."""

    def setUp(self):
        # Sample narrations: most end with "/Name", some carry
        # trailing text, one has a misspelled name, and one has no
        # slash before the name at all.
        narrations = (
            'Korrektur Krause 06-07/20 zugunsten FV-43106',
            'August 2020 /Krause',
            'September 2020 /Krause',
            'Oktober 2020 /Krause',
            'Oktober 2020 /Nicolodi ab 09/20',
            'November 2020 /Krause',
            'November 2020 /Nicoldi',
            'August 2021 /Krause',
            'August 2021 /Nicolodi',
            'September 2021 /Nicolodi',
            'Oktober 2021 /Nicolodi',
            'Oktober 2021 /Nachzahlung Krause F.',
        )
        self.data = list(narrations)

    def test_categorize(self):
        # Build the clusters from the narrations ending with a name.
        matches = (re.match(r'.*/([A-Z][a-z]+$)', text) for text in self.data)
        names = [found.group(1) for found in matches if found]
        clusters = cluster(names, 2.0)
        self.assertCountEqual(clusters.keys(), ('Nicolodi', 'Krause'))
        # Every narration must end up in one of the clusters.
        for narration in self.data:
            self.assertIn(categorize(narration, clusters), clusters.keys())
        # A string matching no alias is rejected.
        self.assertRaises(ValueError, categorize, 'Foo', clusters)
import unittest
from itertools import combinations
from levenshtein import distance
class TestDistance(unittest.TestCase):
    """Check the edit distance on known cases and metric properties."""

    # Each entry is (first string, second string, expected distance).
    corpus = [
        # equal strings
        ('', '', 0),
        ('a', 'a', 0),
        ('aaa', 'aaa', 0),
        # character insertion
        ('a', '', 1),
        ('ab', '', 2),
        # character replacement
        ('a', 'b', 1),
        ('ab', 'ac', 1),
        ('aba', 'acb', 2),
        # character deletion
        ('abc', 'ac', 1),
        ('abcd', 'ad', 2),
        ('abcde', 'ace', 2),
    ]

    # All the strings in the corpus: first members, then second ones.
    strings = [first for first, _, _ in corpus] + [second for _, second, _ in corpus]

    def test_distance(self):
        # The expected value must be found in both directions.
        for left, right, expected in self.corpus:
            self.assertEqual(distance(left, right), expected)
            self.assertEqual(distance(right, left), expected)

    def test_symmetric(self):
        for left, right in combinations(self.strings, 2):
            forward = distance(left, right)
            backward = distance(right, left)
            self.assertEqual(forward, backward)

    def test_triangular(self):
        # Triangle inequality: a detour through a third string is
        # never shorter than the direct distance.
        for start, via, end in combinations(self.strings, 3):
            direct = distance(start, end)
            detour = distance(start, via) + distance(via, end)
            self.assertLessEqual(direct, detour)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment