pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Avoid reading partitioned files if all information is present in Hive partition paths #14936

Open deanm0000 opened 7 months ago

deanm0000 commented 7 months ago

Description

This is easy to see on the cloud, but here's one way to see it locally with an MRE.

Setup:

import os
os.environ['POLARS_VERBOSE'] = '1'

import polars as pl
from pathlib import Path

# Write one single-row parquet file into each hive partition directory.
for i in range(10):
    Path(f"./test/folder={i}").mkdir(exist_ok=True, parents=True)
    pl.select(a=10).write_parquet(f"./test/folder={i}/0000.parquet")

df = pl.scan_parquet("./test/*/*.parquet")

Delete one of the files (and its directory), then do a filtered collect:

Path("./test/folder=3/0000.parquet").unlink()
Path("./test/folder=3").rmdir()

df.filter(pl.col('folder')==1).collect()
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
parquet file can be skipped, the statistics were sufficient to apply the predicate.
hive partitioning: skipped 9 files, first file : test/folder=0/0000.parquet

We can see that the filtered collect works, and it doesn't mind that one of the files is missing because that file was skipped.

Now let's try to see all the unique values of folder. Of course, there is no data about folder in any of the files; it only ever existed as part of the directory paths.

df.select(pl.col("folder").unique()).collect()
# FileNotFoundError: No such file or directory (os error 2)

Even if we don't delete the file here, there are no verbose messages indicating whether it's reading any or all of the files. The reason I deleted a file before running this is to demonstrate that it does try to read (at least one of) the files. My point isn't that we should be able to read from hives from which files have been deleted; it was just the simplest way I could think of to show that it's attempting to read the files when it shouldn't need to. When you do this with a big cloud dataset, it becomes incredibly apparent that it is scanning all the files when it doesn't need to.

The request here is to make it not scan any of the files when it has all the info it needs from just the directory names.
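
As an aside, all of that information is already recoverable from the paths alone. Here's a minimal sketch of the kind of zero-read answer I'm after, parsing the directory names from the MRE above by hand (my own illustration, not anything Polars does today):

from pathlib import Path

# Derive the unique `folder` values purely from the hive directory names,
# without opening a single parquet file.
folders = sorted(
    int(p.name.split("=", 1)[1])
    for p in Path("./test").iterdir()
    if p.is_dir() and p.name.startswith("folder=")
)
print(folders)  # the partition values, with no file reads involved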

stinodego commented 6 months ago

I'm having trouble understanding the issue here. Is there a bug? Is it a performance issue?

deanm0000 commented 6 months ago

@stinodego Sorry, I jumped right into the weeds of proving the problem and didn't do a good job of summarizing it.

The problem is that if we do a query where all the relevant info is in the file paths, it will read every file when it shouldn't need to read any of them.

Suppose we have this directory structure

mydataset/fruit=apple/0000.parquet
mydataset/fruit=banana/0000.parquet

and then we do

df=pl.scan_parquet("mydataset/**/*.parquet")

The initial scan is going to get the list of files and directories immediately so that if we do

df.select(pl.col('fruit').unique()).collect()

then it should be able to give us ['apple', 'banana'] without reading any files. In fact, reading the files provides no help in completing that query because the column doesn't even exist in the files.

Despite there being no relevant data in any of the files, it reads all the files.

For any small/simple MRE you wouldn't notice that it's doing this, which is why my first example deletes a file, so that you can see that it really is trying to read it. For big data, especially on the cloud where we're subject to internet speeds and read charges, that is the difference between a free, instant answer and one where we download GBs (or maybe TBs) of data from the cloud for no reason.

I don't know if this is a bug per se but it is definitely a performance issue.

To extend this a bit.

Even if I did something like

df.select(pl.col('fruit')).collect() # not with unique

then it should still only read the metadata of the files to get the number of rows, rather than reading any of the underlying data.
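
To be clear about what I mean by "only the metadata": the parquet footer already carries the row count, so something like the following (pyarrow here purely as an illustration of the idea, not a suggestion about Polars internals) is all that's needed per file:

import pyarrow.parquet as pq

# The footer alone tells us how many rows the file has; no column data is read.
num_rows = pq.ParquetFile("test/folder=0/0000.parquet").metadata.num_rows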

deanm0000 commented 6 months ago

@stinodego When the optimizer is looking at stats to determine what it needs to do, can it do a check similar to this pseudo-code?

if stats.min_value == stats.max_value and stats.null_count == 0:
    return [stats.min_value] * stats.num_rows
else:
    return read_physical_source()

stinodego commented 6 months ago

Ok, I see what you're saying. So we should push down predicates somehow and determine whether all information is already present in the paths themselves rather than the files.

So this is a performance issue. It would definitely be great if we could implement this.

deanm0000 commented 6 months ago

I don't think it's a predicate pushdown issue but rather a projection issue, isn't it? That is to say, it's not filtering anything.

When it does the initial scan of a hive path, it parses the directories and creates statistics max = min = [value] for all the hive directories. If those stats are available to the projection optimizer, then generically it should be able to check that if max == min and null_count == 0, it can skip reading anything and just project that value. I would think that check would work even beyond hive partitions, for any highly repeated data. Am I off base, or could that be a way to implement it?
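
To make that concrete, here is roughly the shape of statistics I'd imagine the scan deriving from a path like test/folder=3/0000.parquet, and the check against them (hypothetical structure, purely to illustrate the max == min idea):

# Hypothetical per-file statistics for the `folder` column, derived from the
# directory name alone:
folder_stats = {"min_value": 3, "max_value": 3, "null_count": 0}

# If min == max and there are no nulls, the column is a constant for that file,
# so the projection needs nothing from the file body.
can_skip_read = (
    folder_stats["min_value"] == folder_stats["max_value"]
    and folder_stats["null_count"] == 0
)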

stinodego commented 6 months ago

I meant to refer to projection pushdown indeed.

deanm0000 commented 3 months ago

@nameexhaustion care to take a look at this one? I ask because I see you crushing other hive related issues and there might be some economies of scale.

nameexhaustion commented 3 months ago

I don't think we can do this; we need to know how many rows the files have in order to project the correct number of rows, even if the projection is hive-only.
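
For example, if fruit=apple held 3 rows and fruit=banana held 5, df.select(pl.col('fruit')) would have to emit "apple" 3 times and "banana" 5 times, so at least the parquet footers still have to be fetched. A sketch of that repeat-by-row-count shape, with hypothetical paths and pyarrow used only for illustration:

import pyarrow.parquet as pq
import polars as pl

# The repeat counts come from the parquet footers only; no column data is read.
parts = {
    "apple": "mydataset/fruit=apple/0000.parquet",
    "banana": "mydataset/fruit=banana/0000.parquet",
}
fruit = pl.concat(
    [pl.Series("fruit", [value] * pq.ParquetFile(path).metadata.num_rows)
     for value, path in parts.items()]
)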