apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Missing DynamicFileFilter when use iceberg in Spark 3.2 #5224

Closed dramaticlly closed 2 years ago

dramaticlly commented 2 years ago

Hey Iceberg Community:

We recently migrated from Iceberg 0.13 on Spark 3.1 to Spark 3.2 and noticed that some existing SQL delete jobs produce far more shuffle data than they did on Spark 3.1. When we explained the SQL statement and looked at the logical plan, we found that https://github.com/apache/iceberg/blob/master/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DynamicFileFilter.scala is missing from the Spark 3.2 extensions, and we would like some help understanding why.

It looks like the dynamic file filter was introduced in https://github.com/apache/iceberg/pull/3415/ on 10/31/2021, while initial Spark 3.2 support was merged in https://github.com/apache/iceberg/pull/3335/ on 10/22/2021, so we want to check whether the timing explains the difference.

delete SQL

DELETE FROM $table1
WHERE $table1.date >= '20211228' AND $table1.date <= '20220627'
AND upper($table1.$column1) IN (SELECT * FROM $table2)

Spark logical plan screenshots (two images, both titled "privacy-deletion-job - Details for Query 3")

Iceberg Version: 0.13.0 (did not turn on v2/merge-on-read for this table yet)

Spark Version: 3.2.0 (TBs of shuffle data generated) vs 3.1.1 (works as expected with DynamicFileFilter)

Appreciate your help! CC @szehon-ho @rdblue @aokolnychyi @wypoon

aokolnychyi commented 2 years ago

We are using another approach for dynamic filtering in 3.2 that does not involve DynamicFileFilter. See RowLevelCommandDynamicPruning for more details. Can you share EXPLAIN EXTENDED output? Is dynamic partition pruning enabled in Spark?
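For anyone following along, the two checks requested above can be run from a Spark SQL session roughly as follows. This is only a sketch: the table and column names are the redacted placeholders used elsewhere in this thread, and it assumes the Iceberg SQL extensions are already configured for the session.

```sql
-- Print the current value of the dynamic partition pruning flag.
-- `SET <key>;` without a value only reads the setting, it does not change it.
SET spark.sql.optimizer.dynamicPartitionPruning.enabled;

-- Show the parsed, analyzed, optimized and physical plans for the delete
-- without executing it. somedb.tableA, tableB, partition_col and col13 are
-- placeholder names taken from the redacted query in this thread.
EXPLAIN EXTENDED
DELETE FROM somedb.tableA tgt
WHERE tgt.partition_col >= '20220320' AND tgt.partition_col <= '20220321'
AND upper(tgt.col13) IN (SELECT * FROM tableB);
```

In the physical plan, the RuntimeFilters entry on the BatchScan of the target table is the place to look for evidence of dynamic filtering.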

dramaticlly commented 2 years ago

Thank you @aokolnychyi for jumping in so quickly. Here is the detailed EXPLAIN output I got on Spark 3.2.

This is the particular delete query we are looking at:

DELETE FROM somedb.tableA tgt
WHERE tgt.partition_col >= '20220320' AND tgt.partition_col <= '20220321'
AND upper(tgt.col13) IN (SELECT * FROM tableB)

I do see a dynamicpruning filter being used, but I don't see the one you specifically mentioned.

I also double-checked that spark.sql.optimizer.dynamicPartitionPruning.enabled is not explicitly set in our application. It is enabled by default in both OSS Spark and our Spark distribution, so I assume it is turned on. I also tried to reproduce with this conf explicitly enabled, but the job still generated TB-level shuffle write, so we might need a second pair of eyes here.

I redacted some irrelevant columns in the output below. Hope it helps!


== Parsed Logical Plan ==

'DeleteFromTable ((('tgt.partition_col >= 20220320) AND ('tgt.partition_col <= 20220321)) AND 'upper('tgt.col13) IN (list#267 []))
:  +- 'Project [*]
:     +- 'UnresolvedRelation [tableB], [], false
+- 'SubqueryAlias tgt
   +- 'UnresolvedRelation [somedb, tableA], [], false

== Analyzed Logical Plan ==

DeleteFromTable (((partition_col#670 >= cast(20220320 as int)) AND (partition_col#670 <= cast(20220321 as int))) AND upper(col13#682) IN (list#267 []))
:  +- Project [col12#258]
:     +- SubqueryAlias tableB
:        +- View (`tableB`, [col12#258])
:           +- Deduplicate [col12#258]
:              +- Project [col12#258]
:                 +- Filter ((date#741 >= 20220605) AND (date#741 <= 20220705))
:                    +- Project [upper(col12)#252 AS col12#258, date#741]
:                       +- Project [upper(col12#739) AS upper(col12)#252, date#741]
:                          +- RelationV2[id#738, col12#739, ] spark_catalog.anotherdb.tableC
:- RelationV2[partition_col#670, col13#682, ] spark_catalog.somedb.tableA
+- ReplaceData RelationV2[partition_col#670, col13#682, ] spark_catalog.somedb.tableA
   +- Filter NOT ((((partition_col#670 >= cast(20220320 as int)) AND (partition_col#670 <= cast(20220321 as int))) AND upper(col13#682) IN (list#267 [])) <=> true)
      :  +- Project [col12#258]
      :     +- SubqueryAlias tableB
      :        +- View (`tableB`, [col12#258])
      :           +- Deduplicate [col12#258]
      :              +- Project [col12#258]
      :                 +- Filter ((date#239 >= 20220605) AND (date#239 <= 20220705))
      :                    +- Project [upper(col12)#252 AS col12#258, date#239]
      :                       +- Project [upper(col12#237) AS upper(col12)#252, date#239]
      :                          +- RelationV2[id#236, col12#237, ] spark_catalog.anotherdb.tableC
      +- RelationV2[partition_col#670, col13#682, _file#736, _pos#737L] spark_catalog.somedb.tableA

== Optimized Logical Plan ==

ReplaceData RelationV2[partition_col#670, col13#682, ] spark_catalog.somedb.tableA, IcebergWrite(table=spark_catalog.somedb.tableA, format=PARQUET)
+- Project [partition_col#670, col13#682, ]
   +- Sort [partition_col#670 ASC NULLS FIRST, _file#736 ASC NULLS FIRST, _pos#737L ASC NULLS FIRST], false
      +- RepartitionByExpression [_file#736], 960
         +- Project [partition_col#670, col13#682, _file#736, _pos#737L]
            +- Filter NOT ((((partition_col#670 >= 20220320) AND (partition_col#670 <= 20220321)) AND exists#967) <=> true)
               +- Join ExistenceJoin(exists#967), (upper(col13#682) = col12#258)
                  :- Filter dynamicpruning#1034 [_file#736]
                  :  :  +- Project [_file#1032]
                  :  :     +- Join LeftSemi, (upper(col13#980) = col12#258)
                  :  :        :- Project [col13#980, _file#1032]
                  :  :        :  +- Filter ((isnotnull(partition_col#968) AND (partition_col#968 >= 20220320)) AND (partition_col#968 <= 20220321))
                  :  :        :     +- RelationV2[partition_col#968,  col13#980, ] spark_catalog.somedb.tableA
                  :  :        +- InMemoryRelation [col12#258], StorageLevel(disk, memory, deserialized, 1 replicas)
                  :  :              +- *(2) HashAggregate(keys=[col12#258], functions=[], output=[col12#258])
                  :  :                 +- Exchange hashpartitioning(col12#258, 960), ENSURE_REQUIREMENTS, [id=#89]
                  :  :                    +- *(1) HashAggregate(keys=[col12#258], functions=[], output=[col12#258])
                  :  :                       +- *(1) Project [upper(col12#237) AS col12#258]
                  :  :                          +- *(1) Filter ((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <= 20220705))
                  :  :                             +- BatchScan[col12#237, date#239] spark_catalog.anotherdb.tableC [filters=date IS NOT NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
                  :  +- RelationV2[partition_col#670, col13#682, _file#736, _pos#737L] spark_catalog.somedb.tableA
                  +- InMemoryRelation [col12#258], StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- *(2) HashAggregate(keys=[col12#258], functions=[], output=[col12#258])
                           +- Exchange hashpartitioning(col12#258, 960), ENSURE_REQUIREMENTS, [id=#89]
                              +- *(1) HashAggregate(keys=[col12#258], functions=[], output=[col12#258])
                                 +- *(1) Project [upper(col12#237) AS col12#258]
                                    +- *(1) Filter ((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <= 20220705))
                                       +- BatchScan[col12#237, date#239] spark_catalog.anotherdb.tableC [filters=date IS NOT NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []

== Physical Plan ==

ReplaceData IcebergWrite(table=spark_catalog.somedb.tableA, format=PARQUET)
+- *(2) Project [partition_col#670, col13#682, ]
   +- *(2) Sort [partition_col#670 ASC NULLS FIRST, _file#736 ASC NULLS FIRST, _pos#737L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_file#736, 960), REPARTITION_BY_NUM, [id=#469]
         +- *(1) Project [partition_col#670, col13#682, _file#736, _pos#737L]
            +- *(1) Filter NOT ((((partition_col#670 >= 20220320) AND (partition_col#670 <= 20220321)) AND exists#967) <=> true)
               +- *(1) BroadcastHashJoin [upper(col13#682)], [col12#258], ExistenceJoin(exists#967), BuildRight, false
                  :- *(1) Project [partition_col#670, col13#682, _file#736, _pos#737L]
                  :  +- BatchScan[partition_col#670, col13#682, _file#736, _pos#737L] spark_catalog.somedb.tableA [filters=partition_col >= 20220320, partition_col <= 20220321] RuntimeFilters: [dynamicpruningexpression(_file#736 IN subquery#1051)]
                  :        +- Subquery subquery#1051, [id=#408]
                  :           +- *(2) HashAggregate(keys=[_file#1032#1050], functions=[], output=[_file#1032#1050])
                  :              +- Exchange hashpartitioning(_file#1032#1050, 960), ENSURE_REQUIREMENTS, [id=#404]
                  :                 +- *(1) HashAggregate(keys=[_file#1032 AS _file#1032#1050], functions=[], output=[_file#1032#1050])
                  :                    +- *(1) Project [_file#1032]
                  :                       +- *(1) BroadcastHashJoin [upper(col13#980)], [col12#258], LeftSemi, BuildRight, false
                  :                          :- *(1) Project [col13#980, _file#1032]
                  :                          :  +- *(1) Filter ((isnotnull(partition_col#968) AND (partition_col#968 >= 20220320)) AND (partition_col#968 <= 20220321))
                  :                          :     +- BatchScan[partition_col#968,  col13#980, ] spark_catalog.somedb.tableA [filters=partition_col >= 20220320, partition_col <= 20220321] RuntimeFilters: []
                  :                          +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#389]
                  :                             +- InMemoryTableScan [col12#258]
                  :                                   +- InMemoryRelation [col12#258], StorageLevel(disk, memory, deserialized, 1 replicas)
                  :                                         +- *(2) HashAggregate(keys=[col12#258], functions=[], output=[col12#258])
                  :                                            +- Exchange hashpartitioning(col12#258, 960), ENSURE_REQUIREMENTS, [id=#89]
                  :                                               +- *(1) HashAggregate(keys=[col12#258], functions=[], output=[col12#258])
                  :                                                  +- *(1) Project [upper(col12#237) AS col12#258]
                  :                                                     +- *(1) Filter ((isnotnull(date#239) AND (date#239 >= 20220605)) AND (date#239 <= 20220705))
                  :                                                        +- BatchScan[col12#237, date#239] spark_catalog.anotherdb.tableC [filters=date IS NOT NULL, date >= '20220605', date <= '20220705'] RuntimeFilters: []
                  +- ReusedExchange [col12#258], BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#389]

-- edit for one more round of redaction

aokolnychyi commented 2 years ago

I see dynamic filtering working correctly:

RuntimeFilters: [dynamicpruningexpression(_file#736 IN subquery#1051)]

The only non-broadcast shuffle happens later, to cluster records by _file. Since the joins are otherwise broadcast joins, you can disable that shuffle by setting write.delete.distribution-mode to none.
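One way to apply that setting is as a table property through Spark SQL. A minimal sketch, assuming the placeholder table name somedb.tableA from the redacted plan above and a session with permission to alter the table:

```sql
-- Turn off the pre-write shuffle for DELETE on this table.
-- somedb.tableA is the redacted placeholder name from this thread.
ALTER TABLE somedb.tableA
SET TBLPROPERTIES ('write.delete.distribution-mode' = 'none');

-- Confirm the property is now set on the table.
SHOW TBLPROPERTIES somedb.tableA ('write.delete.distribution-mode');
```

With the property set to none, the Exchange hashpartitioning(_file, ...) step should no longer appear in the physical plan of the delete. Because rows are then no longer clustered by _file before the write, it may be worth checking the size and number of the rewritten data files after the first run.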

dramaticlly commented 2 years ago

Thank you @aokolnychyi for all of your help. After setting write.delete.distribution-mode to none instead of the default hash, our job on Spark 3.2 / Iceberg 0.13 works as expected. We really appreciate your insights!