apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[C++][Dataset] Partition level filters should be able to provide filtering to file systems #16972

Open asfimport opened 4 years ago

asfimport commented 4 years ago

When providing a filter for partitions, it should be possible in some cases to use it to optimize file system list calls. This can greatly improve the speed of reading data from partitions, because fewer directories/files need to be explored/expanded. I've fallen behind on the dataset code, but I want to make sure this issue is tracked someplace. This came up in the SO question linked below (feel free to correct my analysis if I missed the functionality someplace).

Reference: https://stackoverflow.com/questions/58868584/pyarrow-parquetdataset-read-is-slow-on-a-hive-partitioned-s3-dataset-despite-u/58951477#58951477

Reporter: Micah Kornfield / @emkornfield

Related issues:

Note: This issue was originally created as ARROW-7224. Please see the migration documentation for further details.
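
As a rough sketch of the listing reduction being asked for (the bucket name and layout below are hypothetical, and today's dataset discovery does not do this pruning itself):

import pyarrow.fs as fs

# Hypothetical hive-style layout: s3://my-bucket/events/year=YYYY/month=MM/day=DD/*.parquet
s3, base = fs.FileSystem.from_uri("s3://my-bucket/events")

# What dataset discovery does today: one recursive listing of the whole tree.
all_files = s3.get_file_info(fs.FileSelector(base, recursive=True))

# What a partition-aware discovery could do for the filter year == 2019 and month == 11:
# list only the matching subtree, touching far fewer directories/objects.
pruned = s3.get_file_info(fs.FileSelector(base + "/year=2019/month=11", recursive=True))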

asfimport commented 4 years ago

Francois Saint-Jacques / @fsaintjacques: There's some confusion between the new dataset API (in C++) and the existing ParquetDataset, which is purely in Python.

asfimport commented 4 years ago

Micah Kornfield / @emkornfield: @fsaintjacques I was intending this to cover the new dataset API. I figured the Python one will be replaced (hopefully at some point soon). If this is covered by the new C++ Dataset API then let's resolve it (I'm not sure anyone has bandwidth to work on the existing Python one).

asfimport commented 3 years ago

Andy Douglas: Is there any update on this?

I'm also finding that instantiating a pyarrow dataset containing a large number of files is slow, even when passing paths explicitly. I've tried dropping to the parquet dataset interface and disabling schema validation, but it's still slow.

In general, is there a way of caching information about partitions/files, perhaps within the metadata? I was thinking of a hierarchical setup supported by the query language: the query is first evaluated against the partition/file cache (if present) to determine the list of relevant files, then a dataset is instantiated by explicitly passing that list of files, and finally the query is evaluated against it. This could be supported outside of pyarrow, but I've struggled to find a way to evaluate only the parts of the query relevant to the partitions without separating them out into a separate query, which is clunky.
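
A sketch of how that could look outside of pyarrow today, assuming a hypothetical manifest file (one row per data file with its partition values and path) maintained by whatever job writes the dataset:

import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyarrow import fs

s3 = fs.S3FileSystem()  # credentials picked up from the environment

# Hypothetical manifest: columns year, month, day and each file's bucket-relative path.
manifest = pq.read_table("my-bucket/events/_manifest.parquet", filesystem=s3)

# Evaluate the partition-relevant part of the query against the manifest only.
mask = pc.and_(
    pc.and_(pc.equal(manifest["year"], 2019), pc.equal(manifest["month"], 11)),
    pc.equal(manifest["day"], 13),
)
paths = manifest.filter(mask)["path"].to_pylist()

# Construct the dataset from the explicit file list: no directory listing is issued.
dataset = ds.dataset(paths, filesystem=s3, format="parquet")
table = dataset.to_table()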

asfimport commented 3 years ago

Ben Kietzman / @bkietz: I'm not sure how this would work; directories viewed by datasets are only listed once (on construction). Thereafter a vector of discovered fragments is maintained internally, and subsets of it (determined by partition expressions) are returned by GetFragments(). Filtering on partition expressions will never incur IO.

asfimport commented 3 years ago

Andy Douglas: @bkietz  

directories viewed by datasets are only listed once (on construction)

Right, but for a dataset containing a large number of parquet files (> 100k), construction can take a long time, and so can querying the dataset for a particular partition. What I was suggesting is the ability to load a cached copy of the mapping from partition to parquet file as a separate metadata dataset. Clearly this cache would be invalidated when the dataset is written to, but I have lots of datasets that are read more often than they are written, so for me the caching works well. Both the initial load and subsequent querying are much faster (seconds rather than minutes for the initial load, and then tens of seconds for the query).

asfimport commented 3 years ago

Micah Kornfield / @emkornfield: I think this would require some changing of dataset assumptions (I'm not familiar enough with the code to say how, or whether it is worth the work). But it could be done as follows: knowing that directories look like "/{a}/{b}/{c}/{files}" (either through inference or because the user provides this), then for a predicate "a=foo", instead of listing all objects and caching them, a directory listing of "/foo/*" could be issued.

So this might involve some of the following for datasets:

  1. Making construction lazier.
  2. Tracking which top level structures have been explored and which ones haven't.
  3. Constructing listings in parallel given a predicate.

A success metric is latency to the first returned data; it seems like the existing datasets contract is optimized around minimizing total latency across all queries.

If you think about the common case of a date-partitioned data warehouse, the most common queries are going to be on recent data. Listing only the partitions needed can reduce latency (and potentially by quite a bit if the underlying file system doesn't support reverse lexicographic listing).
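
A rough sketch of point 3 above, assuming a layout like /{a}/{b}/{c}/{files} and a predicate that narrows the first key to a known set of values (bucket and values are hypothetical):

from concurrent.futures import ThreadPoolExecutor
from pyarrow import fs

s3, base = fs.FileSystem.from_uri("s3://my-bucket/events")

# A predicate like a IN ('foo', 'bar') narrows the first partition level, so only
# those subtrees need to be listed, and the listings can be issued in parallel.
candidate_dirs = [base + "/" + value for value in ("foo", "bar")]

def list_subtree(directory):
    return s3.get_file_info(fs.FileSelector(directory, recursive=True))

with ThreadPoolExecutor() as pool:
    discovered = [info for infos in pool.map(list_subtree, candidate_dirs) for info in infos]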

asfimport commented 3 years ago

Andy Douglas:

  1. Making construction lazier.
  2. Tracking which top level structures have been explored and which ones haven't.
  3. Constructing listings in parallel given a predicate.

All of these would definitely help.

I have a use case that involves a dataset with over 1M files on s3. I update the cache file incrementally after an overnight update job completes avoiding having to reindex the entire dataset each time.

What would be the suggested approach here?

asfimport commented 3 years ago

Ben Kietzman / @bkietz: [~andydoug] the metadata you mentioned makes me think of pyarrow.dataset.parquet_dataset, which allows construction of a dataset from a single metadata-only parquet file. When such a _metadata file can be written, filters will be applied not only to partition keys but also to row group statistics, without IO; might be worth a look.
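
For reference, a sketch of that workflow, roughly following the documented pattern for writing a _metadata file and reading it back with pyarrow.dataset.parquet_dataset (the paths and schema below are illustrative and may need adjusting for a real dataset):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

table = pa.table({"year": [2019, 2019], "month": [11, 11], "day": [13, 14], "value": [1.0, 2.0]})

# When writing: collect each data file's metadata (including row group statistics).
collector = []
pq.write_to_dataset(table, "events", partition_cols=["year", "month", "day"],
                    metadata_collector=collector)

# The data files don't contain the partition columns, so the schema written to
# _metadata has to match the file schema (here just "value").
pq.write_metadata(pa.schema([("value", pa.float64())]), "events/_metadata",
                  metadata_collector=collector)

# When reading: the dataset is built from _metadata alone, so filters on
# partition keys and row group statistics are applied without listing or
# opening the data files.
dataset = ds.parquet_dataset("events/_metadata", partitioning=ds.partitioning(flavor="hive"))
result = dataset.to_table(filter=(ds.field("year") == 2019) & (ds.field("day") == 13))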

The fundamental problem I see is a mismatch in how pq.ParquetDataset and pyarrow.dataset.* consider filters: the former considers a filter during construction, while the latter is intended to handle multiple reads with differing filters. In the latter case, the dataset is intended to encapsulate the mapping between partitions and files:


import pyarrow.dataset as ds
dataset = ds.dataset('s3://bucket-name/dataset-name', format='parquet')

# note the parentheses: `&` binds more tightly than `==` in Python
kindness_day_filter = (ds.field('year') == 2019) & (ds.field('month') == 11) & (ds.field('day') == 13)
kindness_day_table = dataset.to_table(filter=kindness_day_filter)

# repeat with other filters
kindness_week_filter = (ds.field('year') == 2019) & (ds.field('month') == 11) & (ds.field('day') >= 10) & (ds.field('day') <= 16)
kindness_week_table = dataset.to_table(filter=kindness_week_filter)

If neither of these is acceptable, note that pq.ParquetDataset can be constructed from a list of paths to data files. If an explicit list of files is available, then the dataset can be (repeatedly) constructed without invoking a directory listing method.

FWIW, we have https://issues.apache.org/jira/browse/ARROW-8163 to track lazier construction of datasets, and the only issue I'm aware of for supporting more sophisticated listing behavior is https://issues.apache.org/jira/browse/ARROW-6257 . It seems that before lazy construction of a dataset could benefit this case, we'd also need to support producing a stream of results from querying a FileSelector. @pitrou

https://github.com/apache/arrow/pull/9632

@jorisvandenbossche

asfimport commented 3 years ago

Ben Kietzman / @bkietz: Also worth noting: the case discussed in this thread (of a filter which references each partition field exactly once and specifies an equality condition for each) corresponds to a single subdirectory which needs to be scanned. This is not the case for all filters, but it would be possible to add a special case when such a prefix can be extracted. This would require the Partitioning be explicitly constructed (so that we know without inspection of paths what partition fields are in play), but that's fairly straightforward.
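
A sketch of that special case (the prefix_for helper below is hypothetical, not an existing dataset API): with an explicitly constructed hive Partitioning the field order is known up front, so a filter that fixes every partition field with an equality condition can be reduced to a single subdirectory to list.

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import fs

# Explicitly constructed partitioning: field order is known without inspecting any paths.
partitioning = ds.partitioning(
    pa.schema([("year", pa.int32()), ("month", pa.int32()), ("day", pa.int32())]),
    flavor="hive",
)

def prefix_for(equalities, partition_schema):
    # Hypothetical helper: build "year=2019/month=11/day=13" from a filter that
    # fixes every partition field with an equality condition.
    return "/".join(f"{field.name}={equalities[field.name]}" for field in partition_schema)

s3, base = fs.FileSystem.from_uri("s3://my-bucket/events")
subdir = base + "/" + prefix_for({"year": 2019, "month": 11, "day": 13}, partitioning.schema)
files = s3.get_file_info(fs.FileSelector(subdir, recursive=True))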

asfimport commented 3 years ago

Ben Kietzman / @bkietz:

I have a use case that involves a dataset with over 1M files on s3. I update the cache file incrementally after an overnight update job completes avoiding having to reindex the entire dataset each time.

Another potential workaround would be to create a custom FileSystem which replaces directory listing calls with reads of this cache file. In Python this can be done by subclassing FileSystemHandler (and wrapping it in a PyFileSystem), or through fsspec.
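
A partial sketch of that workaround, assuming a hypothetical cached-listing file; only the listing- and read-related methods are shown, and a complete handler would also implement the remaining FileSystemHandler methods:

import json
from pyarrow import fs

class CachedListingHandler(fs.FileSystemHandler):
    """Serve directory listings from a cache file; delegate reads to a real filesystem."""

    def __init__(self, wrapped, cache_path):
        self.wrapped = wrapped  # e.g. fs.S3FileSystem()
        with wrapped.open_input_stream(cache_path) as f:
            # Hypothetical cache format: {"path/to/file.parquet": size_in_bytes, ...}
            self.listing = json.loads(f.read().decode())

    def get_type_name(self):
        return "cached+" + self.wrapped.type_name

    def normalize_path(self, path):
        return path

    def get_file_info(self, paths):
        return self.wrapped.get_file_info(paths)

    def get_file_info_selector(self, selector):
        # Answer listing requests from the cache instead of issuing LIST calls.
        return [fs.FileInfo(path, type=fs.FileType.File, size=size)
                for path, size in self.listing.items()
                if path.startswith(selector.base_dir)]

    def open_input_stream(self, path):
        return self.wrapped.open_input_stream(path)

    def open_input_file(self, path):
        return self.wrapped.open_input_file(path)

    # The remaining FileSystemHandler methods (create_dir, delete_*, move,
    # copy_file, open_output_stream, ...) are omitted here; a usable handler
    # would delegate or implement them as well.

# filesystem = fs.PyFileSystem(CachedListingHandler(fs.S3FileSystem(), "my-bucket/events/_listing.json"))
# dataset = pyarrow.dataset.dataset("my-bucket/events", filesystem=filesystem, format="parquet", partitioning="hive")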

asfimport commented 3 years ago

Micah Kornfield / @emkornfield:

Also worth noting: the case discussed in this thread (of a filter which references each partition field exactly once and specifies an equality condition for each) corresponds to a single subdirectory which needs to be scanned. This is not the case for all filters, but it would be possible to add a special case when such a prefix can be extracted. This would require the Partitioning be explicitly constructed (so that we know without inspection of paths what partition fields are in play), but that's fairly straightforward.

Agreed. I'll note that there is actually an interaction with the file system here as well, but specialization for equality is a starting point. For instance, a ">=" condition on partition keys is achievable for S3, as is a range lb <= column <= ub, but strict less-than / less-than-or-equal would not achieve the same efficiencies. FWIW, Spark has APIs for push-down predicates that allow a source to tell it which predicates can be pushed down effectively and which need to be done as part of the engine (i.e. using compute kernels). A similar abstraction might be useful here.
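
To illustrate the S3 point: object keys are returned in lexicographic order, so a lower bound on a partition key can be expressed directly in the listing request (the sketch below uses boto3 directly; the bucket and layout are hypothetical):

import boto3

s3 = boto3.client("s3")

# Hypothetical layout: s3://my-bucket/events/date=YYYY-MM-DD/...
# Predicate date >= '2024-01-01': StartAfter skips every key that sorts before
# the lower bound, so only the matching suffix of the key space is listed.
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket="my-bucket",
                           Prefix="events/",
                           StartAfter="events/date=2024-01-01")
keys = [obj["Key"] for page in pages for obj in page.get("Contents", [])]
# For a range lb <= date <= ub, pagination can additionally stop once keys
# sort past the upper bound.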

asfimport commented 3 years ago

Antoine Pitrou / @pitrou: @bkietz Can you open a JIRA for the streaming GetFileInfo(FileSelector) request?

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche:

FWIW, Spark has APIs for push-down predicates that allow a source to tell it which predicates can be pushed down effectively and which need to be done as part of the engine (i.e. using compute kernels).

[~emkornfield@gmail.com] do you have a (doc) reference for this?

As @bkietz mentioned above, a main part of the issue is "filtering during construction" vs "filtering during query". Currently you can only provide a filter when actually querying. But do we want to consider adding a kind of filter argument at construction time as well (in case you know that all your subsequent queries will use that filter)?

asfimport commented 3 years ago

Ben Kietzman / @bkietz: Streaming GetFileInfo: ARROW-11924

asfimport commented 3 years ago

Micah Kornfield / @emkornfield: @jorisvandenbossche  I think this is the relevant API from DataSourceV2.  

 

It seems like a bad user experience to expose a "filter on construction" parameter, but it could be a way to mitigate this. I think the workarounds @bkietz proposed are also workable. As I've said before, I think supporting the feature this JIRA is asking for is complex and potentially requires big changes to Datasets, so I understand if it isn't immediately prioritized (but I think it could have a large impact for common cases).