
[EPIC] Improve the performance of ListingTable #9964

Open alamb opened 3 months ago

alamb commented 3 months ago

Is your feature request related to a problem or challenge?

The ListingTable works quite well in practice, but like all software it could be made better. I am writing up this ticket to enumerate some areas for improvement, in the hopes that people who are interested can collaborate / coordinate their efforts.

Background

DataFusion has a ListingTable that reads tables stored as one or more files in a "hive partitioned" directory structure.

For example, given files like this:

/path/to/my_table/file1.parquet
/path/to/my_table/file2.parquet
/path/to/my_table/file3.parquet

You can create a table with a command like

CREATE EXTERNAL TABLE my_table
LOCATION '/path/to/my_table'

And the ListingTable will handle figuring out the schema and running queries against those files as though they were a single table.
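For reference, the equivalent through the Rust API looks roughly like the sketch below (path and table name are placeholders, and the exact builder calls may differ slightly between DataFusion versions):

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Treat every *.parquet file under the directory as part of the table.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet");

    // Passing `None` for the schema asks the ListingTable to infer it from the files.
    ctx.register_listing_table("my_table", "/path/to/my_table", options, None, None)
        .await?;

    ctx.sql("SELECT count(*) FROM my_table").await?.show().await?;
    Ok(())
}
```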

Describe the solution you'd like

Here are some things I suspect could be improved:

All Formats

Object store list caching

For large tables (many files) on remote stores, the object store LIST call may be non-trivially expensive, so issuing it over and over for every query adds up.

@henrifroese points out a similar issue for pruning partitions: https://github.com/apache/arrow-datafusion/issues/9654
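To make the idea concrete, a list cache could be as small as memoizing `ObjectStore::list` results per table prefix, as in the rough sketch below (this is not DataFusion's existing cache API, it has no eviction or invalidation, and it assumes a recent `object_store` where `list` returns a stream directly):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use futures::TryStreamExt;
use object_store::{path::Path, ObjectMeta, ObjectStore};
use tokio::sync::Mutex;

/// Memoizes LIST results per prefix so repeated queries against the same
/// table do not re-issue LIST requests. No expiry / invalidation.
#[derive(Default)]
pub struct ListCache {
    entries: Mutex<HashMap<String, Arc<Vec<ObjectMeta>>>>,
}

impl ListCache {
    pub async fn list(
        &self,
        store: &dyn ObjectStore,
        prefix: &Path,
    ) -> object_store::Result<Arc<Vec<ObjectMeta>>> {
        let key = prefix.to_string();
        if let Some(hit) = self.entries.lock().await.get(&key) {
            return Ok(Arc::clone(hit));
        }
        // Miss: issue the LIST once and remember the result.
        let metas: Vec<ObjectMeta> = store.list(Some(prefix)).try_collect().await?;
        let metas = Arc::new(metas);
        self.entries.lock().await.insert(key, Arc::clone(&metas));
        Ok(metas)
    }
}
```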

Parquet Specific

MetaData caching

ListingTable (code link) prunes files based on statistics, and then the ParquetExec itself (link) again prunes row groups and data pages based on the same metadata. Fetching and parsing this metadata twice (once to prune files and once to prune row groups) could be improved
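A sketch of what reusing the decoded footer could look like (not the current implementation; locking is simplistic and there is no eviction): decode once, then hand the same `ParquetMetaData` to both the file-level and the row-group/page-level pruning passes.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Caches decoded Parquet footers by file path so that file-level pruning and
/// row-group / page-level pruning can share one fetch + parse.
#[derive(Default)]
pub struct MetadataCache {
    entries: Mutex<HashMap<String, Arc<ParquetMetaData>>>,
}

impl MetadataCache {
    /// Return the cached metadata for `path`, calling `load` only on a miss.
    /// (Two concurrent misses may both decode; acceptable for a sketch.)
    pub fn get_or_load<F>(&self, path: &str, load: F) -> Result<Arc<ParquetMetaData>>
    where
        F: FnOnce() -> Result<ParquetMetaData>,
    {
        if let Some(hit) = self.entries.lock().unwrap().get(path) {
            return Ok(Arc::clone(hit));
        }
        let decoded = Arc::new(load()?);
        self.entries
            .lock()
            .unwrap()
            .insert(path.to_string(), Arc::clone(&decoded));
        Ok(decoded)
    }
}
```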

IO granularity

I have heard it said that the DataFusion ParquetExec reader reads a page at a time -- this is fine if the parquet file is a local file on disk, but it is likely quite inefficient if each page must be fetched with an individual remote object store request. This assertion needs to be researched, but if it is true we could make queries on remote parquet files much faster by making fewer, larger requests.
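To illustrate the "fewer, larger requests" direction: the usual trick is to coalesce nearby byte ranges before issuing reads (I believe `object_store`'s `get_ranges` already does something along these lines). A self-contained sketch with an arbitrary gap threshold:

```rust
use std::ops::Range;

/// Merge byte ranges whose gaps are smaller than `max_gap`, so several small
/// page reads can be served by one larger object store request.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>, max_gap: usize) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<usize>> = Vec::new();
    for r in ranges {
        match merged.last_mut() {
            // Close enough to the previous range: extend it instead of
            // starting a new request.
            Some(prev) if r.start <= prev.end + max_gap => prev.end = prev.end.max(r.end),
            _ => merged.push(r),
        }
    }
    merged
}

fn main() {
    // Two nearby 8 KiB page reads collapse into one request; the distant one
    // stays separate.
    let pages = vec![0..8_192, 10_000..18_192, 1_000_000..1_008_192];
    assert_eq!(
        coalesce_ranges(pages, 64 * 1024),
        vec![0..18_192, 1_000_000..1_008_192]
    );
}
```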

Describe alternatives you've considered

@Ted-Jiang added some APIs in https://github.com/apache/arrow-datafusion/pull/7570 https://github.com/apache/arrow-datafusion/blob/2b0a7db0ce64950864e07edaddfa80756fe0ffd5/datafusion/execution/src/cache/mod.rs, but there aren't any default implementations in DataFusion, so the metadata is read multiple times

Maybe we can add a default implementation of the caches in SessionContext with a simple policy (like LRU / some max size)

Another potential way to improve performance is to cache the decoded metadata from the Parquet footer rather than fetching and decoding it once to prune files and then again to prune row groups / pages. This could be taken even further by pruning files, row groups, and pages in one go using an API like https://github.com/apache/arrow-datafusion/issues/9929
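For the "simple policy (like LRU / some max size)" part, one possible shape is a thin thread-safe wrapper over an off-the-shelf LRU, as sketched below using the third-party `lru` crate (this is not an existing DataFusion API; whether it stores `ObjectMeta`, file statistics, or decoded footers is just a choice of key and value types):

```rust
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::sync::Mutex;

use lru::LruCache;

/// A thread-safe, size-bounded cache: least recently used entries are evicted
/// once `capacity` is reached.
pub struct BoundedCache<K: Hash + Eq, V: Clone> {
    inner: Mutex<LruCache<K, V>>,
}

impl<K: Hash + Eq, V: Clone> BoundedCache<K, V> {
    pub fn new(capacity: NonZeroUsize) -> Self {
        Self {
            inner: Mutex::new(LruCache::new(capacity)),
        }
    }

    pub fn get(&self, key: &K) -> Option<V> {
        // `get` also bumps the entry to most-recently-used.
        self.inner.lock().unwrap().get(key).cloned()
    }

    pub fn put(&self, key: K, value: V) {
        self.inner.lock().unwrap().put(key, value);
    }
}
```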

Additional context

@matthewmturner mentioned interest in improving listing table performance: https://github.com/apache/arrow-datafusion/issues/9899#issuecomment-2030139830

Note we don't use ListingTable in InfluxDB for some of the reasons described above

Related tickets:

Lordworms commented 3 months ago

Interested in this one

alamb commented 3 months ago

FYI I think this is more like an Epic that can be used to coordinate individual tasks / changes rather than a specific change itself.

Interested in this one

Thanks @Lordworms -- one thing that would probably help to start this project along would be to gather some data.

Specifically, point the ListingTable at data on a remote object store (e.g. figure out how to write a query against 100 parquet files in an S3 bucket).

And then measure how much time is spent:

  1. object store listing
  2. fetching metadata
  3. pruning / fetching IO
  4. how many object store requests are made

Does anyone know a good public data set on S3 that we could use to test / benchmark with?
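For anyone who wants to try this, a rough sketch of how the experiment could be wired up from the Rust API is below (bucket, region, and table prefix are placeholders; credentials come from the environment). Timing the individual phases and counting requests would still need extra instrumentation on top of this:

```rust
use std::sync::Arc;
use std::time::Instant;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use object_store::aws::AmazonS3Builder;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Credentials are picked up from the environment (AWS_ACCESS_KEY_ID, ...).
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("my-benchmark-bucket")
        .with_region("us-east-1")
        .build()?;
    let url = Url::parse("s3://my-benchmark-bucket").expect("valid url");
    ctx.register_object_store(&url, Arc::new(s3));

    // Point a ListingTable-backed external table at a prefix with many files.
    ctx.sql(
        "CREATE EXTERNAL TABLE t STORED AS PARQUET \
         LOCATION 's3://my-benchmark-bucket/path/to/table/'",
    )
    .await?;

    let start = Instant::now();
    ctx.sql("SELECT count(*) FROM t").await?.collect().await?;
    println!("query took {:?}", start.elapsed());
    Ok(())
}
```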

Lordworms commented 3 months ago

I got it, I'll search some data first

Lordworms commented 3 months ago

I'll start with a random dataset here https://registry.opendata.aws/

MohamedAbdeen21 commented 3 months ago

(It's not a "performance" issue, but rather for better user experience. Also, it requires upstream changes to arrow-rs/object_store.)

The recent change in #9912 uses 10 random files to infer the partition columns, which means we may fail to catch corrupted/manually-changed partitions on table creation (shouldn't be a common case). This is because ObjectStore only provides a list function to retrieve objects.

If we can provide a BFS approach to traverse the Object Store and use that in partition inference, that would be a nice QoL change.

Curious to know how we feel about upstream changes for such non-critical changes in DF?

alamb commented 3 months ago

The recent change in https://github.com/apache/arrow-datafusion/pull/9912 uses 10 random files to infer the partition columns, which means we may fail to catch corrupted/manually-changed partitions on table creation (shouldn't be a common case). This is because ObjectStore only provides a list function to retrieve objects.

I think ObjectStore only provides a LIST API, as that is the functionality offered by S3, GCP, etc.

The recent change in https://github.com/apache/arrow-datafusion/pull/9912 uses 10 random files to infer the partition columns, which means we may fail to catch corrupted/manually-changed partitions on table creation (shouldn't be a common case).

In this particular case, I think there are different needs for different users (some might want to pay the cost of a full validation, but some might be happy with just a sanity check). One approach would be to add a config option to DataFusion that controls the maximum number of paths to check when creating a listing table 🤔

Curious to know how we feel about upstream changes for such non-critical changes in DF?

In general I think upstream changes are great to propose if they help more than just DataFusion

Lordworms commented 3 months ago

I have done some basic experimentation with the bitcoin dataset and also did some profiling with Instruments.

[profiling screenshots attached for (1) object store listing and (2) fetching metadata]

Just want to know what is a good start to solving this issue -- should I implement the cache https://github.com/apache/arrow-datafusion/blob/2b0a7db0ce64950864e07edaddfa80756fe0ffd5/datafusion/execution/src/cache/mod.rs here first?

alamb commented 3 months ago

And then measure how much time is spent:

That is very interesting.

Just want to know what is a good start to solving this issue -- should I implement the cache https://github.com/apache/arrow-datafusion/blob/2b0a7db0ce64950864e07edaddfa80756fe0ffd5/datafusion/execution/src/cache/mod.rs here first?

If indeed most of the execution time is spent parsing (or fetching) parquet metadata, implementing a basic cache would likely help.

Also, @tustvold brought https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/parquet/trait.ParquetFileReaderFactory.html to my attention which might be able to help avoid the overhead

So what I suggest is:

  1. Do a proof of concept (POC - hack it in, don't worry about tests, etc) with your approach and see if you can show performance improvements (https://github.com/apache/arrow-datafusion/pull/9708 is an example of such a PR)
  2. If you can show it improves performance significantly, then we can work on a final design / tests / etc

The reason to do the POC first is that performance analysis is notoriously tricky at the system level, so you want to have evidence that your work will actually improve performance before you spend a bunch of time polishing up the PR (it is very demotivating, at least to me, to make a beautiful PR only to find out it doesn't really help performance)

Lordworms commented 3 months ago

Got it

matthewmturner commented 3 months ago

@Lordworms if I recall correctly, the S3 LIST call is made on every query, and if the number of files is large this can be non-trivial, so if the listed files can be cached after table creation / the first query then we could save that listing time on subsequent queries.

Lordworms commented 3 months ago

Yeah, I got it, I am implementing it, but I am confused about the caching granularity: whether to cache the whole parquet file or a portion of the file. I may have misunderstood in the beginning, since I thought we should improve the performance of the CREATE statement. We could cache either:

[screenshot]

or just the ObjectMeta.

matthewmturner commented 3 months ago

I think both could be done (we actually do that), but to me the metadata would be a good place to start, as to my understanding the builtin cache is better suited for something like object metadata than for whole parquet files.

Lordworms commented 3 months ago

Sure, I got it. Thanks for the advice

matthewmturner commented 3 months ago

One additional comment: I'm not sure how the existing cache would perform for range queries. For example, we use a trie in our implementation.

Depending on your findings that could be a follow-on though, as there might still be enough benefit from a baseline implementation.
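To illustrate why a trie helps with prefix / range style lookups (a toy sketch, not the implementation referred to above): keys are split into path segments, so "everything under this prefix" is answered by walking a single branch instead of scanning every cached key.

```rust
use std::collections::BTreeMap;

/// Stores values under `/`-separated paths and answers prefix queries by
/// walking only the matching branch.
struct PathTrie<V> {
    children: BTreeMap<String, PathTrie<V>>,
    value: Option<V>,
}

impl<V> Default for PathTrie<V> {
    fn default() -> Self {
        Self { children: BTreeMap::new(), value: None }
    }
}

impl<V> PathTrie<V> {
    fn insert(&mut self, path: &str, value: V) {
        let mut node = self;
        for segment in path.split('/').filter(|s| !s.is_empty()) {
            node = node.children.entry(segment.to_string()).or_default();
        }
        node.value = Some(value);
    }

    /// Collect every value stored at or below `prefix`.
    fn prefix(&self, prefix: &str) -> Vec<&V> {
        let mut node = self;
        for segment in prefix.split('/').filter(|s| !s.is_empty()) {
            match node.children.get(segment) {
                Some(child) => node = child,
                None => return Vec::new(),
            }
        }
        let mut out = Vec::new();
        node.collect_into(&mut out);
        out
    }

    fn collect_into<'a>(&'a self, out: &mut Vec<&'a V>) {
        if let Some(v) = &self.value {
            out.push(v);
        }
        for child in self.children.values() {
            child.collect_into(out);
        }
    }
}

fn main() {
    let mut trie = PathTrie::default();
    trie.insert("table/year=2023/file1.parquet", "meta1");
    trie.insert("table/year=2023/file2.parquet", "meta2");
    trie.insert("table/year=2024/file3.parquet", "meta3");
    // Only the year=2023 branch is visited.
    assert_eq!(trie.prefix("table/year=2023").len(), 2);
}
```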

Lordworms commented 3 months ago

I was testing directly using:

[screenshot]

and the performance has improved by a reasonable degree. I'll add LRU logic and file granularity tomorrow (since the current logic just supports the directory/path level).

Lordworms commented 3 months ago

Yes, I think the basic implementation could not handle both the directory and file levels. I may implement a trie or some other BST-style structure.

Lordworms commented 3 months ago

I have implemented a basic LRU metadata cache, and I think just caching the metadata gives only a slight performance improvement (we call the ListObjects API just once, but we call the GetObject API more than x times, where x is the number of files). Most of the time is spent calling the GetObject API, since we have to call it for each object. So I think caching the parquet file data would be a better approach, since most of the time is spent in the get_range function (which calls the GetObject API and then reads a range of the parquet file).

  1. Should we do the caching for the parquet file? I've seen the current logic of
  2. How should the file cache be implemented? The current logic is complicated, since not just DataFusion but also arrow-rs calls get_range; for example, arrow's metadata reading calls the get_range API [screenshot]. Also, should we cache the whole parquet file or only parts of it?
  3. Which data structure should be chosen? I have tried the SequenceTrie and the LRU DashMap.

I have been confused for the last two days and really appreciate your help @alamb @matthewmturner

matthewmturner commented 3 months ago

@Lordworms thanks for the work on this.

Just to confirm - what was the improvement in milliseconds we saw from the object meta cache? For context, in low-latency use cases (we have that requirement) even shaving off a few ms can be important, so we would still want to get that improvement in.

IMO it would be good to close the object meta cache topic first before moving on to file cache - unless there is some connection there that I am missing where we need to consider them together.

Lordworms commented 3 months ago

Sure. From left to right: unchanged, SequenceTrie cached, ThreadSafeLRU DashMap cached [benchmark screenshot]

matthewmturner commented 3 months ago

That's interesting that DashMap performed better. Would you mind also doing a query with a filter / pruning involved and comparing the results (perhaps with a range scan as well)? And just to confirm, the cache is filled at creation time during schema inference, right?

Also, is the cache used by both ListingTable and ParquetExec? I'm not sure whether adding the cache removes the duplicate calls or if additional work is required for that (if it is additional work, it could probably be done as a separate PR once the cache implementation is finalized).

I think it would also be nice to have a new benchmark for this to help monitor performance moving forward.

Lordworms commented 3 months ago

Also, is the cache used by both ListingTable and ParquetExec?

Currently it is used by ListingTable.

I'm not sure whether adding the cache removes the duplicate calls or if additional work is required for that.

Yes, could you give me some review advice when you are available, to see if I implemented it right? The closed PR is the SequenceTrie and the draft PR is the DashMap. Really appreciate your help.

I think it would also be nice to have a new benchmark for this to help monitor performance moving forward.

I want to do it, too, but I could not find a suitable open-source dataset for it. Currently, I am using bitcoin.

matthewmturner commented 2 months ago

@Lordworms apologies it took me longer than expected to get some free time. I plan to review between tonight and tomorrow.

matthewmturner commented 2 months ago

@Lordworms did you get the chance to compare querying with a filter / pruning involved (ideally with a range) between dashmap and sequence trie? Not sure if the dataset is conducive to that though.

Lordworms commented 2 months ago

I haven't added filter test cases yet, but I can do that in the next two days. I'll give you an update then.

alamb commented 2 months ago

I was playing around today and wanted to post an example that might help people

I found a parquet formatted wikipedia dataset: https://huggingface.co/datasets/wikimedia/wikipedia

The English subset is here: https://huggingface.co/datasets/wikimedia/wikipedia/tree/main/20231101.en

Here is how to create an external table that reads the first file:

> create external table wikipedia stored as parquet location 'https://huggingface.co/datasets/wikimedia/wikipedia/resolve/main/20231101.en/train-00000-of-00041.parquet';

> describe wikipedia;
+-------------+-----------+-------------+
| column_name | data_type | is_nullable |
+-------------+-----------+-------------+
| id          | Utf8      | YES         |
| url         | Utf8      | YES         |
| title       | Utf8      | YES         |
| text        | Utf8      | YES         |
+-------------+-----------+-------------+
4 row(s) fetched.
Elapsed 0.001 seconds.

> select id, url, title from wikipedia limit 10;
+-----+----------------------------------------------------------------------------------+------------------------------------------+
| id  | url                                                                              | title                                    |
+-----+----------------------------------------------------------------------------------+------------------------------------------+
| 12  | https://en.wikipedia.org/wiki/Anarchism                                          | Anarchism                                |
| 39  | https://en.wikipedia.org/wiki/Albedo                                             | Albedo                                   |
| 290 | https://en.wikipedia.org/wiki/A                                                  | A                                        |
| 303 | https://en.wikipedia.org/wiki/Alabama                                            | Alabama                                  |
| 305 | https://en.wikipedia.org/wiki/Achilles                                           | Achilles                                 |
| 307 | https://en.wikipedia.org/wiki/Abraham%20Lincoln                                  | Abraham Lincoln                          |
| 308 | https://en.wikipedia.org/wiki/Aristotle                                          | Aristotle                                |
| 309 | https://en.wikipedia.org/wiki/An%20American%20in%20Paris                         | An American in Paris                     |
| 316 | https://en.wikipedia.org/wiki/Academy%20Award%20for%20Best%20Production%20Design | Academy Award for Best Production Design |
| 324 | https://en.wikipedia.org/wiki/Academy%20Awards                                   | Academy Awards                           |
+-----+----------------------------------------------------------------------------------+------------------------------------------+
10 row(s) fetched.
Elapsed 1.143 seconds.

> select avg(length(title)) from wikipedia;
+----------------------------------------+
| AVG(character_length(wikipedia.title)) |
+----------------------------------------+
| 18.739130712974042                     |
+----------------------------------------+
1 row(s) fetched.
Elapsed 20.559 seconds.

Lordworms commented 2 months ago

Sorry for the late update, I was traveling last week. I'll test it today and give you feedback.