apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Missing ColumnarToRow when using CometSparkToColumnar #1092

Open bmorck opened 5 hours ago

bmorck commented 5 hours ago

Describe the bug

I'm working on some internal benchmarks using Comet 0.3.0 with Spark 3.3 and Iceberg. To support Iceberg, we include the following config, which inserts CometSparkToColumnar nodes after the BatchScan operators:

"spark.comet.sparkToColumnar.enabled" = "true"
"spark.comet.sparkToColumnar.supportedOperatorList" = "BatchScan"
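One way to pass these two settings is as `--conf` flags on a `spark-submit` command line. The following is a minimal Python sketch that only assembles that command. The application name `benchmark.py` and the helper `spark_submit_command` are hypothetical placeholders. Only the two keys and values come from this report, and any other Comet settings a real job needs are left out.

```python
import shlex
from typing import Dict, List

# The two settings quoted in this report; nothing else is assumed here.
SPARK_TO_COLUMNAR_CONF: Dict[str, str] = {
    "spark.comet.sparkToColumnar.enabled": "true",
    "spark.comet.sparkToColumnar.supportedOperatorList": "BatchScan",
}


def spark_submit_command(conf: Dict[str, str], app: str) -> List[str]:
    """Build a spark-submit argument list with one --conf flag per setting."""
    args = ["spark-submit"]
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    args.append(app)  # hypothetical application file
    return args


if __name__ == "__main__":
    # Print the command as a single shell-safe line.
    print(shlex.join(spark_submit_command(SPARK_TO_COLUMNAR_CONF, "benchmark.py")))
```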

In the following example, we see that there is a missing ColumnarToRow preceding the Project (19) operator. This results in the query failing. After analyzing the query optimization, I've found that the EliminateRedundantTransitions rule removed a CometSparkToColumnar and the subsequent ColumnarToRow following the BatchScan (20) operator, because the subsequent Filter (21) operator requires row format.

> CometSort (27)
>    +- CometColumnarExchange (26)
>       +- BroadcastNestedLoopJoin Cross BuildRight (25)
>          :- Project (19)
>          :  +- CometSortMergeJoin (18)
>          :     :- CometSort (6)
>          :     :  +- ShuffleQueryStage (5)
>          :     :     +- CometExchange (4)
>          :     :        +- CometFilter (3)
>          :     :           +- CometSparkToColumnar (2)
>          :     :              +- BatchScan (1)
>          :     +- CometSort (17)
>          :        +- CometExchange (16)
>          :           +- CometFilter (15)
>          :              +- CometHashAggregate (14)
>          :                 +- ShuffleQueryStage (13)
>          :                    +- CometExchange (12)
>          :                       +- CometHashAggregate (11)
>          :                          +- CometProject (10)
>          :                             +- CometFilter (9)
>          :                                +- CometSparkToColumnar (8)
>          :                                   +- BatchScan (7)
>          +- BroadcastQueryStage (24)
>             +- BroadcastExchange (23)
>                +- * Project (22)
>                   +- * Filter (21)
>                      +- BatchScan (20)
> 
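
The missing transition in the plan above can be spotted mechanically. The following is a rough Python sketch, not part of Comet or Spark, that parses a plan in this text format and reports every row-based operator sitting directly on top of a columnar one. It rests on a simplifying assumption made only for this illustration: operators whose names start with `Comet` emit columnar batches, query stages and `AQEShuffleRead` pass their child's format through, and everything else works on rows. The embedded plan is a trimmed copy of the one above.

```python
import re
from typing import List, NamedTuple, Optional, Tuple

# Trimmed copy of the plan above (subtrees under the two CometSort nodes omitted).
PLAN = """\
CometSort (27)
   +- CometColumnarExchange (26)
      +- BroadcastNestedLoopJoin Cross BuildRight (25)
         :- Project (19)
         :  +- CometSortMergeJoin (18)
         :     :- CometSort (6)
         :     +- CometSort (17)
         +- BroadcastQueryStage (24)
            +- BroadcastExchange (23)
               +- * Project (22)
                  +- * Filter (21)
                     +- BatchScan (20)
"""

# Assumption of this sketch: these wrappers keep their child's output format.
PASS_THROUGH = {"ShuffleQueryStage", "BroadcastQueryStage", "AQEShuffleRead"}

# Matches an optional tree marker, an optional "* ", the operator name and "(id)".
NODE = re.compile(r"(?:[+:]- )?(?:\* )?([A-Za-z]\w*)[^()]*\((\d+)\)")


class Node(NamedTuple):
    name: str
    op_id: int
    parent: Optional[int]  # index of the parent in the node list


def parse_plan(text: str) -> List[Node]:
    """Rebuild parent links from indentation: the parent is the nearest
    earlier node that starts in a smaller column."""
    nodes: List[Node] = []
    stack: List[Tuple[int, int]] = []  # (column, index into nodes)
    for line in text.splitlines():
        match = NODE.search(line)
        if match is None:
            continue
        column = match.start()
        while stack and stack[-1][0] >= column:
            stack.pop()
        parent = stack[-1][1] if stack else None
        nodes.append(Node(match.group(1), int(match.group(2)), parent))
        stack.append((column, len(nodes) - 1))
    return nodes


def children(nodes: List[Node], index: int) -> List[int]:
    return [i for i, node in enumerate(nodes) if node.parent == index]


def emits_columnar(nodes: List[Node], index: int) -> bool:
    name = nodes[index].name
    if name in PASS_THROUGH:
        return any(emits_columnar(nodes, c) for c in children(nodes, index))
    return name.startswith("Comet")  # heuristic, see the assumption above


def missing_transitions(nodes: List[Node]) -> List[Tuple[str, str]]:
    """Return (row operator, columnar child) pairs with no ColumnarToRow between."""
    problems = []
    for i, node in enumerate(nodes):
        if (node.name.startswith("Comet") or node.name in PASS_THROUGH
                or node.name == "ColumnarToRow"):
            continue
        for c in children(nodes, i):
            if emits_columnar(nodes, c):
                problems.append((f"{node.name} ({node.op_id})",
                                 f"{nodes[c].name} ({nodes[c].op_id})"))
    return problems


if __name__ == "__main__":
    print(missing_transitions(parse_plan(PLAN)))
```

For the trimmed plan, the only pair reported is Project (19) over CometSortMergeJoin (18), which matches the failure described in this report.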

After modifying the filter so that it can be converted to native, the query inserts the appropriate ColumnarToRow transitions, as shown below. It's unclear whether this is a bug in Spark's ApplyColumnarRulesAndInsertTransitions rule or something unique to Comet, but it seems like incorrect behavior when using the CometSparkToColumnar node.

>    ColumnarToRow (34)
>    +- CometSort (33)
>       +- AQEShuffleRead (32)
>          +- ShuffleQueryStage (31), Statistics(sizeInBytes=4.2 KiB, rowCount=36)
>             +- CometColumnarExchange (30)
>                +- BroadcastNestedLoopJoin Cross BuildRight (29)
>                   :- Project (21)
>                   :  +- ColumnarToRow (20)
>                   :     +- CometBroadcastHashJoin (19)
>                   :        :- BroadcastQueryStage (8), Statistics(sizeInBytes=319.7 KiB, rowCount=583)
>                   :        :  +- CometBroadcastExchange (7)
>                   :        :     +- AQEShuffleRead (6)
>                   :        :        +- ShuffleQueryStage (5), Statistics(sizeInBytes=409.3 KiB, rowCount=583)
>                   :        :           +- CometExchange (4)
>                   :        :              +- CometFilter (3)
>                   :        :                 +- CometSparkToColumnar (2)
>                   :        :                    +- BatchScan (1)
>                   :        +- CometFilter (18)
>                   :           +- CometHashAggregate (17)
>                   :              +- AQEShuffleRead (16)
>                   :                 +- ShuffleQueryStage (15), Statistics(sizeInBytes=2040.0 B, rowCount=10)
>                   :                    +- CometExchange (14)
>                   :                       +- CometHashAggregate (13)
>                   :                          +- CometProject (12)
>                   :                             +- CometFilter (11)
>                   :                                +- CometSparkToColumnar (10)
>                   :                                   +- BatchScan (9)
>                   +- BroadcastQueryStage (28), Statistics(sizeInBytes=864.0 B, rowCount=36)
>                      +- BroadcastExchange (27)
>                         +- ColumnarToRow (26)
>                            +- CometProject (25)
>                               +- CometFilter (24)
>                                  +- CometSparkToColumnar (23)
>                                     +- BatchScan (22)

Steps to reproduce

No response

Expected behavior

All appropriate ColumnarToRow operators are inserted and the query succeeds.

Additional context

No response

andygrove commented 3 hours ago

I'm in the process of creating the first 0.4.0 release candidate and am uploading the jar files to a maven staging repository. It may be worth testing with this newer version. I'll post more details once the jars are available.

andygrove commented 3 hours ago

@bmorck You can download 0.4.0-rc1 jar files from https://repository.apache.org/#nexus-search;quick~org.apache.datafusion

huaxingao commented 3 hours ago

@bmorck Thanks for reporting the bug. When you ran the benchmark, did you use the changes from the upstream Iceberg PR?

viirya commented 2 hours ago

I slightly updated the description to make the query plan readable.

viirya commented 2 hours ago

> After analyzing the query optimization, I've found that the EliminateRedundantTransitions rule, removed a CometSparkToColumnar and subsequent ColumnarToRow following the BatchScan (20) operator, due to the subsequent Filter (21) operator requiring row format.

Removing CometSparkToColumnar + ColumnarToRow from the top of BatchScan (20) looks correct if the Filter (21) cannot be converted to CometFilter.

> In the following example, we see that there is a missing ColumnarToRow preceding the Project (19) operator. This results in the query failing.

I'm not sure how this is related to the issue on Project (19).

bmorck commented 1 hour ago

@viirya It's not clear to me how this is related to the issue on Project (19) either. However, I noticed that the appropriate ColumnarToRow operators are injected when the filter can be converted to native, and hence the EliminateRedundantTransitions rule doesn't remove any nodes. I've seen this occur in other queries as well. Here is another example:

> +- AdaptiveSparkPlan (46)
>    +- == Current Plan ==
>       Sort (26)
>       +- Project (25)
>          +- Filter (24)
>             +- Window (23)
>                +- Sort (22)
>                   +- Exchange (21)
>                      +- Union (20)
>                         :- Project (10)
>                         :  +- Filter (9)
>                         :     +- Window (8)
>                         :        +- CometSort (7)
>                         :           +- ShuffleQueryStage (6)
>                         :              +- CometExchange (5)
>                         :                 +- CometProject (4)
>                         :                    +- CometFilter (3)
>                         :                       +- CometSparkToColumnar (2)
>                         :                          +- BatchScan (1)
>                         +- Project (19)
>                            +- Filter (18)
>                               +- Window (17)
>                                  +- Sort (16)
>                                     +- ShuffleQueryStage (15)
>                                        +- Exchange (14)
>                                           +- Project (13)
>                                              +- Filter (12)
>                                                 +- BatchScan (11)

@huaxingao I haven't ported over the PR yet. That's my next step, and perhaps it resolves the issue. I'll report back here if that turns out to be the case.

@andygrove Thanks for providing the new jar! I'll try it out. We are also using an internal fork of Comet (the current changes only enable Spark 3.3 on some of the version-restricted operators), so I will also rebase to include the latest changes.

viirya commented 1 hour ago

The only place in the Comet planner that removes ColumnarToRowExec is when there is a combination of ColumnarToRowExec + CometSparkToColumnarExec. Since that combination is effectively a no-op, we can remove it from the query plan.

It is removed from the top of BatchScan (20), which is not even close to Project (19), so I'm wondering how the two are related. 🤔

Do you have an example query we can use to reproduce this?
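
To make the no-op combination described in this comment concrete, here is a toy Python model of the rewrite. It is a sketch under stated assumptions, not Comet's Scala implementation. The `Plan` class and the `drop_noop_transition_pair` function are hypothetical names. The nesting order (a ColumnarToRow directly on top of a CometSparkToColumnar) is an assumed reading of the comment above.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Plan:
    """Toy stand-in for a physical plan node: a name plus child nodes."""
    name: str
    children: List["Plan"] = field(default_factory=list)


def drop_noop_transition_pair(plan: Plan) -> Plan:
    """Remove every ColumnarToRow that sits directly on a CometSparkToColumnar.

    Rows converted to columnar and straight back to rows are unchanged, so the
    pair can be replaced by the grandchild. Children are rewritten first.
    """
    rewritten = Plan(plan.name, [drop_noop_transition_pair(c) for c in plan.children])
    if rewritten.name == "ColumnarToRow" and len(rewritten.children) == 1:
        child = rewritten.children[0]
        if child.name == "CometSparkToColumnar" and len(child.children) == 1:
            return child.children[0]
    return rewritten


if __name__ == "__main__":
    # The BatchScan (20) branch from the report, before the rewrite (assumed shape).
    before = Plan("Filter", [
        Plan("ColumnarToRow", [
            Plan("CometSparkToColumnar", [Plan("BatchScan")]),
        ]),
    ])
    after = drop_noop_transition_pair(before)
    # The row-based Filter now reads rows straight from the scan.
    print(after)
```

In this model the rewrite only touches the matched pair and its grandchild, so it cannot by itself explain a missing ColumnarToRow elsewhere in the tree. That is the open question in this thread.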