beancount / smart_importer

Augment Beancount importers with machine learning functionality.
MIT License

Can this use metadata to improve matching? #45

Closed: philipsd6 closed this issue 4 years ago

philipsd6 commented 6 years ago

One thing I've done to ensure I can trace transactions back to their original sources is to add metadata to the transactions, e.g.:

original-description: "Publix Supermarkets #1234 000001234 - Any City, Ga"

And the payee is normalized to "Publix". It would be nice if the original-description data could be used to improve matching, because as noted in the beancount docs, the original description sometimes doesn't match the payee at all. I can't remember the exact documentation, but the gist is: the original description might be "*SQ PARENT FOOD COMPANY - 123557 - PARTOFCITYNA", but you know where you ate, so you set the payee to "Sandwich Shoppe".
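
For example, a cleaned-up entry in my file looks roughly like this (the date, accounts, and amounts here are just illustrative):

    2018-05-12 * "Sandwich Shoppe" "Lunch"
      original-description: "*SQ PARENT FOOD COMPANY - 123557 - PARTOFCITYNA"
      Expenses:Food:Restaurants   12.50 USD
      Liabilities:Chase:CC       -12.50 USD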

Without referencing the original-description there's no hope of matching that, right? So how can the original-description meta be fed into the ML?

philipsd6 commented 6 years ago

For that matter, I have a credit card where the csv data contains the card holder for a shared account, and I carry that into my beancount file as txn metadata. If one card-holder's purchases at WalMart are always Expenses:Clothes and the other's are always Expenses:Groceries, that would be relevant to training.

johannesjh commented 6 years ago

Instead of original-description, I think you could and should use the metadata key __source__, because that is how fava expects to retrieve original source text from beancount importers; see https://github.com/beancount/fava/blob/master/fava/help/import.md
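
A rough sketch of how an importer's extract() could attach it (raw_line, date, payee, and narration stand in for whatever your importer parses from the file):

    from beancount.core import data

    meta = data.new_metadata(file.name, index)
    meta['__source__'] = raw_line  # fava displays this next to the imported entry
    txn = data.Transaction(meta, date, '*', payee, narration,
                           data.EMPTY_SET, data.EMPTY_SET, [])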

Can the __source__ metadata field be fed into the machine learning pipeline? Yes, absolutely. Two possibilities: either hard-code the new feature into smart_importer's default pipeline, or make the pipeline customizable so that users can add such features themselves.

Since there have been a few requests for tweaking the pipeline in different ways (compare #34 and #37), I think I would prefer option number two.

@tarioch any thoughts and preferences?

tarioch commented 6 years ago

I think option 2 makes sense. I actually also have some metadata fields from my importers.

philipsd6 commented 6 years ago

I added the __source__ metadata to my importer, but I'm still keeping my original-description, since the __source__ gets stripped out by fava.

If the __source__ gets stripped out, how can it be used for machine learning? It won't be there to compare against in the main beancount file passed in with -e. Unless I'm missing something? I don't know much about how the ML is set up...

But regardless, I think option 2 is a good choice myself.

johannesjh commented 6 years ago

My mistake, I think you are right. Since fava deletes the __source__ metadata field, it cannot be used to train the machine learning model.

johannesjh commented 6 years ago

I just tried to refactor enhance_transactions into multiple methods. I am not sure how this is best done. The code currently passes data around by storing it in member variables on self.

Variant 1, Member Variables: Should we keep using member variables to pass information around between methods? Advantage: this gives each method a lot of power (it can read and write any member variable it wishes). Disadvantage: it is unclear which member variables can be used at which stage of the process.

For example:

    def enhance_transactions(self):
        # note: data flow is implicit because all methods read and write member variables:
        self.define_pipeline()
        self.prepare_training_data()
        self.train()
        self.predict()
        self.suggest()
        return self.transactions

Variant 2, Arguments: Or should we pass the data around as arguments? Advantage: it would be very clear which methods consume and produce which data. Disadvantage: it is difficult to tell in advance which data will be needed in which method, especially in future, overridden methods, so we would lose some ease and flexibility.

For example:

    def enhance_transactions(self, imported_transactions, existing_entries, options):
        # note: data flow is made explicit by using arguments and return values:
        pipeline = self.define_pipeline(imported_transactions, existing_entries, options)
        training_data = self.prepare_training_data(existing_entries, options)
        trained_pipeline = self.train(pipeline, training_data, options)
        transactions = self.predict(trained_pipeline, imported_transactions)
        transactions = self.suggest(trained_pipeline, transactions)
        return transactions

Variant 3, Hybrid: Pass data around as arguments, but also store it in self?

    def enhance_transactions(self):
        # data flow is explicit, but methods can access additional data if needed.
        self.pipeline = self.define_pipeline(self.imported_transactions, self.existing_entries, self.options)
        self.training_data = self.prepare_training_data(self.existing_entries, self.options)
        self.pipeline = self.train(self.pipeline, self.training_data, self.options)
        self.transactions = self.predict(self.pipeline, self.imported_transactions)
        self.transactions = self.suggest(self.pipeline, self.transactions)
        return self.transactions

Any thoughts? thx, Johannes

philipsd6 commented 6 years ago

I'm not sure if this is off-base but here's my perspective. We have two "things" to predict: Payees and Postings. So all I want is to use the decorators and direct the training pipelines to the items I want to base the predictions on. Something like this maybe?

from mybank import MyBankImporter

payee_items = ['meta.original-description', 'payee', 'units']
posting_items = ['meta.card-holder', 'units', 'cost', 'price']

MyBankImporter = PredictPayees(suggest_payees=False,
                               overwrite_existing_payees=True,
                               payee_items=payee_items)(MyBankImporter)

MyBankImporter = PredictPostings(suggest_accounts=False,
                                 posting_items=posting_items)(MyBankImporter)

CONFIG = [
    MyBankImporter('Assets:US:MyBank:Checking')
]

The idea being that PredictPayees() will, from the training data, feed the payee, the original-description metadata, and the first posting's units into the training pipeline, so that an incoming transaction from the bank [desc="CHECK #1234" amount="50.00 USD"] can match an existing transaction [payee="Lawn Service" with original-description="CHECK #1233" and units="50 USD"].

I hope this makes sense!

philipsd6 commented 6 years ago

whoops didn't mean to close this.

johannesjh commented 6 years ago

Update @philipsd6: I implemented the refactoring in pull request #49 (I chose variant 1 because it was the simplest to implement). Can you have a look at it, please? This should allow you to modify the pipeline as needed, without touching too much other code.

Does it resolve this ticket?

philipsd6 commented 6 years ago

I'm not sure how to modify the pipeline. Is there an example available? Right now my config.py just uses the decorators:

ChaseCCImporter = PredictPostings(suggest_accounts=False)(
    PredictPayees(suggest_payees=False, overwrite_existing_payees=True)(ChaseCCImporter))

CONFIG = [
    ChaseCCImporter('Liabilities:Chase:CC')
]

and I call bean-extract with the -e flag pointing to my main beancount file.

The main thing I need to do is point smart_importer to my original-description metadata for each posting, and maybe tell it to ignore certain postings that are polluting the training data.

philipsd6 commented 6 years ago

Well, not sure if I'm doing this the best way, but I implemented this in my_smart_import.py:

from datetime import datetime, timedelta
from smart_importer.predict_postings import PredictPostings
from smart_importer.predict_payees import PredictPayees
from beancount.core.data import Transaction
from typing import List, Union
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.svm import SVC
from smart_importer import machinelearning_helpers as ml

class GetOriginalDesc(ml.TransformerMixin, ml.NoFitMixin):
    '''
    Scikit-learn transformer to extract the original-description.
    The input can be of type List[Transaction] or List[TxnPostingAccount],
    the output is a List[str].
    '''

    def transform(self, data: Union[List[ml.TxnPostingAccount], List[Transaction]]):
        return [self._get_original_desc(d) for d in data]

    def _get_original_desc(self, d):
        if isinstance(d, Transaction):
            return d.meta.get('original-description', d.payee or '')
        elif isinstance(d, ml.TxnPostingAccount):
            return d.txn.meta.get('original-description', d.txn.payee or '')

class GetUnits(ml.TransformerMixin, ml.NoFitMixin):
    def transform(self, data: Union[List[ml.TxnPostingAccount], List[Transaction]]):
        return [self._get_units(d) for d in data]

    def _get_units(self, d):
        if isinstance(d, Transaction):
            return str(d.postings[-1].units)
        elif isinstance(d, ml.TxnPostingAccount):
            return str(d.posting.units)

class MyPredictPayees(PredictPayees):
    def prepare_training_data(self):
        one_year_ago = datetime.now().date().replace(day=1) - timedelta(days=365)
        self.training_data = [xact for xact in self.training_data if xact.date > one_year_ago]
        super().prepare_training_data()

    def define_pipeline(self):
        self.pipeline = Pipeline([
            ('union', FeatureUnion(
                transformer_list=[
                    ('narration', Pipeline([
                        ('getNarration', ml.GetNarration()),
                        ('vect', CountVectorizer(ngram_range=(1, 3))),
                    ])),
                    ('payee', Pipeline([  # original-description metadata, falling back to payee
                        ('getPayee', GetOriginalDesc()),
                        ('vect', CountVectorizer(ngram_range=(1, 3))),
                    ])),
                    ('dayOfMonth', Pipeline([
                        ('getDayOfMonth', ml.GetDayOfMonth()),
                        ('caster', ml.ArrayCaster()),  # casts to the array shape sklearn expects
                    ])),
                    ('units', Pipeline([
                        ('getUnits', GetUnits()),
                        ('vect', CountVectorizer(ngram_range=(1, 3))),
                    ])),
                ],
                transformer_weights={
                    'narration': 0.8,
                    'payee': 0.5,
                    'dayOfMonth': 0.1,
                    'units': 0.2,
                })),
            ('svc', SVC(kernel='linear')),
        ])

class MyPredictPostings(PredictPostings):
    def prepare_training_data(self):
        one_year_ago = datetime.now().date().replace(day=1) - timedelta(days=365)
        self.training_data = [xact for xact in self.training_data if xact.date > one_year_ago]
        super().prepare_training_data()

    def define_pipeline(self):
        self.pipeline = Pipeline([
            ('union', FeatureUnion(
                transformer_list=[
                    ('narration', Pipeline([
                        ('getNarration', ml.GetNarration()),
                        ('vect', CountVectorizer(ngram_range=(1, 3))),
                    ])),
                    ('account', Pipeline([
                        ('getReferencePostingAccount', ml.GetReferencePostingAccount()),
                        ('vect', CountVectorizer(ngram_range=(1, 3))),
                    ])),
                    ('payee', Pipeline([  # original-description metadata, falling back to payee
                        ('getPayee', GetOriginalDesc()),
                        ('vect', CountVectorizer(ngram_range=(1, 3))),
                    ])),
                    ('dayOfMonth', Pipeline([
                        ('getDayOfMonth', ml.GetDayOfMonth()),
                        ('caster', ml.ArrayCaster()),  # casts to the array shape sklearn expects
                    ])),
                    ('units', Pipeline([
                        ('getUnits', GetUnits()),
                        ('vect', CountVectorizer(ngram_range=(1, 3))),
                    ])),
                ],
                transformer_weights={
                    'narration': 0.8,
                    'account': 0.8,
                    'payee': 0.5,
                    'dayOfMonth': 0.1,
                    'units': 0.2,
                })),
            ('svc', SVC(kernel='linear')),
        ])

Then I import those into my importer config.py and use them as my decorators. It's not 100% accurate, but it seems to be at least 90% better than before.

I just don't like how much of the smart_importer code I had to copy and paste just to redefine a few bits of it. Is this how we're supposed to do it?

johannesjh commented 6 years ago

I am happy to hear that your solution improved the accuracy of predictions! But you are right, it is a lot of code duplication.

To improve the situation, I propose that we move the definition of transformers and transformer_weights into a separate method. By overriding this method, your decorator could add its own transformers. See the corresponding changes in the branch feature/refs-45-ease-custom-pipeline-definitions.
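
Roughly, the idea would look something like this (just a sketch; the method name define_transformers and its return value are illustrative, please check the branch for the actual naming), reusing the GetUnits transformer from your example above:

    class MyPredictPayees(PredictPayees):
        def define_transformers(self):
            # Keep the default transformers and weights...
            transformers, weights = super().define_transformers()
            # ...and add a custom feature on top:
            transformers.append(('units', Pipeline([
                ('getUnits', GetUnits()),
                ('vect', CountVectorizer(ngram_range=(1, 3))),
            ])))
            weights['units'] = 0.2
            return transformers, weights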

Can you give feedback please: These changes would remove the code duplication, right?

johannesjh commented 5 years ago

Update to this ticket: In the meantime, @yagebu did a major refactoring (see ticket #47, pull request #62). This removed a lot of code duplication and reduced the overall code size. More importantly for this ticket, creating custom pipelines is now much more elegant. For example, have a look at how PredictPayees and PredictPostings are now implemented by overriding and re-configuring the EntryPredictor base class.

I think this ticket can be closed, right?

philipsd6 commented 5 years ago

Actually, I'm finding that the latest version, while it looks much nicer, doesn't actually work for me. I'll still have to dive deep to do what I want. To put my original-description into the mix, get_pipeline needs to be modified -- so I'd have to redefine it and override EntryPredictor to point at my redefined version. And if I also want the first posting's units in the mix, before you know it I'm rewriting most of the codebase again. :/

johannesjh commented 5 years ago

Hm, I see. You could of course patch (i.e., overwrite and replace) the get_pipeline method. I am not saying this is pretty, just that it should be possible:

import smart_importer.pipelines

def my_get_pipeline(attribute):
    # your implementation here
    pass

smart_importer.pipelines.get_pipeline = my_get_pipeline

@yagebu, I'd be interested in your opinion: wouldn't it be better to make the get_pipeline function a method of the EntryPredictor class? That way, @philipsd6 could override it more easily.

philipsd6 commented 5 years ago

Well, unless I'm doing it wrong, patching get_pipeline that way just doesn't work, probably because predictor.py does `from smart_importer.pipelines import get_pipeline`, which copies the reference at import time, so rebinding the name in smart_importer.pipelines from my importer config.py never reaches it.
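
In principle, patching the copy of the name that predictor.py itself holds should work, since that's the reference the calls resolve against at call time -- but it depends on predictor.py's internal imports staying the same, which is even more fragile:

    import smart_importer.predictor

    def my_get_pipeline(attribute):
        ...  # custom implementation here

    # Rebind the name inside the consuming module, not in smart_importer.pipelines:
    smart_importer.predictor.get_pipeline = my_get_pipeline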

So yes, I think get_pipeline needs to be an EntryPredictor method -- that would let me run the existing one via super(), catch the ValueError, and do my custom stuff instead. Also, it's probably a good idea to raise the ValueError with a sensible message, as it's kind of hard to know what happened if you just get a bare ValueError. Something like:

    raise ValueError(f"Invalid attribute: {attribute}")

philipsd6 commented 5 years ago

Or... is there a sensible default? Does this make sense?

def get_pipeline(attribute):
    """Make a pipeline for a given entry attribute."""
    if attribute.startswith('date'):
        return make_pipeline(
            AttrGetter(attribute, ''),
            ArrayCaster(),
        )
    return make_pipeline(
        AttrGetter(attribute, ''),
        StringVectorizer(),
    )

I'd still need that to be a method, so I can add a make_pipeline for the units on the first posting of a transaction...
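
Something like this is what I have in mind for the units feature (FirstPostingUnits is a name I just made up, and it assumes the Getter base class from smart_importer.pipelines can be subclassed this way):

    from smart_importer.pipelines import Getter, StringVectorizer
    from sklearn.pipeline import make_pipeline

    class FirstPostingUnits(Getter):
        """Extract the units of a transaction's first posting as a string."""

        def _getter(self, txn):
            return str(txn.postings[0].units) if txn.postings else ''

    units_pipeline = make_pipeline(FirstPostingUnits(), StringVectorizer())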

johannesjh commented 5 years ago

I moved get_pipeline into the EntryPredictor class and refactored it as proposed in the comment above; see pull request #79.

philipsd6 commented 5 years ago

I'm really confused about how to do what I want, actually. I can't use attribute to point at original-description because (a) it's a meta key, not an attribute, and (b) I don't want to update it; I just want to use it to enrich the information used to predict the payee. I wish there were a good example of customizing this, because it seems tricky to figure it all out by myself.

philipsd6 commented 5 years ago

After all this time, I still cannot figure out how to do what seems to me a natural way to enrich transactions, as I described in my original issue/feature request:

  1. I have transactions with the payee set to a correct human readable string, and the 'original-description' meta set to whatever came from the bank's export file as the description, usually with all caps and extraneous things like store numbers, etc...
  2. When doing a smart machine learning match, I want it to match the bank description in the incoming import against the existing entries' 'original-description' meta or, if that doesn't exist, against the 'payee' attribute.
  3. I want it to update the 'payee' and 'narration' based on the matched existing entry's values, not what the pipeline matched on.
  4. I want the postings to be updated based on the above matching behavior as well.

So far this is what I've done, and it's really not working well. I'm sure I'm missing something, and yet I'm writing a lot of code as it is:

from datetime import datetime, timedelta

from smart_importer import apply_hooks, EntryPredictor, PredictPayees, PredictPostings
from smart_importer.pipelines import Getter, AttrGetter, StringVectorizer, ArrayCaster
from sklearn.pipeline import make_pipeline

class MetaGetter(Getter):
    """Get a transaction meta value"""

    def __init__(self, key, default=None):
        self.default = default
        self._key = key

    def _getter(self, txn):
        return txn.meta.get(self._key, self.default)

class MyEntryPredictor(EntryPredictor):
    """Limit training data to a yearly window, and ensure the payee attribute prioritizes the original-description"""
    def prepare_training_data(self):
        super().prepare_training_data()
        one_year_ago = datetime.now().date().replace(day=1) - timedelta(days=365)
        self.training_data = [xact for xact in self.training_data
                              if xact.date > one_year_ago]

    def get_attribute_pipeline(self, attribute):
        if attribute == 'payee':
            return make_pipeline(
                MetaGetter('original-description', ''),
                StringVectorizer(),
            )
        if attribute == 'narration':
            return make_pipeline(
                AttrGetter(attribute, ''),
                StringVectorizer(),
            )
        if attribute.startswith('date.'):
            return make_pipeline(
                AttrGetter(attribute, ''),
                ArrayCaster(),
            )
        raise ValueError(f"Invalid attribute: {attribute}")

class MyPredictPayees(MyEntryPredictor, PredictPayees):
    @property
    def targets(self):
        if not self.attribute:
            raise NotImplementedError
        if self.attribute == 'payee':
            return [
                entry.meta.get('original-description', getattr(entry, self.attribute) or '')
                for entry in self.training_data
            ]
        return [
            getattr(entry, self.attribute) or ''
            for entry in self.training_data
        ]

class MyPredictPostings(MyEntryPredictor, PredictPostings):
    pass

predict_payees = MyPredictPayees(suggest=False, overwrite=True)
predict_postings = MyPredictPostings(suggest=False, overwrite=True)

yagebu commented 5 years ago

You want to predict the payee attribute, so you can leave targets unchanged. I've changed get_pipeline to allow all attributes, so something like the following should do it:

class PredictPayeesUsingMeta(EntryPredictor):
    attribute = "payee"
    weights = {"narration": 0.8, "meta.original-description": 0.5}

belidzs commented 5 years ago

@yagebu using the example from your comment above, I've created a small predictor class using a meta attribute, but I get the following exception, even though every extracted transaction has a raiffeisen_payee meta attribute (none of the entries in the training dataset has it, though):

from smart_importer import EntryPredictor

class PredictPayeeRaiffeisen(EntryPredictor):
    attribute = "payee"
    weights = {
        "meta.raiffeisen_payee": 0.8
    }

ERROR:root:Importer beancount_importers.bank.raiffeisen.Importer.extract() raised an unexpected error: 'dict' object has no attribute 'raiffeisen_payee'
ERROR:root:Traceback: Traceback (most recent call last):
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\beancount\ingest\extract.py", line 191, in extract
    allow_none_for_tags_and_links=allow_none_for_tags_and_links)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\beancount\ingest\extract.py", line 69, in extract_from_file
    new_entries = importer.extract(file, **kwargs)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\smart_importer\hooks.py", line 45, in patched_extract_method
    importer, file, imported_entries, existing_entries
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\smart_importer\predictor.py", line 64, in __call__
    self.train_pipeline()
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\smart_importer\predictor.py", line 133, in train_pipeline
    self.pipeline.fit(self.training_data, targets)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 265, in fit
    Xt, fit_params = self._fit(X, y, **fit_params)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 230, in _fit
    **fit_params_steps[name])
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\memory.py", line 342, in __call__
    return self.func(*args, **kwargs)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 614, in _fit_transform_one
    res = transformer.fit_transform(X, y, **fit_params)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 793, in fit_transform
    for name, trans, weight in self._iter())
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\parallel.py", line 920, in __call__
    while self.dispatch_one_batch(iterator):
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\_parallel_backends.py", line 182, in apply_async
    result = ImmediateResult(func)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\_parallel_backends.py", line 549, in __init__
    self.results = batch()
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\parallel.py", line 225, in __call__
    for func, args, kwargs in self.items]
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\parallel.py", line 225, in <listcomp>
    for func, args, kwargs in self.items]
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 614, in _fit_transform_one
    res = transformer.fit_transform(X, y, **fit_params)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 298, in fit_transform
    Xt, fit_params = self._fit(X, y, **fit_params)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 230, in _fit
    **fit_params_steps[name])
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\externals\joblib\memory.py", line 342, in __call__
    return self.func(*args, **kwargs)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\pipeline.py", line 614, in _fit_transform_one
    res = transformer.fit_transform(X, y, **fit_params)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\sklearn\base.py", line 467, in fit_transform
    return self.fit(X, y, **fit_params).transform(X)
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\smart_importer\pipelines.py", line 36, in transform
    return [self._getter(d) for d in data]
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\smart_importer\pipelines.py", line 36, in <listcomp>
    return [self._getter(d) for d in data]
  File "C:\Users\belidzs\PycharmProjects\beancount_virtualenv\lib\site-packages\smart_importer\pipelines.py", line 50, in _getter
    return self._txn_getter(txn) or self.default
AttributeError: 'dict' object has no attribute 'raiffeisen_payee'

I'm not really sure what's wrong here, but I suppose this should work. Can you give me a pointer on how to fix this?

I have a hunch that the problem might be that not every transaction in the training data has this meta tag; if that's the case, I suppose those transactions should be skipped silently.

yagebu commented 4 years ago

@belidzs: I actually didn't check the example and yes, as you noticed, it didn't work yet. It should be fixed now.