delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

DataFusion filter on partition column doesn't work (when the physical schema ordering differs from the logical one) #2494

Closed Veiasai closed 2 months ago

Veiasai commented 4 months ago

Environment

Delta-rs version: 0.17.3

Binding: Rust

Environment: Linux

Bug

What happened: The filter expression didn't return the expected rows. My table is relatively large, so I tried to construct a minimal test to reproduce it; see the code below. From what I see in the log, my guess is:

  1. The delta scan itself is fine: it successfully prunes unrelated files.
  2. https://github.com/delta-io/delta-rs/pull/1071/files#diff-f3a4847c9506848f6f5bf021b5f10fb24602373580e58739bd2a2a24f9878e77R438 we report Inexact filter pushdown, so DataFusion applies the same filter again, but the physical plan resolves the wrong column index.
  3. I am not an expert on DataFusion or delta-rs, so I'll stop here. Thank you in advance for any help.
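The mismatch in step 2 can be illustrated with a toy sketch (plain strings, not delta-rs or DataFusion code): a filter compiled against the logical schema captures a column by ordinal index, and if the physical batch lays its fields out in a different order, that index lands on the wrong column.

```rust
fn main() {
    // Logical field order the planner sees.
    let logical = ["value", "modified", "id"];
    // Physical field order produced by the scan
    // (partition columns appended after data columns).
    let physical = ["modified", "id", "value"];

    // A filter on `value` resolved against the logical schema
    // is bound to ordinal index 0.
    let idx = logical.iter().position(|c| *c == "value").unwrap();
    assert_eq!(idx, 0);

    // Applying that ordinal to the physical batch reads `modified`
    // instead of `value`, so the predicate filters out every row.
    assert_eq!(physical[idx], "modified");
    println!("index {idx} reads `{}` instead of `value`", physical[idx]);
}
```

This is only an analogy for the reported behavior, not the actual code path in delta-rs.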

What you expected to happen:

How to reproduce it:

I wrote a unit test to check it, but it seems I don't have permission to push it:

    #[tokio::test]
    async fn delta_scan_mixed_partition_order_and_filter() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", DataType::Utf8, true),
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));

        let table = crate::DeltaOps::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().clone())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), 0);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A"])),
                Arc::new(arrow::array::Int32Array::from(vec![1])),
            ],
        )
        .unwrap();
        // write some data
        let table = crate::DeltaOps(table)
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let provider = Arc::new(table);
        let ctx = SessionContext::new();
        let df = ctx.read_table(provider).unwrap();

        let actual = df.clone().collect().await.unwrap();
        let expected = vec![
            "+-------+------------+----+",
            "| value | modified   | id |",
            "+-------+------------+----+",
            "| 1     | 2021-02-01 | A  |",
            "+-------+------------+----+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

        let actual = df.clone().filter(col("value").eq(lit(1))).unwrap().collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);

        let actual = df.clone().filter(col("id").eq(lit("A"))).unwrap().collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);
    }

More details:

expected:

[
    "+-------+------------+----+",
    "| value | modified   | id |",
    "+-------+------------+----+",
    "| 1     | 2021-02-01 | A  |",
    "+-------+------------+----+",
]
actual:

[
    "++",
    "++",
]

  left: ["+-------+------------+----+", "| value | modified   | id |", "+-------+------------+----+", "| 1     | 2021-02-01 | A  |", "+-------+------------+----+"]
 right: ["++", "++"]
Veiasai commented 4 months ago

One more suggestion: could we return the filter pushdown flag dynamically?

pub enum TableProviderFilterPushDown {
    /// The expression cannot be used by the provider.
    Unsupported,
    /// The expression can be used to reduce the data retrieved,
    /// but the provider cannot guarantee it will omit all tuples that
    /// may be filtered. In this case, DataFusion will apply an additional
    /// `Filter` operation after the scan to ensure all rows are filtered correctly.
    Inexact,
    /// The provider **guarantees** that it will omit **all** tuples that are
    /// filtered by the filter expression. This is the fastest option, if available
    /// as DataFusion will not apply additional filtering.
    Exact,
}

When the expression only references partition columns, we should return Exact.
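A minimal sketch of that decision rule, assuming we already have the set of column names a filter references (`FilterPushDown` and `pushdown_for` are hypothetical names here, not the delta-rs API):

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for DataFusion's TableProviderFilterPushDown.
#[derive(Debug, PartialEq)]
enum FilterPushDown {
    Inexact,
    Exact,
}

/// Classify a filter as Exact when every column it references is a
/// partition column (file pruning has already removed all non-matching
/// rows), otherwise fall back to Inexact.
fn pushdown_for(filter_columns: &[&str], partition_columns: &[&str]) -> FilterPushDown {
    let partitions: HashSet<&str> = partition_columns.iter().copied().collect();
    if filter_columns.iter().all(|c| partitions.contains(c)) {
        FilterPushDown::Exact
    } else {
        FilterPushDown::Inexact
    }
}

fn main() {
    // Filter `id = 'A'` touches only partition columns -> Exact.
    assert_eq!(pushdown_for(&["id"], &["modified", "id"]), FilterPushDown::Exact);
    // Filter `value = 1` touches a data column -> Inexact.
    assert_eq!(pushdown_for(&["value"], &["modified", "id"]), FilterPushDown::Inexact);
    println!("ok");
}
```

Returning Exact in this case would also let DataFusion skip the redundant `Filter` node that triggers the index bug above, though the underlying schema-ordering issue would still need fixing.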

rtyler commented 4 months ago

Thanks for taking the time to write a test @Veiasai ! I'll take a look at this shortly

Veiasai commented 4 months ago

hey, any updates?

aditanase commented 3 months ago

@rtyler I have a local fix for this issue. I am not sure what the Delta protocol dictates, but in some of our test tables the partition columns appear in a different order in the JSON schema than in the partition columns array.

_arrow_schema builds the schema with an iterator + chain + two filters, while the rest of the code (e.g. DeltaScanBuilder.build) filters the partition columns out and then appends them explicitly in the order dictated by partition_columns.

This is the essence of my fix:

fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
    let meta = snapshot.metadata();

    let schema = meta.schema()?;
    let fields = schema
        .fields()
        .filter(|f| !meta.partition_columns.contains(&f.name().to_string()))
        .map(|f| f.try_into())
        .chain(
            // keep consistent order of partitioning columns
            meta.partition_columns.iter().map(|partition_col| {
                let f = schema.field(partition_col).unwrap();
                let field = Field::try_from(f)?;
                // ...
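The snippet above is truncated; a self-contained sketch of the reordering it describes, using plain strings in place of Arrow fields (`reorder` is a hypothetical helper, not the delta-rs function), might look like:

```rust
/// Drop partition columns from the declared field order, then append
/// them in the order given by `partition_columns`, mirroring what
/// DeltaScanBuilder.build does with the physical schema.
fn reorder<'a>(fields: &[&'a str], partition_columns: &[&'a str]) -> Vec<&'a str> {
    fields
        .iter()
        .filter(|f| !partition_columns.contains(f))
        .chain(partition_columns.iter())
        .copied()
        .collect()
}

fn main() {
    // Declared order: modified, id, value; partitions: modified, id.
    // Data columns come first, partition columns last, in partition order.
    let out = reorder(&["modified", "id", "value"], &["modified", "id"]);
    assert_eq!(out, ["value", "modified", "id"]);
    println!("{out:?}");
}
```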

LMK if this is enough as a pointer or if I should send a PR with this.

aditanase commented 3 months ago

@rtyler I've sent a PR just in case https://github.com/delta-io/delta-rs/pull/2614

I'd be glad to add some tests if you point me at the correct suite or an example; I was looking for a test with more than one partition column and didn't find one.