JDASoftwareGroup / kartothek

A consistent table management library in python
https://kartothek.readthedocs.io/en/stable
MIT License

Re-write predicate involving `in` operator to use disjunction of `==` terms #325

Open lr4d opened 3 years ago

lr4d commented 3 years ago

Problem description

We support the `in` operator internally in predicate parsing, but we could instead rewrite such predicates as a disjunction of `==` terms, e.g. `[[('A', 'in', [1, 4, 9, 13])]]` -> `[[('A', '==', 1)], [('A', '==', 4)], [('A', '==', 9)], [('A', '==', 13)]]`

We could implement this rewrite when a user passes predicates involving `in`, before the predicates are evaluated. In micro-benchmarks (see below), this appears to be as fast as or faster than our current evaluation of `in` predicates.
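A minimal sketch of what such a rewrite could look like (a hypothetical helper for illustration, not existing kartothek code; it only handles conjunctions consisting of a single `in` term):

def rewrite_in_predicates(predicates):
    # Hypothetical helper, not part of kartothek: rewrite each
    # single-term ("col", "in", values) conjunction into a disjunction
    # of ("col", "==", value) conjunctions.
    rewritten = []
    for conjunction in predicates:
        if len(conjunction) == 1 and conjunction[0][1] == "in":
            col, _, values = conjunction[0]
            rewritten.extend([(col, "==", v)] for v in values)
        else:
            rewritten.append(conjunction)
    return rewritten

assert rewrite_in_predicates([[("A", "in", [1, 4, 9, 13])]]) == [
    [("A", "==", 1)],
    [("A", "==", 4)],
    [("A", "==", 9)],
    [("A", "==", 13)],
]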

Example code (ideally copy-pastable)

from functools import partial
from tempfile import TemporaryDirectory

import numpy as np
import pandas as pd
from storefact import get_store_from_url

from kartothek.core.factory import DatasetFactory
from kartothek.io.eager import store_dataframes_as_dataset
from kartothek.io_components.read import dispatch_metapartitions_from_factory

store_factory = partial(get_store_from_url, f"hfs://{TemporaryDirectory().name}")
dataset_uuid = "test"
df = pd.DataFrame({"A": range(10), "B": ["A", "B"] * 5, "C": [np.nan, *range(-10, -1)]})

# One dataset without partitioning, one partitioned on the predicate columns
store_dataframes_as_dataset(
    store=store_factory, dataset_uuid=dataset_uuid, dfs=[df] * 100
)
store_dataframes_as_dataset(
    store=store_factory, dataset_uuid="part", dfs=[df] * 100, partition_on=["A", "B"]
)

# The same filter expressed via `in` and via a disjunction of `==` terms
target = [1, 4, 9, 13]
predicates_in = [[("A", "in", target)]]
predicates_normal = [[("A", "==", n)] for n in target]

f = DatasetFactory(dataset_uuid=dataset_uuid, store_factory=store_factory)
f_part = DatasetFactory(dataset_uuid="part", store_factory=store_factory)

%timeit dispatch_metapartitions_from_factory(f, predicates=predicates_in)
# 61 µs ± 9.64 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

%timeit dispatch_metapartitions_from_factory(f, predicates=predicates_normal)
# 50.7 µs ± 2.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

%timeit dispatch_metapartitions_from_factory(f_part, predicates=predicates_in)
# 51 µs ± 2.81 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

%timeit dispatch_metapartitions_from_factory(f_part, predicates=predicates_normal)
# 50.3 µs ± 1.59 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

fjetter commented 3 years ago

For the benchmark you should evaluate larger datasets, since for these small datasets the overhead of the operations is dominant rather than the predicate evaluation.

The most performance-critical part is probably not the index filtering but rather the filtering of the partitions themselves after loading the data. In any case, you should be able to construct benchmarks using the `filter_array_like` function of `kartothek.serialization`, since this is the part where this matters most.
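For example, such a benchmark might look like the sketch below. It assumes `filter_df_from_predicates` from `kartothek.serialization` (which exercises the same filtering code path) rather than the lower-level `filter_array_like`, and uses a dataframe large enough that predicate evaluation dominates:

import numpy as np
import pandas as pd

from kartothek.serialization import filter_df_from_predicates

# Large column so that filtering, not per-call overhead, dominates
df = pd.DataFrame({"A": np.random.randint(0, 1_000, size=10_000_000)})

target = list(range(0, 1_000, 7))  # ~143 values, not just four
predicates_in = [[("A", "in", target)]]
predicates_eq = [[("A", "==", n)] for n in target]

%timeit filter_df_from_predicates(df, predicates_in)
%timeit filter_df_from_predicates(df, predicates_eq)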

The point where I expect a significant drawback of the rewrite is when there are many elements in the value list, not just four. What's the motivation for rewriting this?

lr4d commented 3 years ago

For the benchmark you should evaluate larger datasets, since for these small datasets the overhead of the operations is dominant rather than the predicate evaluation.

Sure.

What's the motivation for rewriting this?

Less performance-critical code to maintain. I also wonder how this would affect performance. We'd additionally get simpler predicate handling in internal code, but I'm not sure how important that is.

mlondschien commented 3 years ago

We are building predicates automatically from a dataframe of partitions. The naive approach resulted in predicates which are disjunctions of more than 1000 (sometimes 10000) conjunctions. Think

[
    [
        ("a", "in", [f"value_{x}" for x in range(8)]),
        ("b", "in", [2012, 2013, 2014, 2015, 2016, 2017, 2018]),
        ("c", "in", [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
    ]
]

which would, if translated, result in 728 conjunctions of "==" statements. We failed to load any data this way and thus had to add functionality that simplifies predicates (combining disjunctions into "in" statements). So please, when implementing / benchmarking this, consider combined predicates like the above as well as large datasets.
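For reference, the blow-up of such a translation can be reproduced with a naive distribution of "in" terms over each conjunction (a hypothetical helper for illustration only, not kartothek API):

from itertools import product

def expand_in_predicates(predicates):
    # Hypothetical helper, not kartothek API: rewrite every
    # ("col", "in", values) term to ("col", "==", value) terms and
    # distribute them over each conjunction.
    expanded = []
    for conjunction in predicates:
        per_term = [
            [(col, "==", v) for v in value] if op == "in" else [(col, op, value)]
            for col, op, value in conjunction
        ]
        expanded.extend(list(combo) for combo in product(*per_term))
    return expanded

predicates = [
    [
        ("a", "in", [f"value_{x}" for x in range(8)]),
        ("b", "in", [2012, 2013, 2014, 2015, 2016, 2017, 2018]),
        ("c", "in", [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]),
    ]
]
print(len(expand_in_predicates(predicates)))  # 8 * 7 * 13 = 728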