pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Parquet reader fails when file has less columns than reader_schema #14980

Closed ion-elgreco closed 1 month ago

ion-elgreco commented 8 months ago


Reproducible example

Create two Parquet files, one with the schema {"foo": "Utf8"} and the other with {"foo": "Utf8", "bar": "Int64"}.
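
For reference, a minimal sketch of how two such files could be written with polars' ParquetWriter (file names and values are made up for illustration):

    use polars::prelude::*;
    use std::fs::File;

    // File 1 contains only "foo".
    let mut df1 = df!("foo" => &["a", "b"])?;
    ParquetWriter::new(File::create("part-0.parquet")?).finish(&mut df1)?;

    // File 2 also contains the evolved column "bar".
    let mut df2 = df!("foo" => &["c", "d"], "bar" => &[1i64, 2])?;
    ParquetWriter::new(File::create("part-1.parquet")?).finish(&mut df2)?;

The scan in the reproducible example below is then built against the full, two-column reader schema: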

    // Build the table-level file info from the Delta table's schema (the reader schema).
    let mut file_info = FileInfo::new(
        polars_schema.clone().into(),
        Some(polars_schema.to_arrow(true).into()),
        (None, 10),
    );

    if hive_partitioning {
        file_info
            .init_hive_partitions(file_paths[0].as_path())
            .map_err(|err| DeltaTableError::Generic(err.to_string()))
            .map_err(PythonError::from)?;
    }

    // Generic scan options; no projection and no row limit.
    let options = FileScanOptions {
        with_columns: None,
        cache: true,
        n_rows: None,
        rechunk: false,
        row_index: None,
        file_counter: Default::default(),
        hive_partitioning,
    };

    // Parquet-specific scan configuration.
    let scan_type = FileScan::Parquet {
        options: ParquetOptions {
            parallel: ParallelStrategy::Auto,
            low_memory,
            use_statistics,
        },
        cloud_options: None,
        metadata: None,
    };

    // Scan all Parquet files of the table with the single reader schema above.
    let plan = LogicalPlan::Scan {
        paths: file_paths.into(),
        file_info,
        file_options: options,
        predicate: None,
        scan_type,
    };
    let frame: LazyFrame = plan.into();
    Ok(PyLazyFrame(frame))
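
Collecting the resulting LazyFrame over both files, one of which lacks the `bar` column declared in the reader schema, then panics with the backtrace below.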

Log output

thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-parquet/src/arrow/read/deserialize/mod.rs:144:31:
called `Option::unwrap()` on a `None` value
stack backtrace:
   0:     0x7feab8f11ccf - <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt::h39795cad8f90e005
   1:     0x7feab69fb91c - core::fmt::write::h2b281d9025b7c47b
   2:     0x7feab8edfa1e - std::io::Write::write_fmt::h18044c54acec8470
   3:     0x7feab8f1717f - std::sys_common::backtrace::print::h154e885ac7142937
   4:     0x7feab8f16a5b - std::panicking::default_hook::{{closure}}::hcfb38eaaff34d735
   5:     0x7feab8f17a5e - std::panicking::rust_panic_with_hook::hb2c8227f8d32aff4
   6:     0x7feab8f174e8 - std::panicking::begin_panic_handler::{{closure}}::h987389c1664b681c
   7:     0x7feab8f17476 - std::sys_common::backtrace::__rust_end_short_backtrace::hfacc8ef11205d4dc
   8:     0x7feab8f17463 - rust_begin_unwind
   9:     0x7feab5b6ea94 - core::panicking::panic_fmt::h515a008904190f25
  10:     0x7feab5b6eb84 - core::panicking::panic::had15f1f483dc45e7
  11:     0x7feab5b6ef15 - core::option::unwrap_failed::hd954fbf94e17d53f
  12:     0x7feab77bf90f - polars_io::parquet::read_impl::column_idx_to_series::h938ec85bb1d4d3df
  13:     0x7feab77c0da4 - rayon::iter::plumbing::bridge_producer_consumer::helper::h98cb19b49fdaadca
  14:     0x7feab77c1b5c - <rayon_core::job::StackJob<L,F,R> as rayon_core::job::Job>::execute::h8d2a665270bebbf6
  15:     0x7feab5e6f23e - rayon_core::registry::WorkerThread::wait_until_cold::hd7c4074422a29a96
  16:     0x7feab8cb9dd7 - std::sys_common::backtrace::__rust_begin_short_backtrace::hae8c13be39a07912
  17:     0x7feab8cb9ba7 - core::ops::function::FnOnce::call_once{{vtable.shim}}::h87026f93d05fb30c
  18:     0x7feab8f1a036 - std::sys::pal::unix::thread::Thread::new::thread_start::h32fa493a81385954
  19:     0x7feabb138ac3 - <unknown>
  20:     0x7feabb1ca850 - <unknown>
  21:                0x0 - <unknown>

Issue description

I am trying to make schema-evolved Delta tables readable with polars-deltalake; however, Polars does not automatically create null arrays for columns that are missing from a Parquet file when it is read with a reader schema.

Expected behavior

When you read a Parquet file with a reader_schema and only a subset of the columns is available in the file, Polars should create null arrays, with the respective types, for the missing columns.

This is also the behavior of DataFusion and PyArrow when you scan multiple Parquet files with a provided schema.
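
As a rough illustration of that expected behaviour (not an existing Polars API; pad_missing_columns and the exact Schema/Series signatures are assumptions), a compute layer could pad each file's batch with null columns against the table-level schema:

    use polars::prelude::*;

    // Hypothetical helper: given a batch read from one Parquet file and the
    // table-level reader schema, append an all-null column of the declared dtype
    // for every field the file does not contain, then reorder to the schema's order.
    fn pad_missing_columns(mut df: DataFrame, reader_schema: &Schema) -> PolarsResult<DataFrame> {
        let height = df.height();
        for (name, dtype) in reader_schema.iter() {
            if !df.get_column_names().contains(&name.as_str()) {
                df.with_column(Series::full_null(name.as_str(), height, dtype))?;
            }
        }
        df.select(reader_schema.iter_names())
    }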

Installed versions

crates 0.38.2
orlp commented 8 months ago

I'm not sure if I'm a huge fan of just silently returning null columns for missing columns - what if you misspelled a column?

Perhaps this could be some sort of opt-in option? Do you know why DataFusion and PyArrow chose this default behavior?

ion-elgreco commented 8 months ago

> I'm not sure if I'm a huge fan of just silently returning null columns for missing columns - what if you misspelled a column?
>
> Perhaps this could be some sort of opt-in option? Do you know why DataFusion and PyArrow chose this default behavior?

I would already be very happy if it were just an opt-in at plan level for the Scan execution! You are unlikely to ever run into a misspelled column, because the reader_schema is constructed from the Delta table log, which itself always enforces a correct, up-to-date version of the data schema.

I'm not sure why it's the default, but if this weren't possible, then in theory you would have to rewrite all old Parquet files just to add a null column, so that you could read newer Parquet files where an additional column was added.

aersam commented 8 months ago

I also have this use case, but it should be opt-in. Sometimes newer Parquet files have more columns and you don't want to touch old files to add them. I have this in a non-Delta extract layer and currently have to read the metadata in an extra step to know which columns are in which file.

ritchie46 commented 8 months ago

The reader schema should belong to the file. If you want to add null columns when the schemas don't match, that should be done in the compute layer on top of that.
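
For the files from the example above, a hedged sketch of that compute-layer approach with the lazy API (file name, column name, and dtype are taken from the reproducible example; this is not what polars-deltalake does today):

    // Scan the older file and add the missing column as a typed null literal on top.
    let lf = LazyFrame::scan_parquet("part-0.parquet", ScanArgsParquet::default())?
        .with_column(lit(NULL).cast(DataType::Int64).alias("bar"));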

ion-elgreco commented 8 months ago

@ritchie46 that wouldn't work though. The way Delta Lake works, there is a single schema for the table, but each individual Parquet file can contain a subset of the columns because of how schema evolution works.

If the reader_schema has to match exactly how each individual Parquet file is structured, then you would have to query each file's metadata before you could read it.

DataFusion and PyArrow have no problem reading Parquet tables with mixed schemas as long as you provide a top-level schema to read the dataset with.

See the DataFusion docs here: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileScanConfig.html#structfield.file_schema

> Schema before projection is applied. It contains **all columns that may** appear in the files. It does not include table partition columns that may be added.

ritchie46 commented 8 months ago

If you fetch the metadata of the file you can get the file schema. That can be used.
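
A hedged sketch of fetching a single file's own schema through a lazy scan (the helper name is made up):

    use polars::prelude::*;

    // Made-up helper: resolve the schema stored in one Parquet file's footer,
    // which can then be compared against the table-level reader schema.
    fn per_file_schema(path: &str) -> PolarsResult<SchemaRef> {
        let mut lf = LazyFrame::scan_parquet(path, ScanArgsParquet::default())?;
        lf.schema()
    }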

ion-elgreco commented 8 months ago

> If you fetch the metadata of the file you can get the file schema. That can be used.

My whole point is that you can avoid that if you have a priori knowledge of the correct schema.

nameexhaustion commented 1 month ago

Closed as completed via https://github.com/pola-rs/polars/pull/18922