pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Hive partitioning tracking issue #15441

Open stinodego opened 5 months ago

stinodego commented 5 months ago
deanm0000 commented 5 months ago

If I may, here's another one https://github.com/pola-rs/polars/issues/14936

kszlim commented 5 months ago

I see you just merged the ability to specify a Hive partition schema manually. Does it allow for partial inference? I.e. if you partition on multiple keys but specify only a subset of them in the schema, will it infer the rest?

ion-elgreco commented 5 months ago

@stinodego why not extend the schema to the full table instead of just the partition columns?

stinodego commented 5 months ago

I see you just merged the ability to specify a Hive partition schema manually. Does it allow for partial inference?

At this point it does not. You have to specify the full schema of the Hive partitions. Similar to other schema arguments in the API. I can see how a schema_overrides type of parameter would be useful though. Not sure if they should be combined, will have to think about it.

@stinodego why not extend the schema to the full table instead of just the partition columns?

At least in the case of Parquet, that part of the schema is already available from the data. Not sure a full schema/schema_overrides would provide much benefit over simply casting after scanning.
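
For illustration, a cast-after-scan would look roughly like this (the path and column names below are made up; the casts are just nodes in the lazy plan and run at collect time):

import polars as pl

# Hypothetical table path and columns; the casts are part of the lazy
# plan, so no extra materialization step is needed.
lf = (
    pl.scan_parquet("my_table/**/*.parquet", hive_partitioning=True)
    .with_columns(
        pl.col("foo").cast(pl.Int32),
        pl.col("bar").cast(pl.Float64),
    )
)
df = lf.collect()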

ion-elgreco commented 5 months ago

@stinodego it is part of the Parquet schema, but in situations with schema evolution, Polars would not be able to handle those situations. Also, if I know the schema ahead of time, you can essentially skip reading the Parquet metadata.

stinodego commented 5 months ago

in situations with schema evolution, Polars would not be able to handle those situations

Can you give an example?

you can essentially skip reading the Parquet metadata

I don't know, there's other stuff in the metadata besides the schema. Not sure yet exactly what we're actually using.

ion-elgreco commented 5 months ago

in situations with schema evolution, Polars would not be able to handle those situations

Can you give an example?

Sure, take these two Parquet files that we have written:

import polars as pl

# write_parquet returns None, so there is no need to bind the result to df
pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test1.parquet")

pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"],
}).write_parquet("polars_parquet/test2.parquet")

When you read with Polars, it incorrectly assumes that the first Parquet file's schema applies to all files in the table, so you only get foo and bar:

pl.read_parquet("polars_parquet/*.parquet")
shape: (2, 2)
┌─────┬─────┐
│ foo ┆ bar │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1   ┆ 2   │
│ 2   ┆ 3   │
└─────┴─────┘

Now let's write in the other order: Polars will panic because it cannot handle a column being missing from a Parquet file. See this issue I opened a while ago, https://github.com/pola-rs/polars/issues/14980:

pl.DataFrame({
    "foo": [2],
    "bar": [3],
    "baz": ["hello world"],
}).write_parquet("polars_parquet/test1.parquet")

pl.DataFrame({
    "foo": [1],
    "bar": [2],
}).write_parquet("polars_parquet/test2.parquet")

pl.read_parquet("polars_parquet/*.parquet")

thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-parquet/src/arrow/read/deserialize/mod.rs:144:31:
called `Option::unwrap()` on a `None` value
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

It's a common use case to evolve Parquet tables without having to rewrite all the older files to conform to the new schema.
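
Until this is supported natively, a workaround sketch (not proposing this as the eventual API) is to scan each file separately and combine the scans with a diagonal concat, which fills missing columns with nulls:

import glob

import polars as pl

# Scan each file on its own so no single file's schema is assumed for the
# whole table, then take the union of columns; gaps are filled with nulls.
files = sorted(glob.glob("polars_parquet/*.parquet"))
lf = pl.concat([pl.scan_parquet(f) for f in files], how="diagonal")
print(lf.collect())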

ion-elgreco commented 5 months ago

Having something akin to PyArrow datasets (https://github.com/pola-rs/polars/issues/13086) would make a lot of sense.

stinodego commented 5 months ago

Ok, I see what you mean. We should support this.

kszlim commented 4 months ago

This might be an interesting source of inspiration/ideas for a dataset abstraction in Polars: https://padawan.readthedocs.io/en/latest/

jrothbaum commented 4 months ago

Any chance you would reconsider this as part of the reworking of hive partition handling? https://github.com/pola-rs/polars/issues/12041

deanm0000 commented 4 months ago

Here's another one https://github.com/pola-rs/polars/issues/15586. It's to change the default for write_statistics to True, nothing complicated.

deanm0000 commented 4 months ago

Here are a couple more.

Can't forget to document at the end.

This one might be a bit of a tangent, but it's to incorporate the page index spec of Parquet files: https://github.com/pola-rs/polars/issues/12752

Smotrov commented 4 months ago

As I understand it, adding the partition fields to the schema is supposed to enable Hive partition support. However, in my case it produces an error instead:

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/*";

let mut schema = Schema::new();
schema.with_column("year".into(), DataType::Int8);
schema.with_column("month".into(), DataType::Int8);

let schema = Arc::new(schema);

let cloud_options = cloud::CloudOptions::default().with_aws([
    (Key::AccessKeyId, &cred.access_key.unwrap()),
    (Key::SecretAccessKey, &cred.secret_key.unwrap()),
    (Key::Region, &"eu-west-1".into()),
]);

let mut args = ScanArgsParquet::default();
args.hive_options.enabled = true;
args.hive_options.schema = Some(schema);
args.cloud_options = Some(cloud_options);

// Check time required to read the data.
let start = std::time::Instant::now();

let df = LazyFrame::scan_parquet(TEST_S3, args)?
    .with_streaming(true)
    .collect()?;

The result is:

Error: Context { error: ComputeError(ErrString("Object at location data_lake/some_dir/partitioned_table_root_dir not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }

lmocsi commented 3 months ago

Enhancement request for "Support directory input": https://github.com/pola-rs/polars/issues/14342

Smotrov commented 3 months ago

Enhancement request for "Support directory input": #14342

Thank you. To be honest, I'm quite surprised. How can anyone use this tool for any serious work without the ability to load data from a directory? All our tables are partitioned across multiple files. 👀

stinodego commented 3 months ago

You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.

Directory support will function slightly differently, as it will do some additional validation, but it's mostly the same.
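
For example, something like this (bucket and path are placeholders) reads every Parquet file under the table root and parses the Hive columns from the directory names:

import polars as pl

# Partition directories such as year=2024/month=5 become columns when
# hive_partitioning is enabled.
lf = pl.scan_parquet(
    "s3://my_bucket/table_root/**/*.parquet",
    hive_partitioning=True,
)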

lmocsi commented 3 months ago

Yes, it is described in the referenced enhancement request (the **/*.parquet part).

Smotrov commented 3 months ago

You can already achieve this by appending **/*.parquet to your directory, which will read all parquet files in that directory.

Thank you, but my Parquet files do not have any extensions, and adding /**/* does not help. It shows the following error:

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/**/*";
Error: Context { error: ComputeError(ErrString("Object at location partitioned_table_root_dir/year=2024/month=5 not found: Client error with status 404 Not Found: No Body")), msg: ErrString("'parquet scan' failed") }

Meanwhile, if I manually set a specific combination of my partition values, it works:

const TEST_S3: &str = "s3://my_bucket/data_lake/some_dir/partitioned_table_root_dir/year=2024/month=5/*";

But I believe manually adding partition values is not how Hive partitioning is supposed to work? Or am I doing something wrong?

If I add extensions to all the files, the **/*.parquet trick works well.

Smotrov commented 3 months ago

It would be great to add support for Hive partitioning logic in other readers besides Parquet, e.g. JSON.

stinodego commented 3 months ago

It would be great to add support for Hive partitioning logic in other readers besides Parquet, e.g. JSON.

It's on the list!

talawahtech commented 3 months ago

Tossing in a suggestion to also support reading/writing PyArrow/Spark-compatible Parquet _metadata files. See #7707

deanm0000 commented 2 months ago

https://github.com/pola-rs/polars/issues/15823 probably belongs here.

couling commented 2 months ago

@stinodego, regarding this comment:

Ok, I see what you mean. We should support this.

Is there a GitHub issue tracking this? It's not noted in the issue checklist here and, as far as I can see, the trail goes cold after that comment and #15508.

For us, the inability to set the table schema explicitly has prevented us from using scan_parquet. We are forced to go via scan_pyarrow_dataset instead, which is suboptimal and makes for messy code.
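
For reference, the detour we end up writing looks roughly like this (paths and schema are placeholders):

import pyarrow as pa
import pyarrow.dataset as ds
import polars as pl

# Declare the full table schema up front, including the Hive partition
# columns, let PyArrow do the discovery, and scan the dataset from Polars.
schema = pa.schema(
    [
        ("foo", pa.int64()),
        ("baz", pa.string()),
        ("year", pa.int32()),   # Hive partition column
        ("month", pa.int32()),  # Hive partition column
    ]
)
dataset = ds.dataset(
    "s3://my_bucket/table_root/",
    schema=schema,
    format="parquet",
    partitioning="hive",
)
lf = pl.scan_pyarrow_dataset(dataset)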

ritchie46 commented 2 months ago

Others to track:

danielgafni commented 1 month ago

Not sure if this is the correct place to write this, but...

For the native partitioned Parquet reader, would it be possible to support loading the union of columns from different partitions when they contain different sets of columns? This would correspond to a "diagonal" concat.

For example, when working with limit order book data, daily partitions of order book levels have a varying number of columns.

The PyArrow reader silently drops columns that are not present in all partitions.

I wonder if it would be possible to surface a concatenation option in the top-level API of the native Polars reader?
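
To make that concrete, here is a toy eager sketch of the behaviour I have in mind (column names are invented):

import polars as pl

# Two daily "partitions" with different sets of order book columns.
day1 = pl.DataFrame({"ts": [1, 2], "bid_0": [10.0, 10.1]})
day2 = pl.DataFrame({"ts": [3], "bid_0": [10.2], "bid_1": [9.9]})

# Diagonal concat keeps the union of columns and fills missing values
# with nulls, rather than dropping columns absent from some partitions.
print(pl.concat([day1, day2], how="diagonal"))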