pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Problem with `concat` and hive partitioned data in combination with in-memory data frames #16285

Open ericdoerheit opened 2 weeks ago

ericdoerheit commented 2 weeks ago

Reproducible example

import os

import numpy as np
import polars as pl

def main():
    os.makedirs("tmp/year=2024/month=5", exist_ok=True)

    df1 = pl.DataFrame({
        "a": [1, 2, 3],
        "b": ["d", "e", "f"],
        "c": [1.1, 1.2, 1.3],
        "year": np.array([2024, 2024, 2024], dtype=np.int64),
        "month": np.array([5, 5, 5], dtype=np.int64),
    }).lazy()

    pl.DataFrame({"a": [4, 5, 6], "b": ["d", "e", "f"], "c": [2.1, 2.2, 2.3], }) \
        .write_parquet("tmp/year=2024/month=5/df.parquet")

    df2 = pl.scan_parquet("tmp/*/*/*.parquet", hive_partitioning=True)

    df = pl.concat([df1, df2])

    result = df.filter(pl.col("a") >= 4) \
        .select([pl.col("c").mean().alias("mean_c")]).collect()

    print("Test Result:", result)

if __name__ == "__main__":
    main()

Log output

UNION: union is run in parallel
parquet file must be read, statistics not sufficient for predicate.
Traceback (most recent call last):
  File "C:\Users\ericd\dataspree\Projects\dataspree-insights\scripts\test_concat_hot_and_cold_data.py", line 32, in <module>
    main()
  File "C:\Users\ericd\dataspree\Projects\dataspree-insights\scripts\test_concat_hot_and_cold_data.py", line 26, in main
    .select([pl.col("c").mean().alias("mean_c")]).collect()
  File "C:\Users\ericd\AppData\Roaming\Python\Python310\site-packages\polars\lazyframe\frame.py", line 1816, in collect
    return wrap_df(ldf.collect(callback))
polars.exceptions.ShapeError: unable to append to a DataFrame of width 2 with a DataFrame of width 4

Issue description

There is an issue when concatenating lazy frames scanned from hive-partitioned data with data frames that live in memory: collecting the query fails with a ShapeError. When the hive partitioning is ignored (`hive_partitioning=False`), the example above works. Another workaround is to deactivate projection pushdown (`projection_pushdown=False`).
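For completeness, here is a minimal sketch of the two workarounds (based on the behaviour reported above and assuming the same `tmp/` layout as in the reproducible example):

```
import polars as pl

df1 = pl.LazyFrame({
    "a": [1, 2, 3], "b": ["d", "e", "f"], "c": [1.1, 1.2, 1.3],
    "year": [2024, 2024, 2024], "month": [5, 5, 5],
})

# Workaround 1: ignore the hive partition columns when scanning.
df2 = pl.scan_parquet("tmp/*/*/*.parquet", hive_partitioning=False)
print(
    pl.concat([df1, df2])
    .filter(pl.col("a") >= 4)
    .select(pl.col("c").mean().alias("mean_c"))
    .collect()
)

# Workaround 2: keep hive partitioning but collect with projection
# pushdown disabled.
df2 = pl.scan_parquet("tmp/*/*/*.parquet", hive_partitioning=True)
print(
    pl.concat([df1, df2])
    .filter(pl.col("a") >= 4)
    .select(pl.col("c").mean().alias("mean_c"))
    .collect(projection_pushdown=False)
)
```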

For reference, this is how the data frames look:

df1 shape: (3, 5)
┌─────┬─────┬─────┬──────┬───────┐
│ a   ┆ b   ┆ c   ┆ year ┆ month │
│ --- ┆ --- ┆ --- ┆ ---  ┆ ---   │
│ i64 ┆ str ┆ f64 ┆ i64  ┆ i64   │
╞═════╪═════╪═════╪══════╪═══════╡
│ 1   ┆ d   ┆ 1.1 ┆ 2024 ┆ 5     │
│ 2   ┆ e   ┆ 1.2 ┆ 2024 ┆ 5     │
│ 3   ┆ f   ┆ 1.3 ┆ 2024 ┆ 5     │
└─────┴─────┴─────┴──────┴───────┘
df2 shape: (3, 5)
┌─────┬─────┬─────┬──────┬───────┐
│ a   ┆ b   ┆ c   ┆ year ┆ month │
│ --- ┆ --- ┆ --- ┆ ---  ┆ ---   │
│ i64 ┆ str ┆ f64 ┆ i64  ┆ i64   │
╞═════╪═════╪═════╪══════╪═══════╡
│ 4   ┆ d   ┆ 2.1 ┆ 2024 ┆ 5     │
│ 5   ┆ e   ┆ 2.2 ┆ 2024 ┆ 5     │
│ 6   ┆ f   ┆ 2.3 ┆ 2024 ┆ 5     │
└─────┴─────┴─────┴──────┴───────┘

I found the following issues which might be related:

I created a similar test in Rust and it fails with the same kind of ShapeError:

use std::fs;
use polars::io::HiveOptions;
use polars::prelude::*;

const TEST_DIR: &str = "tmp/integration_tests/polars";
const YEAR: i32 = 2024;
const MONTH: i8 = 5;

/// Function that writes example data to a parquet file with hive partitioning.
fn write_example_data() {
    // Write a DataFrame to a parquet file with hive partitioning
    let mut data_frame = df! {
        "a" => [1, 2, 3],
        "b" => ["a", "b", "c"],
        "c" => [1.1, 1.2, 1.3],
    }.unwrap();

    let out_dir = format!("{}/year={}/month={}", TEST_DIR, YEAR, MONTH);
    match fs::create_dir_all(out_dir.clone()) {
        Ok(_) => println!("Successfully created directory: {}", out_dir.clone()),
        Err(e) => println!("Error creating directory: {}", e),
    }

    let file_path = format!("{}/data.parquet", out_dir);
    let file = fs::File::create(&file_path).expect("Failed to create file");
    let writer = ParquetWriter::new(file);
    writer.finish(&mut data_frame).expect("Failed to write data to file");
}

/// Function that scans the saved data from the parquet file.
fn get_saved_data() -> LazyFrame {
    let mut hive_options = HiveOptions::default();

    // Define the fields of the hive schema
    let fields = vec![
        Field::new("year", DataType::Int32),
        Field::new("month", DataType::Int8),
    ];

    // Create the schema
    let schema = Schema::from_iter(fields);

    // Create a reference to the schema
    let schema_ref = SchemaRef::new(schema);

    hive_options.schema = Some(schema_ref);
    hive_options.enabled = true;

    let mut scan_args = ScanArgsParquet::default();
    scan_args.hive_options = hive_options;

    // Use the scan parquet function to scan all parquet files in the directory
    let schema_path = format!("{}/*/*/*.parquet", TEST_DIR);

    let lazy_frame = LazyFrame::scan_parquet(schema_path, scan_args.clone()).unwrap();

    // let schema_path = format!("{}/year=2024/month=5/*.parquet", TEST_DIR);
    // let lazy_frame = LazyFrame::scan_parquet(schema_path, ScanArgsParquet::default()).unwrap();

    println!("In storage: {:?}", lazy_frame.schema());

    lazy_frame
}

/// Test that concatenates a saved DataFrame with an in-memory DataFrame and executes a query.
#[test]
fn test_polars_concat() {
    write_example_data();

    let df_stored = get_saved_data();
    let df_in_memory = df! {
        "a" => [4, 5, 6],
        "b" => ["d", "e", "f"],
        "c" => [2.1, 2.2, 2.3],
        "year" => [2024, 2024, 2024],
        "month" => [5, 5, 5],
    }.unwrap().lazy();

    println!("In memory: {:?}", df_stored.schema());

    let args = UnionArgs {
        parallel: false,
        rechunk: false,
        to_supertypes: false,
        ..Default::default()
    };
    let df = concat(&[df_stored, df_in_memory], args).unwrap();

    let result = df.filter(col("a").gt_eq(lit(4)))
        .select([col("c").mean().alias("mean_c")]).collect();

    println!("{:?}", result.unwrap());
}

Log output:

thread 'test_polars_concat' panicked at tests\polars_concat_in_memory_with_saved_data.rs:91:29:
called `Result::unwrap()` on an `Err` value: ShapeMismatch(ErrString("unable to append to a DataFrame of width 4 with a DataFrame of width 2"))
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\std\src\panicking.rs:647
   1: core::panicking::panic_fmt
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\core\src\panicking.rs:72
   2: core::result::unwrap_failed
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\core\src\result.rs:1649
   3: enum2$<core::result::Result<polars_core::frame::DataFrame,enum2$<polars_error::PolarsError> > >::unwrap
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04\library\core\src\result.rs:1073
   4: polars_concat_in_memory_with_saved_data::test_polars_concat
             at .\tests\polars_concat_in_memory_with_saved_data.rs:91
   5: polars_concat_in_memory_with_saved_data::test_polars_concat::closure$0
             at .\tests\polars_concat_in_memory_with_saved_data.rs:67
   6: core::ops::function::FnOnce::call_once<polars_concat_in_memory_with_saved_data::test_polars_concat::closure_env$0,tuple$<> >
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04\library\core\src\ops\function.rs:250
   7: core::ops::function::FnOnce::call_once
             at /rustc/25ef9e3d85d934b27d9dada2f9dd52b1dc63bb04/library\core\src\ops\function.rs:250
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Expected behavior

I would expect `concat` to yield the same result regardless of whether hive partitioning or projection pushdown is activated.
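Concretely, a check along these lines would be expected to pass (hypothetical snippet, reusing `df1` and `df2` as defined in the reproducible example above):

```
# df1 and df2 as defined in the reproducible example above.
query = (
    pl.concat([df1, df2])
    .filter(pl.col("a") >= 4)
    .select(pl.col("c").mean().alias("mean_c"))
)

# Expected: the optimized plan (projection pushdown enabled) and the
# unoptimized plan return the same 1x1 result instead of raising a ShapeError.
assert query.collect(projection_pushdown=False).equals(query.collect())
```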

Installed versions

```
--------Version info---------
Polars:               0.20.26
Index type:           UInt32
Platform:             Windows-10-10.0.22631-SP0
Python:               3.10.5 (tags/v3.10.5:f377153, Jun 6 2022, 16:14:13) [MSC v.1929 64 bit (AMD64)]
----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:
gevent:
hvplot:
matplotlib:
nest_asyncio:
numpy:                1.26.4
openpyxl:
pandas:
pyarrow:              16.0.0
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:
torch:                1.12.1+cpu
xlsx2csv:
xlsxwriter:
```