delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

`DeltaScanBuilder` does not respect datafusion context's `datafusion.execution.parquet.pushdown_filters` #2739

Closed. adamfaulkner-at closed this issue 1 month ago

adamfaulkner-at commented 1 month ago

Environment

Delta-rs version: 0.18.1

Binding: ?

Environment: MacOS & Linux


Bug

What happened:

When I set up a DataFusion context with parquet filter pushdown enabled, I expect the filters to be propagated to the parquet scan. However, this does not happen.

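use std::sync::Arc;
use datafusion::prelude::{SessionConfig, SessionContext};

// Enable parquet filter pushdown on the session config, then build the context from it.
let config = SessionConfig::default().set_bool("datafusion.execution.parquet.pushdown_filters", true);
let ctx = SessionContext::new_with_config(config);
ctx.register_table("table", Arc::new(delta_table))?;
let table = ctx.table("table").await?;
let result_batches = table.filter(some_filter_expr)?.collect().await?;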

When running this with RUST_LOG=debug, I see the following log line, indicating that no predicate was pushed down:

[2024-08-06T21:47:54Z DEBUG datafusion::datasource::physical_plan::parquet] Creating ParquetExec, files: [[PartitionedFile { object_meta: ObjectMeta { location: Path { raw: "part-00000-7090b947-7f4f-4b3f-867e-60f070089207-c000.snappy.parquet" }, last_modified: 2024-08-05T22:39:04.322Z, size: 419156, e_tag: None, version: None }, partition_values: [], range: None, statistics: None, extensions: None }, PartitionedFile { object_meta: ObjectMeta { location: Path { raw: "part-00000-c9b00314-b854-4e65-baf4-1df2384c23cb-c000.snappy.parquet" }, last_modified: 2024-08-05T22:39:00.136Z, size: 3620924, e_tag: None, version: None }, partition_values: [], range: None, statistics: None, extensions: None }]], projection Some([0, 1]), predicate: None, limit: None

(Note the "predicate: None")

What you expected to happen:

I expected predicates to be pushed down.

How to reproduce it:

From inspecting the code in DeltaScanBuilder and the implementation of TableProvider, it seems that the only way to enable pushdown is to set the scan config through DeltaTableProvider rather than registering the DeltaTable directly with DataFusion. However, because of https://github.com/delta-io/delta-rs/issues/2602, that is not possible either. So I don't think any use of delta-rs can do filter pushdown right now.

More details:

ion-elgreco commented 1 month ago

The default scan config sets enable_parquet_pushdown: true, so this should always execute:

        if let Some(predicate) = logical_filter {
            if config.enable_parquet_pushdown {
                exec_plan_builder = exec_plan_builder.with_predicate(predicate);
            }
        };

Taking a look btw
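
The gating logic quoted above can be exercised in isolation. The following is a minimal sketch that uses hypothetical stand-in types (`ScanConfig`, `ExecPlanBuilder`, and `build_scan` are illustrative names, not the delta-rs or DataFusion API) to show when a predicate ends up attached to the scan: only when a filter is present and `enable_parquet_pushdown` is true.

```rust
// Hypothetical stand-ins for illustration only.
// These are NOT the real delta-rs or DataFusion types.

/// Models the single scan-config flag discussed in this thread.
#[derive(Debug, Clone, Copy)]
pub struct ScanConfig {
    pub enable_parquet_pushdown: bool,
}

impl Default for ScanConfig {
    fn default() -> Self {
        // Per the comment above, the default scan config enables pushdown.
        Self {
            enable_parquet_pushdown: true,
        }
    }
}

/// Minimal builder that only records whether a predicate was attached.
#[derive(Debug, Default, PartialEq)]
pub struct ExecPlanBuilder {
    pub predicate: Option<String>,
}

impl ExecPlanBuilder {
    pub fn with_predicate(mut self, predicate: String) -> Self {
        self.predicate = Some(predicate);
        self
    }
}

/// Same shape as the quoted snippet: the predicate is attached only if a
/// logical filter exists AND the config flag is set.
pub fn build_scan(logical_filter: Option<String>, config: ScanConfig) -> ExecPlanBuilder {
    let mut exec_plan_builder = ExecPlanBuilder::default();
    if let Some(predicate) = logical_filter {
        if config.enable_parquet_pushdown {
            exec_plan_builder = exec_plan_builder.with_predicate(predicate);
        }
    }
    exec_plan_builder
}
```

With this model, `build_scan(Some("a > 1".to_string()), ScanConfig::default())` carries the predicate, while passing `None` as the filter or a config with the flag disabled leaves `predicate` as `None`, which would correspond to the "predicate: None" seen in the debug log.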

adamfaulkner-at commented 1 month ago

Thanks! I just realized that 0.18.1 is now an old version of delta-rs.

This seems to have been fixed in 0.18.2 by this PR; I'll give it a shot: https://github.com/delta-io/delta-rs/pull/2637

It looks like another change was made in 0.19.0 that addresses exactly my comment about not respecting the DataFusion session's option: https://github.com/delta-io/delta-rs/pull/2702

adamfaulkner-at commented 1 month ago

I've confirmed that 0.19.0 fixes this, sorry for the noise.