dask / dask-expr


Reading a list of S3 parquet files with query planning enabled is ~25x slower #1061

Open b-phi opened 1 month ago

b-phi commented 1 month ago

I was struggling to understand why creating a dask dataframe from a large list of parquet files was taking ages. Eventually I tried disabling query planning and saw normal timing again. These are all relatively small S3 files, ~1 MB each. There is no metadata file or similar.
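Roughly what the comparison looks like, as a minimal sketch (the bucket, prefix, and file count are placeholders, and the config key assumes a recent dask release where query planning / dask-expr is the default):

import dask

# Assumption: query planning has to be disabled before dask.dataframe is imported
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd

# Placeholder bucket/prefix; in reality these are ~1 MB files with no metadata file
files = [f"s3://my-bucket/dataset/part.{i}.parquet" for i in range(300)]
ddf = dd.read_parquet(files)  # back to normal timing with query planning disabled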

[Screenshot, 2024-05-10: timing comparison with and without query planning]

Environment:

phofl commented 1 month ago

Hi, thanks for your report. We will look into this.

phofl commented 1 month ago

cc @fjetter: the checksum calculation causes the slowdown here. It looks like the info isn't cached for lists of paths.

files = [f"s3://coiled-data/uber/part.{i}.parquet" for i in range(300)]
dd.read_parquet(files)

99% of the runtime is the checksum calculation; just passing the prefix path takes 1 second instead of 13.

fjetter commented 1 month ago

Well, this is not strictly about caching. The legacy reader did indeed not calculate a checksum. The checksum is currently used to cache division statistics per dataset. If we dropped that cache, we could skip the checksum, but we would then have to re-calculate the divisions for every distinct read_parquet call. Right now, multiple read_parquet calls on the same unaltered dataset re-use the cache.

As I said, the problem is not that we don't cache the checksum calculation, but that calculating the checksum requires N requests to S3. If one simply provides a prefix, we only have to perform a single request.

I was already considering dropping this API entirely. Accepting a list of files introduces this and other weird artifacts. @b-phi can you tell us a bit more about why you are choosing this approach instead of simply providing a common prefix?

b-phi commented 1 month ago

Hey @fjetter, happy to provide more details. A simplified view of how our data is laid out in S3 looks like this, where multiple files within an S3 "folder" can indicate either multiple versions or a large file that was split into smaller chunks. We have an internal application that translates user queries into a list of files to load; for example, "give me all symbol=FOO files" might return [file_3, file_5] and "give me all files as of this time" might return [file_1, file_7]. These types of load operations cannot fit into a prefix-only structure.

bucket/dataset_1/date=2020-01-01/symbol=FOO/file_1.parquet
                                        .../file_2.parquet
                                        .../file_3.parquet
bucket/dataset_1/date=2020-01-01/symbol=BAR/file_4.parquet
bucket/dataset_1/date=2020-01-02/symbol=FOO/file_5.parquet
bucket/dataset_1/date=2020-01-02/symbol=BAR/file_6.parquet
                                        .../file_7.parquet
                                        .../file_8.parquet

I'm a big fan of the recent work to add query planning to dask. While I can appreciate that supporting list inputs introduces some complexity here, loading a provided list of files in parallel seems to me to be one of the fundamental use cases of distributed dataframes.

For my own knowledge, is it possible to briefly summarize the difference between dd.read_parquet([...]) and dd.from_delayed([delayed(pd.read_parquet)(file) for file in ...], meta=...)?
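Concretely, the two patterns I'm comparing look roughly like this (an illustrative sketch only; the file paths, column names, and meta are placeholders for our real data):

import pandas as pd
import dask.dataframe as dd
from dask import delayed

# Hypothetical file list produced by our internal application
files = [
    "s3://bucket/dataset_1/date=2020-01-01/symbol=FOO/file_3.parquet",
    "s3://bucket/dataset_1/date=2020-01-02/symbol=FOO/file_5.parquet",
]

# Pattern 1: hand the list straight to the parquet reader
ddf_a = dd.read_parquet(files)

# Pattern 2: one delayed pandas read per file, with an explicit meta
meta = pd.DataFrame({"symbol": pd.Series(dtype="object"), "price": pd.Series(dtype="float64")})
ddf_b = dd.from_delayed([delayed(pd.read_parquet)(f) for f in files], meta=meta)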

fjetter commented 1 month ago

For my own knowledge, is it possible to briefly summarize the difference between dd.read_parquet([...]) and dd.from_delayed([delayed(pd.read_parquet)(file) for file in ...], meta=...)?

Well, I'm trying to stay very short

These types of load operations cannot be fit into a prefix-only structure.

Dask should be able to handle the date and symbol layout you are posting here; this is called "hive-like" partitioning. Try something like dd.read_parquet("bucket/dataset_1", filters=[[("date", ">", "2020-01-01"), ("symbol", "=", "BAR")]]). If there are no bugs, this should filter the files and only select those which match the provided filters. In fact, with the optimizer, this should just look like

import datetime
import dask.dataframe as dd

ddf = dd.read_parquet("bucket/dataset_1")
ddf = ddf[ddf.date > datetime.date(2020, 1, 1)]  # I'm actually not sure which type the value has to be
ddf = ddf[ddf.symbol == "BAR"]

and the optimizer should rewrite it such that the filters are provided to the parquet layer automatically.

b-phi commented 1 month ago

Dask should be able to handle the date and symbol layout you are posting here; this is called "hive-like" partitioning. Try something like dd.read_parquet("bucket/dataset_1", filters=[[("date", ">", "2020-01-01"), ("symbol", "=", "BAR")]]).

Yes, understood, and if our use case were limited to filtering on the hive partitions, that would cover it. However, there's additional metadata that we often need to filter on that isn't represented in the S3 folder structure. The "give me all files as of this time" use case, for example, refers to file creation time as stored in the internal application, rather than anything we could express as a hive partition filter.

Another issue is that while most Python libraries with similar functionality (Arrow, Ray) accept a list of parquet files as input, there isn't a standard way of filtering partitions and file paths. Ray, for example, seems to have a callable-based approach (disclaimer: I haven't used this personally). As a result, if we need to support multiple tools, I'd much rather filter for partitions myself and pass the resulting files to different libraries than translate a given filter into each library's preferred partition-filtering approach.

b-phi commented 1 month ago

I will take a look at from_map; we had been using from_delayed for some use cases, but from_map seems like it may be faster. Thanks!
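Roughly what I have in mind with from_map (a sketch only; the paths and meta below are placeholders):

import pandas as pd
import dask.dataframe as dd

# Hypothetical list of files returned by our internal application
files = [
    "s3://bucket/dataset_1/date=2020-01-01/symbol=FOO/file_1.parquet",
    "s3://bucket/dataset_1/date=2020-01-02/symbol=FOO/file_7.parquet",
]

# Placeholder schema; in practice this matches our real columns
meta = pd.DataFrame({"symbol": pd.Series(dtype="object"), "price": pd.Series(dtype="float64")})

# One pandas read per input file, one output partition per file
ddf = dd.from_map(pd.read_parquet, files, meta=meta)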

fjetter commented 1 month ago

I empathize with your situation. I know that other libraries offer this kind of interface, but the context is different. Most importantly, we are running an optimizer on this and have stricter requirements for the input than other libraries might have.

A solution to this might be for us to implement a simpler reader for this kind of API request, one that supports a smaller feature set and is essentially a from_map behind the scenes without further optimization. Supporting all those different inputs is what made the initial implementation unmaintainable, and I don't want to go down that path again.

As a result, if we need to support multiple tools, I'd much rather filter for partitions myself and pass the resulting files to different libraries rather than translate a given filter into each libraries preferred partition filtering approach.

This is certainly not always an option, but you may want to consider writing this information into the file itself. If the file has a single value in a column, the parquet file compresses this exceptionally well. The parquet metadata is then sufficient to decide whether this file has to be loaded, and you would never have to read the column back in.
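A minimal sketch of that idea, assuming a hypothetical created_at column stamped by the writer (the column name, schema, and paths are illustrative only):

import pandas as pd
import dask.dataframe as dd

# Writer side: stamp every row of a file with the same creation timestamp
df = pd.DataFrame({"symbol": ["FOO"] * 1000, "price": range(1000)})
df["created_at"] = pd.Timestamp("2020-01-01 12:00:00")  # constant column, compresses to almost nothing
df.to_parquet("bucket/dataset_1/date=2020-01-01/symbol=FOO/file_1.parquet")

# Reader side: the parquet statistics for created_at allow files to be skipped
# without ever reading the column back in
ddf = dd.read_parquet(
    "bucket/dataset_1",
    filters=[[("created_at", "<=", pd.Timestamp("2020-01-02"))]],
)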

b-phi commented 1 month ago

In our case, we could easily have 5-10 million files under bucket/dataset_1/.... We're able to find the files to load in <1s for most queries. Could we expect similar performance from this approach?

dd.read_parquet("bucket/dataset_1", filters=[[("date", ">", "2020-01-01"), ("symbol", "=", "BAR")]])

I'll do some performance testing with the expression filters, but it sounds like the overall takeaway is to use dd.from_map rather than dd.read_parquet if a use case requires providing an explicit list of parquet files.

fjetter commented 1 month ago

We're able to find the files to load in <1s for most queries. Could we expect similar performance from this approach?

Probably not, but I don't know for sure.