apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

chore: Upgrade to DataFusion 43.0.0-rc1 #1057

Closed: andygrove closed this 2 weeks ago

andygrove commented 2 weeks ago

Which issue does this PR close?

N/A

Rationale for this change

Voting has started on DataFusion 43.0.0-rc1, so we should make sure that it does not cause any regressions in Comet.

What changes are included in this PR?

How are these changes tested?

andygrove commented 2 weeks ago

Test failure in CI:

- SortMergeJoin with join filter *** FAILED *** (1 second, 828 milliseconds)
  Results do not match for query:
  Timezone: sun.util.calendar.ZoneInfo[id="America/Los_Angeles",offset=-28800000,dstSavings=3600000,useDaylight=true,transitions=185,lastRule=java.util.SimpleTimeZone[id=America/Los_Angeles,offset=-28800000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=3,startMonth=2,startDay=8,startDayOfWeek=1,startTime=7200000,startTimeMode=0,endMode=3,endMonth=10,endDay=1,endDayOfWeek=1,endTime=7200000,endTimeMode=0]]
  Timezone Env: 

  == Parsed Logical Plan ==
  Project [_1#465976, _2#465977, _1#465966, _2#465967]
  +- Join FullOuter, ((_2#465967 = _1#465976) AND (_1#465966 > _2#465977))
     :- SubqueryAlias tbl_b
     :  +- View (`tbl_b`, [_1#465976,_2#465977])
     :     +- Relation [_1#465976,_2#465977] parquet
     +- SubqueryAlias tbl_a
        +- View (`tbl_a`, [_1#465966,_2#465967])
           +- Relation [_1#465966,_2#465967] parquet

  == Analyzed Logical Plan ==
  _1: int, _2: int, _1: int, _2: int
  Project [_1#465976, _2#465977, _1#465966, _2#465967]
  +- Join FullOuter, ((_2#465967 = _1#465976) AND (_1#465966 > _2#465977))
     :- SubqueryAlias tbl_b
     :  +- View (`tbl_b`, [_1#465976,_2#465977])
     :     +- Relation [_1#465976,_2#465977] parquet
     +- SubqueryAlias tbl_a
        +- View (`tbl_a`, [_1#465966,_2#465967])
           +- Relation [_1#465966,_2#465967] parquet

  == Optimized Logical Plan ==
  Join FullOuter, ((_2#465967 = _1#465976) AND (_1#465966 > _2#465977))
  :- Relation [_1#465976,_2#465977] parquet
  +- Relation [_1#465966,_2#465967] parquet

  == Physical Plan ==
  AdaptiveSparkPlan isFinalPlan=true
  +- == Final Plan ==
     *(1) ColumnarToRow
     +- CometSortMergeJoin [_1#465976], [_2#465967], FullOuter, (_1#465966 > _2#465977)
        :- CometSort [_1#465976, _2#465977], [_1#465976 ASC NULLS FIRST]
        :  +- AQEShuffleRead coalesced
        :     +- ShuffleQueryStage 0
        :        +- CometExchange hashpartitioning(_1#465976, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1511432]
        :           +- CometScan parquet [_1#465976,_2#465977] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/runner/work/datafusion-comet/datafusion-comet/spark/target..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int>
        +- CometSort [_1#465966, _2#465967], [_2#465967 ASC NULLS FIRST]
           +- AQEShuffleRead coalesced
              +- ShuffleQueryStage 1
                 +- CometExchange hashpartitioning(_2#465967, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1511442]
                    +- CometScan parquet [_1#465966,_2#465967] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/runner/work/datafusion-comet/datafusion-comet/spark/target..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int>
  +- == Initial Plan ==
     CometSortMergeJoin [_1#465976], [_2#465967], FullOuter, (_1#465966 > _2#465977)
     :- CometSort [_1#465976, _2#465977], [_1#465976 ASC NULLS FIRST]
     :  +- CometExchange hashpartitioning(_1#465976, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1511249]
     :     +- CometScan parquet [_1#465976,_2#465977] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/runner/work/datafusion-comet/datafusion-comet/spark/target..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int>
     +- CometSort [_1#465966, _2#465967], [_2#465967 ASC NULLS FIRST]
        +- CometExchange hashpartitioning(_2#465967, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1511251]
           +- CometScan parquet [_1#465966,_2#465967] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/runner/work/datafusion-comet/datafusion-comet/spark/target..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int>

  == Results ==
  !== Correct Answer - 15 ==             == Spark Answer - 20 ==
   struct<_1:int,_2:int,_1:int,_2:int>   struct<_1:int,_2:int,_1:int,_2:int>
   [0,2,5,0]                             [0,2,5,0]
  ![1,3,6,1]                             [0,2,null,null]
  ![2,4,7,2]                             [1,3,6,1]
  ![3,5,8,3]                             [1,3,null,null]
  ![4,6,9,4]                             [2,4,7,2]
  ![5,7,null,null]                       [2,4,null,null]
  ![6,8,null,null]                       [3,5,8,3]
  ![7,9,null,null]                       [3,5,null,null]
  ![8,10,null,null]                      [4,6,9,4]
  ![9,11,null,null]                      [4,6,null,null]
  ![null,null,0,0]                       [5,7,null,null]
  ![null,null,1,1]                       [6,8,null,null]
  ![null,null,2,2]                       [7,9,null,null]
  ![null,null,3,3]                       [8,10,null,null]
  ![null,null,4,4]                       [9,11,null,null]
  !                                      [null,null,0,0]
  !                                      [null,null,1,1]
  !                                      [null,null,2,2]
  !                                      [null,null,3,3]
  !                                      [null,null,4,4] (QueryTest.scala:243)
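The failing query is a full outer join of `tbl_b` (left) with `tbl_a` (right) on `tbl_a._2 = tbl_b._1 AND tbl_a._1 > tbl_b._2`. Under these semantics, a row is null-padded only if it has no partner that satisfies both the key and the filter. The sketch below is a naive nested-loop reference, not DataFusion's or Comet's sort-merge implementation. It recomputes the 15-row correct answer and diffs it against the 20-row answer from the log.

It rests on one assumption. The table contents are inferred from the result rows: `tbl_a` has rows `(i, i % 5)` and `tbl_b` has rows `(i, i + 2)` for `i` in 0..9. The test's actual data setup is not shown here.

```python
from collections import Counter

# ASSUMPTION: inputs inferred from the result rows in the CI log.
tbl_a = [(i, i % 5) for i in range(10)]   # right side: (_1, _2)
tbl_b = [(i, i + 2) for i in range(10)]   # left side:  (_1, _2)


def full_outer_join_with_filter(left, right, key_eq, join_filter):
    """Reference nested-loop full outer join with a join filter.

    A (left, right) pair is emitted only if the equi-join key matches AND the
    filter holds. A row is null-padded only if it has no such partner at all.
    """
    left_nulls = (None,) * (len(left[0]) if left else 0)
    right_nulls = (None,) * (len(right[0]) if right else 0)
    out = []
    right_matched = [False] * len(right)
    for l in left:
        matched = False
        for j, r in enumerate(right):
            if key_eq(l, r) and join_filter(l, r):
                out.append(l + r)
                matched = True
                right_matched[j] = True
        if not matched:
            out.append(l + right_nulls)
    # Right rows that never found a filter-passing partner.
    for j, r in enumerate(right):
        if not right_matched[j]:
            out.append(left_nulls + r)
    return out


# Output columns: [b._1, b._2, a._1, a._2], as in the Project in the plan.
expected = full_outer_join_with_filter(
    tbl_b, tbl_a,
    key_eq=lambda b, a: a[1] == b[0],       # _2#465967 = _1#465976
    join_filter=lambda b, a: a[0] > b[1],   # _1#465966 > _2#465977
)

# "Spark Answer - 20" rows, copied from the log.
N = None
spark_answer = [
    (0, 2, 5, 0), (0, 2, N, N), (1, 3, 6, 1), (1, 3, N, N),
    (2, 4, 7, 2), (2, 4, N, N), (3, 5, 8, 3), (3, 5, N, N),
    (4, 6, 9, 4), (4, 6, N, N), (5, 7, N, N), (6, 8, N, N),
    (7, 9, N, N), (8, 10, N, N), (9, 11, N, N), (N, N, 0, 0),
    (N, N, 1, 1), (N, N, 2, 2), (N, N, 3, 3), (N, N, 4, 4),
]

extra = list((Counter(spark_answer) - Counter(expected)).elements())
missing = list((Counter(expected) - Counter(spark_answer)).elements())

print(len(expected), len(spark_answer))  # prints: 15 20
print(extra)    # the five spurious null-padded left rows
print(missing)  # prints: []
```

The diff shows that nothing is missing. The five extra rows are `[0,2,null,null]` through `[4,6,null,null]`, null-padded copies of left rows that already have a filter-passing match. These are exactly the rows that correct filtered full outer join semantics must not emit.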
andygrove commented 2 weeks ago

@comphead FYI: it looks like there may be a regression in DF 43 related to sort-merge join with a join filter.

I am tempted to ignore this test for now and file a follow-on issue, given that we disable this feature by default.

comphead commented 2 weeks ago

> @comphead FYI: it looks like there may be a regression in DF 43 related to sort-merge join with a join filter.
>
> I am tempted to ignore this test for now and file a follow-on issue, given that we disable this feature by default.

Thanks @andygrove. This is a filtered FullOuterJoin, so the failure is expected behavior. I'm planning to finish FullOuterJoin support in DF this week.

andygrove commented 2 weeks ago

I ran some TPC-H benchmarks and did not see any change in performance with the DF 43 upgrade.