JDASoftwareGroup / kartothek

A consistent table management library in python
https://kartothek.readthedocs.io/en/stable
MIT License
161 stars 53 forks

Make use of Arrow Dataset API? #242

Open jorisvandenbossche opened 4 years ago

jorisvandenbossche commented 4 years ago

In the Apache Arrow C++ project, we have been working for the last months on a Dataset API (original design document: https://docs.google.com/document/d/1bVhzifD38qDypnSjtf8exvpP3sSB5x_Kw9m-n66FB2c/edit). A first iteration with Python bindings is available in pyarrow 0.16 (but it's not yet considered stable).

The dataset API has the goal to handle (scan/materialize) data sources larger than memory, and specifically provide:

Think: the current pyarrow.parquet.ParquetDataset functionality, but then not specific to parquet, and not tied to python (so for example the R bindings of arrow can also use this).

I suppose the second item is not of much interest for kartothek, but the first and third might be.

A small illustrative example of what it can look like in Python:

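import pyarrow.dataset as ds

# Discover a hive-partitioned Parquet dataset (filesystem argument elided here)
dataset = ds.dataset("path/to/partitioned/dataset",
                     format="parquet", partitioning="hive", filesystem=..)
# Read two columns, keeping only the rows that match both key conditions
table = dataset.to_table(columns=['col1', 'col2'],
                         filter=(ds.field('key1') == 1) & (ds.field('key2') == 2))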

(this uses the discovery mechanisms of Arrow, but you can also construct a Dataset manually with a list of files and partition expressions, if those are already known)
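
To make the "discovery" idea concrete, here is a minimal pure-Python sketch of how hive-style `key=value` directory names can be turned into partition keys. This is not Arrow's implementation; the function name `parse_hive_partition_keys` and the example paths are made up for illustration, and unlike Arrow this sketch keeps all values as strings instead of inferring types.

```python
from pathlib import PurePosixPath


def parse_hive_partition_keys(file_path, base_dir):
    """Return the key=value directory segments between base_dir and the file name.

    Hypothetical helper for illustration only; values are kept as strings.
    """
    relative = PurePosixPath(file_path).relative_to(PurePosixPath(base_dir))
    keys = {}
    # The last part is the file name itself, so only inspect the directories.
    for segment in relative.parts[:-1]:
        key, sep, value = segment.partition("=")
        if sep:  # skip directories that are not of the form key=value
            keys[key] = value
    return keys


# Made-up example paths following the hive layout
paths = [
    "dataset/key1=1/key2=2/part-0.parquet",
    "dataset/key1=1/key2=3/part-0.parquet",
]
for path in paths:
    print(path, parse_hive_partition_keys(path, "dataset"))
```

Each file thus carries its own set of partition keys, which is the information a "partition expression" encodes per file.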

The reference docs: https://arrow.apache.org/docs/python/api/dataset.html (but prose docs with examples are still lacking for now)


Long term, we would like to have this replace the python implementation of pyarrow.parquet.ParquetDataset. Now, kartothek doesn't actually use ParquetDataset, but only basic pyarrow.parquet.read/write_table and pyarrow.parquet.ParquetFile (to get metadata, statistics, read row group, etc of a single file). And those APIs are normally there to stay (regardless of the datasets API work).

However, kartothek still implements similar things (eg row group filtering with predicate pushdown) that would also be covered by Arrow Datasets. So although there is no immediate action required for kartothek due to those changes in Arrow (eg to replace deprecated APIs), I think it would still be useful to think about to what extent kartothek could leverage this work being done in Arrow Datasets.

So my purpose for this issue is to see whether such usage of Arrow Datasets is desirable and / or possible, and if so, what requirements kartothek would have (potential missing features in Arrow Datasets).

fjetter commented 4 years ago

I believe most of the kartothek.serialization module should be handled by pyarrow. What we do there is restricted to single-file actions, e.g. apply predicates, filter row groups, buffer storage access, etc. We built this at a time when pyarrow didn't support anything like this. I believe we could already get rid of a huge chunk of code if we would migrate to the filters of pa.read_pandas. We would probably discover a lot of issues regarding type stability since we invested a lot into this already but I guess both projects would benefit from this.
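
As a rough illustration of what "filter row groups" means here, the following sketch decides from per-row-group min/max statistics whether a row group can be skipped for a simple predicate. It is an assumption-laden toy, not kartothek's or pyarrow's code: `RowGroupStats`, `may_contain_matches` and `select_row_groups` are hypothetical names, and only a single integer column is considered.

```python
from dataclasses import dataclass


@dataclass
class RowGroupStats:
    """Hypothetical container for the min/max statistics of one column in one row group."""
    index: int
    min_value: int
    max_value: int


def may_contain_matches(stats, op, value):
    """Return False only if the statistics prove that no row satisfies `column <op> value`."""
    if op == "==":
        return stats.min_value <= value <= stats.max_value
    if op == "<":
        return stats.min_value < value
    if op == "<=":
        return stats.min_value <= value
    if op == ">":
        return stats.max_value > value
    if op == ">=":
        return stats.max_value >= value
    raise ValueError(f"unsupported operator: {op}")


def select_row_groups(all_stats, op, value):
    """Indices of the row groups that still have to be read for the predicate."""
    return [s.index for s in all_stats if may_contain_matches(s, op, value)]


# Made-up statistics for three row groups of one file
stats = [RowGroupStats(0, 0, 9), RowGroupStats(1, 10, 19), RowGroupStats(2, 20, 29)]
print(select_row_groups(stats, "==", 15))  # [1]
print(select_row_groups(stats, "<", 10))   # [0]
```

The statistics can only rule row groups out; the predicate still has to be applied to the rows of every row group that is read.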

Moving on to the actual datasets...

I believe we would benefit the most if there was an interface like

import pyarrow.dataset as ds
part_spec = KartothekPartitionSpec() # This doesn't exist but think of it as a replacement to the filesystem crawling
dataset = ds.dataset(part_spec, format="parquet", ...)
query_plan = dataset.to_table(columns=[...], filter=...)

# The interface should somehow return a pointer to the data and the local filters which need to be applied to this shard.
splits == [
   ("path_to_file", local_filters, ...), # potentially also with more statistics for query optimisation, if available
   ...
]

If there were a way for us to interface with this so that Arrow performs the partition pruning, etc., this would be helpful. For a kartothek integration this would require the concept of secondary indices (which we love dearly), which is a very big topic if you want to define some sort of standard. Our kinds of indices are very simple, though, and we wouldn't need anything sophisticated.

Another part of this is, of course, pre-aggregated or pre-collected file statistics which may or may not be included here. This is something kartothek doesn't do at the moment but I can see this being added in the future.

The output, which I referred to as "splits" for lack of a better word (please educate me if there is a commonly used term for this data), is where it becomes interesting from my POV. This can be used to construct a dask graph, optimise the query, perform joins with other datasets, etc. I'm wondering what the ambition of arrow is for the tasks/splits. imho, the biggest downside of the arrow dataset is that it cannot work on larger than memory data but this is also why we built kartothek (one of the reasons)

jorisvandenbossche commented 4 years ago

@fjetter Thanks for the quick response!

Already some answers / questions:

> I believe we could already get rid of a huge chunk of code if we would migrate to the filters of pa.read_pandas. We would probably discover a lot of issues regarding type stability since we invested a lot into this already but I guess both projects would benefit from this.

Yes, it would indeed be interesting to exercise your filtering tests, to ensure that the new filtering implementation in Arrow Dataset C++ is robust (and so that it can be used by kartothek once it stabilizes a bit).

I think one blocker to trying this out is support for buffer input (right now only file paths are supported in the interface; I opened https://issues.apache.org/jira/browse/ARROW-8074 for this), as it seems that eg the ParquetSerializer relies on passing buffer readers to read_pandas/ParquetFile.

> I believe we would benefit the most if there was an interface like

I believe that should mostly be possible. The lower-level Dataset class constructors allow you to construct a dataset from an explicit spec (the overall schema, all file paths, the partition expressions for each file path).

What you are referring to as "splits" would map to what is called "fragments" in arrow, I think (and more or less would map to "partitions" of the dataframe in dask?). The dataset API currently gives access to those fragments' file paths and partition information ("partition expression"):

(example with the typical NYC taxi partitioned dataset where I have file paths like "/year=2016/month=10/data.parquet")

In [2]: dataset = ds.dataset("nyc-taxi-data/dask-partitioned/", partitioning="hive")  

In [3]: fragments = list(dataset.get_fragments()) 

In [4]: fragments[0].path 
Out[4]: 'nyc-taxi-data/dask-partitioned/year=2016/month=10/2e758b3ee521402998915104e54cc2d2.parquet'

In [5]: fragments[0].partition_expression
Out[5]: <pyarrow.dataset.AndExpression ((year == 2016:int32) and (month == 10:int32))>

The partition expression gives the information of the partition keys for this specific fragment (a file here), and could be combined with the overall filter to get a local filter (in arrow we don't have this distinction, though, the overall filter will work fine for the fragment).
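
One way to picture "combining the partition expression with the overall filter" is the following pure-Python sketch. It does not use Arrow's expression classes: the filter is assumed to be a plain conjunction of `(column, op, value)` tuples, the partition keys a plain dict, and `local_filter` is a hypothetical helper name.

```python
import operator

# Supported comparison operators for this sketch
OPS = {"==": operator.eq, "!=": operator.ne, "<": operator.lt,
       "<=": operator.le, ">": operator.gt, ">=": operator.ge}


def local_filter(partition_keys, predicates):
    """Simplify a conjunction of (column, op, value) predicates for one fragment.

    Returns None if the partition keys contradict the filter (the fragment can
    be skipped), otherwise the predicates that still have to be evaluated on
    the rows of the fragment.
    """
    remaining = []
    for column, op, value in predicates:
        if column in partition_keys:
            if not OPS[op](partition_keys[column], value):
                return None  # no row in this fragment can match
            # The predicate holds for every row of the fragment, so drop it.
        else:
            remaining.append((column, op, value))
    return remaining


# Partition keys of the fragment shown above; "fare" is a made-up data column
keys = {"year": 2016, "month": 10}
print(local_filter(keys, [("year", "==", 2016), ("fare", ">", 10.0)]))  # [('fare', '>', 10.0)]
print(local_filter(keys, [("month", "<", 6)]))                          # None
```

Predicates on partition keys are either trivially true for the whole fragment or rule it out entirely, so only predicates on real data columns remain as the "local" filter.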

> For a kartothek integration this would require the concept of secondary indices (which we love dearly), which is a very big topic if you want to define some sort of standard.

For people not familiar with this concept, can you give a brief explanation of what secondary indices are and how they are used in kartothek? (or point to the docs about this?)

> I'm wondering what the ambition of arrow is for the tasks/splits. imho, the biggest downside of the arrow dataset is that it cannot work on larger than memory data but this is also why we built kartothek (one of the reasons)

Right now, Arrow is single-node, in-memory (but can be mmapped). The longer term scope, I am not sure about. But that is also one of the reasons that I started experimenting on the proof of concept to connect dask with Arrow Datasets, to already enable a larger than memory / distributed use case on top of Arrow Datasets.

fjetter commented 4 years ago

> as it seems that eg the ParquetSerializer relies on passing buffer readers to read_pandas/ParquetFile.

Yes, this is mostly because we implemented a simple block buffering for consecutive reads (due to projection, pushdown, etc.) (might also be implemented in pyarrow, of course) see https://github.com/JDASoftwareGroup/kartothek/blob/205fd8d3406bff58cf1dd6d1bf50649b10067cd4/kartothek/serialization/_io_buffer.py#L10
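
For readers unfamiliar with the idea, block buffering for consecutive reads can be sketched roughly as below. This is not the code in `_io_buffer.py`; `BlockBufferedReader` and its attributes are hypothetical, and the sketch only caches a single block and supports absolute seeks.

```python
import io


class BlockBufferedReader:
    """Serve small reads from one cached block to reduce calls to the raw source."""

    def __init__(self, raw, blocksize=1024):
        self.raw = raw
        self.blocksize = blocksize
        self.pos = 0
        self._block_start = 0
        self._block = b""
        self.raw_reads = 0  # counts how often the underlying source is hit

    def seek(self, offset):
        # Absolute seek only; nothing is fetched until the next read.
        self.pos = offset

    def read(self, size):
        end = self.pos + size
        cached_end = self._block_start + len(self._block)
        if not (self._block_start <= self.pos and end <= cached_end):
            # Cache miss: fetch at least one whole block starting at pos.
            self.raw.seek(self.pos)
            self._block = self.raw.read(max(size, self.blocksize))
            self._block_start = self.pos
            self.raw_reads += 1
        start = self.pos - self._block_start
        data = self._block[start:start + size]
        self.pos += len(data)
        return data


# An in-memory stand-in for a remote object store blob
payload = bytes(range(256)) * 16
reader = BlockBufferedReader(io.BytesIO(payload), blocksize=1024)
reader.seek(100)
first = reader.read(10)
second = reader.read(10)  # served from the cached block
print(reader.raw_reads)   # 1
```

Consecutive small reads (for example neighbouring column chunks) then hit the underlying storage once instead of once per read.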

> For people not familiar with this concept, can you give a brief explanation of what secondary indices are and how they are used in kartothek? (or point to the docs about this?)

The indices we have are currently a simple mapping between uniquely observed values of a given column to the partitions they are stored in. I just realise we never properly documented this... @xhochy do you have something practical to share here?

I have this slide where a representation of the indices is shown: [image]

This information is used for the partition pruning / query planning to determine which fragments we actually need to read, on top of the plain hive-partitioning (in fact, even the filesystem layout is converted to an index internally, see primary index)
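
A minimal sketch of such an index and of how it can drive partition pruning, assuming partitions are given as in-memory lists of row dicts. The names `build_index` and `prune` and the example data are invented for illustration and do not reflect kartothek's actual classes.

```python
def build_index(column, partitions):
    """Map each distinct value of `column` to the set of partition labels containing it."""
    index = {}
    for label, rows in partitions.items():
        for row in rows:
            index.setdefault(row[column], set()).add(label)
    return index


def prune(index, values):
    """Return the labels of the partitions that may contain any of `values`."""
    labels = set()
    for value in values:
        labels |= index.get(value, set())
    return labels


# Made-up partitions; in practice each label would point to a file
partitions = {
    "part_1": [{"country": "DE", "sales": 3}, {"country": "FR", "sales": 5}],
    "part_2": [{"country": "DE", "sales": 1}],
    "part_3": [{"country": "US", "sales": 7}],
}
country_index = build_index("country", partitions)
print(sorted(prune(country_index, ["DE"])))        # ['part_1', 'part_2']
print(sorted(prune(country_index, ["US", "FR"])))  # ['part_1', 'part_3']
```

A query for `country == "DE"` would then only touch the files behind `part_1` and `part_2`, regardless of how the files are laid out on the filesystem.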

See conversion_to_dataframe and calculate_fragments with this (The class MetaPartition represents the fragment) for the usage and implementation (the implementation is currently horrifyingly inefficient because we transform frequently between dict/array, please don't judge :) )

xhochy commented 4 years ago

I had a first look at this, and it seems that we first need some of our current, very basic schema evolution implemented in pyarrow.dataset as well:

It is pleasantly a lot faster than kartothek currently is, as we convert to pandas much later.