apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Simplify expressions swallows a cast expression #13481

Closed gruuya closed 6 hours ago

gruuya commented 5 days ago

Describe the bug

It seems SimplifyExpressions will sometimes omit a cast which was introduced by the TypeCoercion analyzer rule, leading to a loss in the precision of the output DataType.

To Reproduce

Run

    #[test]
    fn test_simplify_case_cast_list() {
        use datafusion_functions_nested::expr_fn::make_array;

        let element_field = Arc::new(Field::new("element", DataType::Int32, true));
        let items_field = Field::new(
            "items",
            DataType::List(element_field.clone()),
            true,
        );
        let schema = Schema::new(vec![items_field.clone()]).to_dfschema().unwrap();

        let expr = cast(
            make_array(vec![lit(2), lit(3)]),
            DataType::List(element_field.clone()),
        );

        let result = simplify(expr);

        let expected = cast(
            lit(ScalarValue::List(ScalarValue::new_list_nullable(
                &[ScalarValue::Int32(Some(2)), ScalarValue::Int32(Some(3))],
                &DataType::Int32,
            ))),
            DataType::List(element_field),
        );

        assert_eq!(result, expected);
        assert_eq!(
            result.data_type_and_nullable(&schema).unwrap(),
            expected.data_type_and_nullable(&schema).unwrap(),
        );
    }

The cast gets stripped because the constant expression is evaluated, so the first assert fails (as expected) with

assertion `left == right` failed
  left: Alias(Alias { expr: Case(Case { expr: Some(Column(Column { relation: None, name: "condition" })), when_then_expr: [(Literal(Boolean(false)), Literal(List([2, 3])))], else_expr: Some(Column(Column { relation: None, name: "items" })) }), relation: None, name: "items" })
 right: Alias(Alias { expr: Case(Case { expr: Some(Column(Column { relation: None, name: "condition" })), when_then_expr: [(Literal(Boolean(false)), Cast(Cast { expr: Literal(List([2, 3])), data_type: List(Field { name: "element", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }) }))], else_expr: Some(Column(Column { relation: None, name: "items" })) }), relation: None, name: "items" })

However the second assert will also (unexpectedly) fail with

assertion `left == right` failed
  left: (List(Field { name: "item", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), false)
 right: (List(Field { name: "element", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), false)

This means the custom list item field name (`element`) is now lost.

Effects

I believe this in turn causes https://github.com/delta-io/delta-rs/pull/2886.

Basically, TypeCoercion injects the CAST, while SimplifyExpressions removes it without preserving the precise data type. OptimizeProjections then folds the two projections, but the schema is computed from the outermost projection expressions, which now lack the proper type info (i.e. the non-standard list item field names).
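To make the mechanism concrete, here is a minimal, self-contained sketch of the failure mode (this is not DataFusion code; `DataType`, `Expr`, and `naive_fold` below are simplified stand-ins): constant folding replaces `CAST(make_array(...) AS List<element: Int32>)` with a plain literal, and the literal's implied type falls back to the default element field name `item`.

```rust
// Simplified stand-in for an Arrow-style type: list types carry the element
// *field name*, so lists with differently named elements are different types.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    List { element_name: String, element: Box<DataType> },
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    // An already-evaluated constant list; its type is implied, not stored.
    Literal(Vec<i32>),
    Cast { inner: Box<Expr>, to: DataType },
}

impl Expr {
    fn data_type(&self) -> DataType {
        match self {
            // Mirrors a list literal built without an explicit field:
            // the element name defaults to "item".
            Expr::Literal(_) => DataType::List {
                element_name: "item".to_string(),
                element: Box::new(DataType::Int32),
            },
            Expr::Cast { to, .. } => to.clone(),
        }
    }
}

// A naive simplifier: fold the cast's constant input and drop the cast
// entirely, instead of preserving the cast's target type.
fn naive_fold(expr: &Expr) -> Expr {
    match expr {
        Expr::Cast { inner, .. } => (**inner).clone(),
        other => other.clone(),
    }
}

fn main() {
    let cast = Expr::Cast {
        inner: Box::new(Expr::Literal(vec![2, 3])),
        to: DataType::List {
            element_name: "element".to_string(),
            element: Box::new(DataType::Int32),
        },
    };
    let folded = naive_fold(&cast);
    // The custom element name "element" has degraded to the default "item",
    // so the expression's reported type no longer matches the schema.
    assert_ne!(cast.data_type(), folded.data_type());
}
```

This is why the two projections disagree after folding: the expression still evaluates to the same values, but its reported type has silently changed.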

Minimal repro:

    #[tokio::test]
    async fn test_list_item() -> Result<()> {
        use datafusion_functions_nested::expr_fn::make_array;

        let element_field = Arc::new(Field::new("element", DataType::Int32, true));
        let items_field = Field::new(
            "items",
            DataType::List(element_field.clone()),
            true,
        );
        let schema = Schema::new(vec![items_field.clone()]);

        let mut items_builder =
            ListBuilder::new(Int32Builder::new()).with_field(element_field.clone());
        items_builder.append_value([Some(1)]);
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(items_builder.finish())])?;

        let ctx = SessionContext::new();
        let df = ctx.read_batch(batch).expect("source DataFrame")
            .with_column("condition", lit(false))?
            .select(vec![case(col("condition")).when(lit(false), make_array(vec![lit(2), lit(3)])).otherwise(col("items"))?.alias("items")])?;

        let _ = df.create_physical_plan().await?;

        Ok(())
    }

Fails with Error: Context("Optimizer rule 'optimize_projections' failed", Context("optimize_projections", Internal("Failed due to a difference in schemas, original schema: DFSchema { inner: Schema { fields: [Field { name: \"items\", data_type: List(Field { name: \"element\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }, new schema: DFSchema { inner: Schema { fields: [Field { name: \"items\", data_type: List(Field { name: \"item\", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }, field_qualifiers: [None], functional_dependencies: FunctionalDependencies { deps: [] } }")))

Expected behavior

SimplifyExpressions should retain exact output type, and there should be no optimizer errors in the second (high-level) example.

Additional context

No response

gruuya commented 4 days ago

To reiterate, the real check for a fix here should be passing test like this

    #[test]
    fn test_simplify_cast_list() {
        use datafusion_functions_nested::expr_fn::make_array;

        let element_field = Arc::new(Field::new("element", DataType::Int32, true));
        let items_field =
            Field::new("items", DataType::List(element_field.clone()), true);
        let schema = Schema::new(vec![items_field.clone()])
            .to_dfschema()
            .unwrap();

        let input = cast(
            make_array(vec![lit(2), lit(3)]),
            DataType::List(element_field.clone()),
        );

        let output = simplify(input.clone());
        assert_eq!(
            input.data_type_and_nullable(&schema).unwrap(),
            output.data_type_and_nullable(&schema).unwrap(),
        );
    }

In other words, we'd want the type (and nullability?) to remain the same after simplification.

gruuya commented 4 days ago

Ok, I think that while https://github.com/apache/datafusion/pull/13468 restores the data type equivalence after simplification (and thus unblocks the delta-rs issue), it does not also fix nullability matching. Arguably, nullability equivalence should not be a blocker and could be tracked separately (if at all).

gruuya commented 4 days ago

Indeed, digging a bit more suggests that the non-matching nullability is sort of ok, as it can be explained away like this

Incidentally, the first one matches the actual nullability in the schema, whereas the second doesn't.

alamb commented 4 days ago

@gruuya I am sort of lost: are you saying that https://github.com/apache/datafusion/pull/13468 does not fix this issue? That would make sense to me.

gruuya commented 3 days ago

Sorry for the confusion; the tl;dr is that:

So I think we should merge it once everything's been addressed.