JDASoftwareGroup / kartothek

A consistent table management library in python
https://kartothek.readthedocs.io/en/stable
MIT License

Let merge_datasets_as_delayed merge >2 datasets and filter by predicates #235

Open mlondschien opened 4 years ago

mlondschien commented 4 years ago

It would be nice to be able to supply kartothek.io.dask.delayed.merge_datasets_as_delayed with a list of dataset_uuids to merge an arbitrary number of datasets.

This could be implemented by accepting a list of dataset_uuids alongside the existing two-dataset arguments (a possible sketch is shown below), so as to not break existing usages of the function.
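A minimal sketch of what such a backward-compatible signature could look like; the parameter names, defaults, and fallback logic here are my assumption, not the actual kartothek API:

```python
from typing import List, Optional


def merge_datasets_as_delayed(
    left_dataset_uuid: Optional[str] = None,
    right_dataset_uuid: Optional[str] = None,
    *,
    dataset_uuids: Optional[List[str]] = None,
    **kwargs,
):
    # Fall back to the current two-dataset behaviour when no list is given,
    # so existing call sites keep working unchanged.
    if dataset_uuids is None:
        dataset_uuids = [left_dataset_uuid, right_dataset_uuid]
    ...
```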

The match_how="left" option would need to be replaced by match_how="first". I am not sure how to translate match_how="prefix"; what is a typical use case here?

Additionally, it would be nice to supply a merge_func via merge_tasks that takes more than two dataframes as input. This would require a similar change as above: either defining a MetaPartition.concat_dataframes (or similar) method, or allowing MetaPartition.merge_dataframes to accept a list for left.

Questions concerning the current implementation of merge_datasets_as_delayed:

- The keys of the partitions are compared to match partitions. However, since the keys include the names of the stored parquet files, they will never match. What is the idea here?
- It is annoying to have to supply the names of the tables in merge_tasks.
- Why not supply the merge_func with all available dataframes?

Additionally, it would be nice to be able to supply merge_datasets_as_delayed with a predicates argument that filters partitions before merging and then applies filter_df_from_predicates after merging.
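A rough sketch of the post-merge half of that idea, reusing kartothek's existing filter_df_from_predicates helper; the DataFrame and predicate values are made up for illustration:

```python
import pandas as pd
from kartothek.serialization import filter_df_from_predicates

# kartothek predicates: a list of AND-conjunctions which are OR-ed together.
predicates = [[("location", "==", "Zurich")]]

# Stand-in for the result of merging two aligned partitions.
merged_df = pd.DataFrame({"location": ["Zurich", "Berlin"], "value": [1.0, 2.0]})

filtered = filter_df_from_predicates(merged_df, predicates)
```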

fjetter commented 4 years ago

Firstly, thanks for the interest and sorry for the delayed response.

Secondly, the entire currently existing alignment logic is about as old as the library and was never really refactored. I'm open to breaking some eggs in this situation (we have a few other things which should be addressed in terms of UX, so a friendly breaking release would be appropriate in the near future anyhow).

The keys of the partitions are compared to match partitions. However, since the keys include the names of the stored parquet files, they will never match. What is the idea here?

As I said, this implementation dates back a long time. In the very first iterations we used hard-coded file names (e.g. partition_0, partition_1) which were provided by the user application. In that context this makes a lot of sense and obviously allows for the easiest alignment. I consider providing these labels a bad practice and want to remove this in the next breaking release, so we can safely consider this merge option deprecated.

It is annoying to have to supply the names of the tables in merge_tasks

Similar to the labels I would encourage people not to actually use multi-table datasets and stick to a single table per dataset. If we remove this feature this question also becomes obsolete.

it would be nice to be able to supply merge_datasets_as_delayed with a predicates argument

Definitely.

Why not supply the merge_func with all available dataframes

As a first iteration I would propose to stick to a simple deep join, i.e. merge(merge(df1, df2), df3), ..., and not include this in the interface. I'm curious, do you need a custom merge function or would this suit your need?
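A minimal illustration of that deep-join shape in plain pandas; the function name, keys, and data are made up, and this is not kartothek's internal implementation:

```python
from functools import reduce

import pandas as pd


def deep_join(dfs, on, how="inner"):
    # Fold pairwise merges over the list: merge(merge(df1, df2), df3), ...
    return reduce(lambda left, right: left.merge(right, on=on, how=how), dfs)


df1 = pd.DataFrame({"key": [1, 2], "a": [10, 20]})
df2 = pd.DataFrame({"key": [1, 2], "b": [30, 40]})
df3 = pd.DataFrame({"key": [1, 2], "c": [50, 60]})

result = deep_join([df1, df2, df3], on="key")
```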

Full disclosure: we do have an in-house implementation of a more advanced multi-dataset alignment. The alignment logic is essentially based on "match partition_keys" and does not allow more advanced alignment. We intend to push this OSS as well, but the timeline is, best case, a few weeks out. I'd be curious whether this would suit your need.

Can you elaborate a bit more how you want to use this?

fjetter commented 4 years ago

Depending on what your datasets look like (partitioning and indexing), #226 could also be interesting for you. If a simple join axis suffices, you could let dask figure out the join once you have an index.

fjetter commented 4 years ago

I am currently shocked, we forgot to put the tests for the merge pipeline in the upstream package 😱 (that's embarrassing)

xhochy commented 4 years ago

I am currently shocked, we forgot to put the tests for the merge pipeline in the upstream package 😱 (that's embarrassing)

No, there are tests in https://github.com/JDASoftwareGroup/kartothek/blob/9314d66b2b35a64282945c6b6ae24d8bb5a51ed0/kartothek/io/testing/merge.py and https://github.com/JDASoftwareGroup/kartothek/blob/9314d66b2b35a64282945c6b6ae24d8bb5a51ed0/tests/io/dask/delayed/test_merge.py

They are very basic, but they cover at least the majority of the code, enough to show that #239 breaks these tests.

As a first iteration I would propose to stick to a simple deep join, i.e. merge(merge(df1, df2), df3), ..., and not include this in the interface. I'm curious, do you need a custom merge function or would this suit your need?

Supplying all DataFrames to the merge function is an enormous performance benefit. If you don't do a plain merge but can apply other optimizations, like only using pd.concat (you need to be very careful with that), this saves a lot of time in loading.
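To illustrate that optimization with made-up data: if all inputs are already aligned on the same key, in the same order, a single column-wise concat can stand in for repeated merges. This only holds under that alignment assumption, which is exactly the part that needs care:

```python
import pandas as pd

# All partitions share the same index values in the same order, so a
# column-wise concat is equivalent to (and cheaper than) repeated merges.
df1 = pd.DataFrame({"key": [1, 2], "a": [10, 20]}).set_index("key")
df2 = pd.DataFrame({"key": [1, 2], "b": [30, 40]}).set_index("key")
df3 = pd.DataFrame({"key": [1, 2], "c": [50, 60]}).set_index("key")

combined = pd.concat([df1, df2, df3], axis=1)
```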

fjetter commented 4 years ago

If you don't do a plain merge but can apply other optimizations, like only using pd.concat (you need to be very careful with that), this saves a lot of time in loading.

Agreed, but I would argue this doesn't need to be part of the public interface, does it? I guess it should be possible to find a suitable fast join method which can be used for everything.

mlondschien commented 4 years ago

No worries about taking your time to respond. I guess @xhochy (who is now implementing the merge functionality much better than I could) can answer most of your questions.

Simply put, the issue we were facing is that the combined dataset is too big to fit into memory before subselecting columns and applying predicates. We have around 1000 partitions per dataset with only one parquet file per partition, so sequentially loading chunks (or a subset of columns), aligning them (basically pd.concat), and applying predicates seems like the optimal solution (and is basically what is done in #243).
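In rough pseudocode, the per-partition flow described above might look like this; load_partition is a hypothetical placeholder for however the aligned partitions get read, and filter_df_from_predicates is kartothek's existing filter helper:

```python
import pandas as pd
from kartothek.serialization import filter_df_from_predicates

# Example predicate: keep only rows for one location.
predicates = [[("location", "==", "Zurich")]]


def load_partition(dataset_uuid, partition_label):
    # Hypothetical placeholder: read one partition of one dataset,
    # restricted to the columns that are actually needed.
    raise NotImplementedError


def merge_and_filter_partition(partition_label, dataset_uuids):
    # Load the aligned partition from each dataset, concatenate column-wise,
    # and apply the predicates immediately, so the combined dataset never
    # has to fit into memory as a whole.
    dfs = [load_partition(uuid, partition_label) for uuid in dataset_uuids]
    merged = pd.concat(dfs, axis=1)
    return filter_df_from_predicates(merged, predicates)
```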