pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Add optional schema to scan_parquet #15111

Open ion-elgreco opened 8 months ago

ion-elgreco commented 8 months ago

Description

If a schema is passed to scan_parquet, it should be used to coerce every parquet file that is read into that schema. This also means Polars does not need to fetch the metadata of the first parquet file.

There are many cases where you know the schema in advance, such as when working with lakehouse formats (Delta, Iceberg, Hudi). Making the API more flexible here opens the door to fully native readers for these formats, which are widely used.
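
For illustration, a hypothetical sketch of what the requested API could look like; the schema argument below is the proposal, not an existing parameter, and the path is made up:

import polars as pl

# Hypothetical 'schema' argument: with it, Polars could skip fetching
# the first file's metadata and instead coerce every file it reads
# into the given schema.
lf = pl.scan_parquet(
    "s3://bucket/table/*.parquet",
    schema={"foo": pl.Int64, "bar": pl.Utf8, "baz": pl.Float64},
)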

deanm0000 commented 8 months ago

I like this, or even just an option to default to the pl.concat(..., how='diagonal_relaxed') behavior. I think similar requests have come up in the past (although I can't find them at the moment), but there has been pushback from the core team, who want the strictness.
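
For reference, a minimal sketch of that diagonal_relaxed behavior (the example frames are made up):

import polars as pl

a = pl.DataFrame({"foo": [1, 2], "bar": ["x", "y"]})
b = pl.DataFrame({"foo": [3.0], "baz": [True]})

# 'diagonal_relaxed' unions the column sets, fills missing values
# with nulls, and casts mismatched dtypes to a common supertype
# (here "foo" becomes Float64).
out = pl.concat([a, b], how="diagonal_relaxed")
print(out)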

A workaround to get part of the behavior is:

import json

import polars as pl

your_schema = {}  # actual schema here, of course

# Recover the resolved file paths by parsing the serialized lazy plan.
lf = pl.scan_parquet(...)
files = json.loads(lf.serialize())["Scan"]["paths"]

# Scan each file individually and cast every column to the target dtype.
lf = pl.concat(
    [
        pl.scan_parquet(path).select(
            [pl.col(name).cast(dtype) for name, dtype in your_schema.items()]
        )
        for path in files
    ]
)

Of course, this workaround goes in the opposite direction with respect to avoiding scanning the first file: it scans the metadata of all the files instead. It's awkward that serialize doesn't have an option to output a dict, so we have to parse the JSON, but oh well.

ion-elgreco commented 8 months ago

@deanm0000 I don't really get the pushback from the core team on this. The strictness shouldn't apply here: if you are passing a schema, you are doing so willingly and know that it will fit. DataFusion and PyArrow have no issue handling this type of behavior (see the sketch at the end of this comment).

It's a workaround that unfortunately ruins performance. I've done it here as well: https://github.com/ion-elgreco/polars-deltalake/blob/d9fcb4d9d7337bd163ce3ee344225516e53da4da/python/src/lib.rs#L126

It should be a single scan so the optimizer can work properly.
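
As a point of comparison for the PyArrow claim above, a sketch of its dataset API, where the caller can supply the schema up front (path and schema are illustrative):

import pyarrow as pa
import pyarrow.dataset as ds

# The caller dictates the unified schema; files are coerced to it on
# read (missing columns come back as nulls) instead of the schema
# being inferred from the first file.
schema = pa.schema(
    [("foo", pa.int64()), ("bar", pa.string()), ("baz", pa.float64())]
)
dataset = ds.dataset("path/to/table", format="parquet", schema=schema)
table = dataset.to_table()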

ion-elgreco commented 8 months ago

@deanm0000 Actually, a diagonal concat wouldn't be enough. Take this example:

I have two parquet files, each with the columns ["foo", "bar"]. I pass a schema of ["foo", "bar", "baz"]. Simply reading and diagonally concatenating means the column "baz" will be ignored. That column should be materialized as a null array of the dtype I provided in the schema.
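
For concreteness, a rough sketch of the intended coercion in today's Polars (file name and schema are made up; note that collect_schema still touches the file's metadata, which is exactly what the feature would avoid):

import polars as pl

target_schema = {"foo": pl.Int64, "bar": pl.Utf8, "baz": pl.Float64}

lf = pl.scan_parquet("file.parquet")  # file only contains "foo" and "bar"
present = set(lf.collect_schema().names())

# Cast the columns that exist and materialize the missing "baz"
# column as a typed null literal, so the result matches the schema.
lf = lf.select(
    [
        pl.col(name).cast(dtype)
        if name in present
        else pl.lit(None, dtype=dtype).alias(name)
        for name, dtype in target_schema.items()
    ]
)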