apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Substrait: Add SubqueryAlias support #6867

Open nseekhao opened 1 year ago

nseekhao commented 1 year ago

Is your feature request related to a problem or challenge?

If there is a SubqueryAlias relation, datafusion-substrait will bypass it. This works for the producer: the generated Substrait plans are correct. However, the DF plan generated by the consumer will be incorrect, since it has no way to distinguish between different relations that read from the same table.

This can be demonstrated in these examples:

#[tokio::test]
async fn roundtrip_non_equi_inner_join_table_reuse() -> Result<()> {
    roundtrip("SELECT d1.a FROM data d1 JOIN data d2 ON d1.e <> d2.e").await
}
//thread 'tests::roundtrip_non_equi_inner_join_table_reuse' panicked at 'assertion failed: `(left == right)`
//  left: `"Projection: d1.a\n  Inner Join:  Filter: d1.e != d2.e\n    SubqueryAlias: d1\n      TableScan: data projection=[a, e]\n    SubqueryAlias: d2\n      TableScan: data projection=[e]"`,
// right: `"Projection: data.a\n  Inner Join: \n    Projection: data.a\n      Filter: data.e != data.e\n        TableScan: data projection=[a, e], partial_filters=[data.e != data.e]\n    TableScan: data projection=[e]"`', datafusion/substrait/tests/roundtrip_logical_plan.rs:628:9

The original DF plan is:

Projection: d1.a
  Inner Join:  Filter: d1.e != d2.e
    SubqueryAlias: d1
      TableScan: data projection=[a, e]
    SubqueryAlias: d2
      TableScan: data projection=[e]

Once this plan is fed through the producer, we get the correct Substrait plan:

Plan {
    version: Some(
        Version {
            major_number: 0,
            minor_number: 31,
            patch_number: 0,
            git_hash: "",
            producer: "datafusion",
        },
    ),
    extension_uris: [],
    extensions: [
        SimpleExtensionDeclaration {
            mapping_type: Some(
                ExtensionFunction(
                    ExtensionFunction {
                        extension_uri_reference: 4294967295,
                        function_anchor: 0,
                        name: "not_equal",
                    },
                ),
            ),
        },
    ],
    relations: [
        PlanRel {
            rel_type: Some(
                Root(
                    RelRoot {
                        input: Some(
                            Rel {
                                rel_type: Some(
                                    Project(
                                        ProjectRel {
                                            common: None,
                                            input: Some(
                                                Rel {
                                                    rel_type: Some(
                                                        Join(
                                                            JoinRel {
                                                                common: None,
                                                                left: Some(
                                                                    Rel {
                                                                        rel_type: Some(
                                                                            Read(
                                                                                ReadRel {
                                                                                    common: None,
                                                                                    base_schema: Some(
                                                                                        NamedStruct {
                                                                                            names: [
                                                                                                "a",
                                                                                                "b",
                                                                                                "c",
                                                                                                "d",
                                                                                                "e",
                                                                                                "f",
                                                                                            ],
                                                                                            r#struct: None,
                                                                                        },
                                                                                    ),
                                                                                    filter: None,
                                                                                    best_effort_filter: None,
                                                                                    projection: Some(
                                                                                        MaskExpression {
                                                                                            select: Some(
                                                                                                StructSelect {
                                                                                                    struct_items: [
                                                                                                        StructItem {
                                                                                                            field: 0,
                                                                                                            child: None,
                                                                                                        },
                                                                                                        StructItem {
                                                                                                            field: 4,
                                                                                                            child: None,
                                                                                                        },
                                                                                                    ],
                                                                                                },
                                                                                            ),
                                                                                            maintain_singular_struct: false,
                                                                                        },
                                                                                    ),
                                                                                    advanced_extension: None,
                                                                                    read_type: Some(
                                                                                        NamedTable(
                                                                                            NamedTable {
                                                                                                names: [
                                                                                                    "data",
                                                                                                ],
                                                                                                advanced_extension: None,
                                                                                            },
                                                                                        ),
                                                                                    ),
                                                                                },
                                                                            ),
                                                                        ),
                                                                    },
                                                                ),
                                                                right: Some(
                                                                    Rel {
                                                                        rel_type: Some(
                                                                            Read(
                                                                                ReadRel {
                                                                                    common: None,
                                                                                    base_schema: Some(
                                                                                        NamedStruct {
                                                                                            names: [
                                                                                                "a",
                                                                                                "b",
                                                                                                "c",
                                                                                                "d",
                                                                                                "e",
                                                                                                "f",
                                                                                            ],
                                                                                            r#struct: None,
                                                                                        },
                                                                                    ),
                                                                                    filter: None,
                                                                                    best_effort_filter: None,
                                                                                    projection: Some(
                                                                                        MaskExpression {
                                                                                            select: Some(
                                                                                                StructSelect {
                                                                                                    struct_items: [
                                                                                                        StructItem {
                                                                                                            field: 4,
                                                                                                            child: None,
                                                                                                        },
                                                                                                    ],
                                                                                                },
                                                                                            ),
                                                                                            maintain_singular_struct: false,
                                                                                        },
                                                                                    ),
                                                                                    advanced_extension: None,
                                                                                    read_type: Some(
                                                                                        NamedTable(
                                                                                            NamedTable {
                                                                                                names: [
                                                                                                    "data",
                                                                                                ],
                                                                                                advanced_extension: None,
                                                                                            },
                                                                                        ),
                                                                                    ),
                                                                                },
                                                                            ),
                                                                        ),
                                                                    },
                                                                ),
                                                                expression: None,
                                                                post_join_filter: Some(
                                                                    Expression {
                                                                        rex_type: Some(
                                                                            ScalarFunction(
                                                                                ScalarFunction {
                                                                                    function_reference: 0,
                                                                                    arguments: [
                                                                                        FunctionArgument {
                                                                                            arg_type: Some(
                                                                                                Value(
                                                                                                    Expression {
                                                                                                        rex_type: Some(
                                                                                                            Selection(
                                                                                                                FieldReference {
                                                                                                                    reference_type: Some(
                                                                                                                        DirectReference(
                                                                                                                            ReferenceSegment {
                                                                                                                                reference_type: Some(
                                                                                                                                    StructField(
                                                                                                                                        StructField {
                                                                                                                                            field: 1,
                                                                                                                                            child: None,
                                                                                                                                        },
                                                                                                                                    ),
                                                                                                                                ),
                                                                                                                            },
                                                                                                                        ),
                                                                                                                    ),
                                                                                                                    root_type: None,
                                                                                                                },
                                                                                                            ),
                                                                                                        ),
                                                                                                    },
                                                                                                ),
                                                                                            ),
                                                                                        },
                                                                                        FunctionArgument {
                                                                                            arg_type: Some(
                                                                                                Value(
                                                                                                    Expression {
                                                                                                        rex_type: Some(
                                                                                                            Selection(
                                                                                                                FieldReference {
                                                                                                                    reference_type: Some(
                                                                                                                        DirectReference(
                                                                                                                            ReferenceSegment {
                                                                                                                                reference_type: Some(
                                                                                                                                    StructField(
                                                                                                                                        StructField {
                                                                                                                                            field: 2,
                                                                                                                                            child: None,
                                                                                                                                        },
                                                                                                                                    ),
                                                                                                                                ),
                                                                                                                            },
                                                                                                                        ),
                                                                                                                    ),
                                                                                                                    root_type: None,
                                                                                                                },
                                                                                                            ),
                                                                                                        ),
                                                                                                    },
                                                                                                ),
                                                                                            ),
                                                                                        },
                                                                                    ],
                                                                                    options: [],
                                                                                    output_type: None,
                                                                                    args: [],
                                                                                },
                                                                            ),
                                                                        ),
                                                                    },
                                                                ),
                                                                r#type: Inner,
                                                                advanced_extension: None,
                                                            },
                                                        ),
                                                    ),
                                                },
                                            ),
                                            expressions: [
                                                Expression {
                                                    rex_type: Some(
                                                        Selection(
                                                            FieldReference {
                                                                reference_type: Some(
                                                                    DirectReference(
                                                                        ReferenceSegment {
                                                                            reference_type: Some(
                                                                                StructField(
                                                                                    StructField {
                                                                                        field: 0,
                                                                                        child: None,
                                                                                    },
                                                                                ),
                                                                            ),
                                                                        },
                                                                    ),
                                                                ),
                                                                root_type: None,
                                                            },
                                                        ),
                                                    ),
                                                },
                                            ],
                                            advanced_extension: None,
                                        },
                                    ),
                                ),
                            },
                        ),
                        names: [
                            "d1.a",
                        ],
                    },
                ),
            ),
        },
    ],
    advanced_extensions: None,
    expected_type_urls: [],
}
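
The dump above is long, but the key point is that the join filter refers to its operands purely by position: `field: 1` and `field: 2` inside `post_join_filter`. The following is a minimal sketch of how those indices can be read, assuming the join's output fields are the left input's fields followed by the right input's. `Side` and `locate` are hypothetical names used only for illustration; they are not part of the Substrait or DataFusion APIs.

```rust
// Toy model, not the substrait crate's types.
// Identifies which join input a join-level field index points at.
#[derive(Debug, PartialEq)]
enum Side {
    Left(usize),
    Right(usize),
}

// Assumption: join output = left fields followed by right fields.
fn locate(field: usize, left_width: usize, right_width: usize) -> Option<Side> {
    if field < left_width {
        Some(Side::Left(field))
    } else if field < left_width + right_width {
        Some(Side::Right(field - left_width))
    } else {
        None
    }
}

fn main() {
    // Left ReadRel projects base fields 0 and 4 (a, e); right projects field 4 (e).
    let left = ["a", "e"];
    let right = ["e"];
    // The two field references used by the `not_equal` filter in the plan.
    for field in [1usize, 2] {
        match locate(field, left.len(), right.len()) {
            Some(Side::Left(i)) => println!("field {field} -> left.{}", left[i]),
            Some(Side::Right(i)) => println!("field {field} -> right.{}", right[i]),
            None => println!("field {field} is out of range"),
        }
    }
}
```

Read this way, the filter compares the left scan's `e` with the right scan's `e`, which matches `d1.e != d2.e` without needing any alias.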

However, if we want to get back a DF plan and use the consumer, we'll get:

[Unoptimized plan]
Projection: data.a
  Inner Join:  Filter: data.e != data.e
    TableScan: data projection=[a, e]
    TableScan: data projection=[e]

[Optimized plan]
Projection: data.a
  Inner Join: 
    Projection: data.a
      Filter: data.e != data.e
        TableScan: data projection=[a, e], partial_filters=[data.e != data.e]
    TableScan: data projection=[e]

Notice that because there is no way for DF to distinguish between the left data table and the right data table, DF thinks they are from the same TableScan relation. Thus, the output DF plan is incorrect.
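
The ambiguity can be illustrated with a toy model of name-based column lookup. This is only a sketch under stated assumptions: the schema is modeled as a list of (qualifier, column) pairs, and `Resolved` and `resolve` are hypothetical names, not DataFusion's actual types or resolution logic.

```rust
// Toy model, not DataFusion's real schema types.
// Result of looking a qualified column name up in a join schema.
#[derive(Debug, PartialEq)]
enum Resolved {
    Index(usize),
    Ambiguous(Vec<usize>),
    NotFound,
}

// Find every field whose qualifier and name both match.
fn resolve(schema: &[(&str, &str)], qualifier: &str, name: &str) -> Resolved {
    let mut hits = Vec::new();
    for (i, (q, n)) in schema.iter().enumerate() {
        if *q == qualifier && *n == name {
            hits.push(i);
        }
    }
    match hits.len() {
        0 => Resolved::NotFound,
        1 => Resolved::Index(hits[0]),
        _ => Resolved::Ambiguous(hits),
    }
}

fn main() {
    // Join schema with aliases preserved: left fields, then right fields.
    let aliased = [("d1", "a"), ("d1", "e"), ("d2", "e")];
    // The same join schema after the aliases have been dropped.
    let unaliased = [("data", "a"), ("data", "e"), ("data", "e")];

    println!("{:?}", resolve(&aliased, "d1", "e"));
    println!("{:?}", resolve(&aliased, "d2", "e"));
    println!("{:?}", resolve(&unaliased, "data", "e"));
}
```

With aliases, each side of `d1.e != d2.e` maps to a distinct field. Without them, both sides are spelled `data.e`, and a purely name-based lookup cannot tell which scan is meant.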

Describe the solution you'd like

Preserve aliases in Substrait.

Describe alternatives you've considered

N/A

Additional context

Additional example:

#[tokio::test]
async fn roundtrip_exists_filter() -> Result<()> {
    roundtrip("SELECT b FROM data d1 WHERE EXISTS (SELECT * FROM data d2 WHERE d2.a = d1.a AND d2.e != d1.e)").await
}

// thread 'tests::roundtrip_exists_filter' panicked at 'assertion failed: `(left == right)`
//   left: `"Projection: d1.b\n  LeftSemi Join: d1.a = __correlated_sq_1.a Filter: __correlated_sq_1.e != d1.e\n    SubqueryAlias: d1\n      TableScan: data projection=[a, b, e]\n    SubqueryAlias: __correlated_sq_1\n      SubqueryAlias: d2\n        TableScan: data projection=[a, e]"`,
//  right: `"Projection: data.b\n  LeftSemi Join: data.a = data.a\n    Projection: data.a, data.b\n      Filter: data.e != data.e\n        TableScan: data projection=[a, b, e], partial_filters=[data.e != data.e]\n    TableScan: data projection=[a]"`', datafusion/substrait/tests/roundtrip_logical_plan.rs:625:9
[Original plan]
Projection: d1.b
  LeftSemi Join: d1.a = __correlated_sq_1.a Filter: __correlated_sq_1.e != d1.e
    SubqueryAlias: d1
      TableScan: data projection=[a, b, e]
    SubqueryAlias: __correlated_sq_1
      SubqueryAlias: d2
        TableScan: data projection=[a, e]

[Unoptimized plan from consumer]
Projection: data.b
  LeftSemi Join: data.a = data.a Filter: data.e != data.e
    TableScan: data projection=[a, b, e]
    TableScan: data projection=[a, e]

[Optimized plan from consumer (incorrect)]
Projection: data.b
  LeftSemi Join: data.a = data.a
    Projection: data.a, data.b
      Filter: data.e != data.e
        TableScan: data projection=[a, b, e], partial_filters=[data.e != data.e]
    TableScan: data projection=[a]

westonpace commented 12 months ago

Is your argument/concern that these two plans would produce different results?

[Original plan]
Projection: d1.b
  LeftSemi Join: d1.a = __correlated_sq_1.a Filter: __correlated_sq_1.e != d1.e
    SubqueryAlias: d1
      TableScan: data projection=[a, b, e]
    SubqueryAlias: __correlated_sq_1
      SubqueryAlias: d2
        TableScan: data projection=[a, e]

[Unoptimized plan from consumer]
Projection: data.b
  LeftSemi Join: data.a = data.a Filter: data.e != data.e
    TableScan: data projection=[a, b, e]
    TableScan: data projection=[a, e]

The aliases won't actually change the results. They appear identical to me.

Or is your concern that the aliases are lost because your application is depending on the aliases for some reason unrelated to the results?

westonpace commented 12 months ago

Or is the concern that the lack of aliases is somehow causing the optimizer to generate an incorrect optimization?

westonpace commented 12 months ago

Sorry to triple post my stream-of-consciousness. For context, I am asking because this came up in the Substrait community meeting today and the consensus is that this seems to be more of a datafusion issue (if datafusion's optimizer is giving different results with and without aliases) than a Substrait issue. That being said, I think there are things we can do to support aliases in Substrait. I'll post a comment on your issue there as well.

Blizzara commented 4 months ago

Doing this properly with aliases depends on https://github.com/substrait-io/substrait/issues/571 / https://github.com/substrait-io/substrait/pull/649.

However, the problem lies more within DataFusion: the Substrait plans are valid, since Substrait only cares about column indices, but DF handles columns by (qualified) name and thus cannot handle duplicate columns. I think https://github.com/apache/datafusion/pull/11049 fixes most of these cases in practice for DF, but not all.
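
As an illustration of what "handling duplicates" could mean on the consumer side, here is a minimal sketch that gives each repeated table scan a distinct synthetic qualifier. It is an assumption made for illustration only and is not taken from the PR linked above; `unique_qualifiers` is a hypothetical helper, not a DataFusion function.

```rust
use std::collections::HashSet;

// Hypothetical helper: assign each scan a qualifier that has not been used yet.
// The first scan of a table keeps its name; later ones get a numeric suffix.
fn unique_qualifiers(scans: &[&str]) -> Vec<String> {
    let mut used: HashSet<String> = HashSet::new();
    let mut out = Vec::with_capacity(scans.len());
    for name in scans {
        let mut candidate = name.to_string();
        let mut n = 1;
        // Keep trying suffixes until the candidate is free.
        while used.contains(&candidate) {
            candidate = format!("{name}_{n}");
            n += 1;
        }
        used.insert(candidate.clone());
        out.push(candidate);
    }
    out
}

fn main() {
    // Two reads of the same table, as in the self-join examples in this issue.
    let qualifiers = unique_qualifiers(&["data", "data"]);
    println!("{qualifiers:?}");
}
```

Synthetic qualifiers like these would keep the two scans distinguishable by name, but they would not reproduce the user's original `d1` and `d2` aliases, so a roundtrip test comparing plan strings would still differ.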