
performance degradation after migrating to spark 3.3.1 when using iceberg merge into #7998

Open zohar-plutoflume opened 1 year ago

zohar-plutoflume commented 1 year ago

Query engine

Spark 3.3.1, Iceberg 1.1 (EMR 6.10)

Question

Hi, I wanted to point out a behaviour change introduced in Spark 3.3 (https://issues.apache.org/jira/browse/SPARK-38148). The issue: when we run a MERGE command that includes a static partition predicate in the ON clause, it translates to a join query that does not use dynamic filtering, which made our jobs run much slower. For example, this MERGE INTO command:

f"""
MERGE INTO {output_table.catalog_table_ref} {TARGET}
USING {TMP_VIEW} {SOURCE}
ON {join_str} AND {TARGET}.triggered is true
WHEN MATCHED THEN UPDATE SET {update_col_string}
"""

If we were using Spark joins directly, we could simply filter the target table first. But since we go through the MERGE API, we have to reference the Iceberg target directly, so the static partition condition has to be part of the ON clause.

One option is to move it into WHEN MATCHED AND {TARGET}.triggered is true, but I don't think that would push the triggered = true predicate down to the target table scan.
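For reference, here is roughly what that variant would look like. This is a minimal sketch: the target table name and join keys are taken from the plans below, while the tmp_updates view and the updated column are made up for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical source view; the real job builds the statement from
# f-string placeholders as shown above.
spark.sql("""
    MERGE INTO iceberg.iceberg_db.check_logs_merged t
    USING tmp_updates s
    ON t.tenant_id = s.tenant_id AND t.id = s.id AND t.filter_id = s.filter_id
    WHEN MATCHED AND t.triggered IS TRUE
      THEN UPDATE SET t.score = s.score
""")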

Any suggestion on how to make the merge use dynamic filtering as it did before the Spark 3.3.1 upgrade would be much appreciated.

I'm not sure whether this is a Spark issue, where they would need to introduce a conf that decides whether or not to drop the dynamic partitioning, or an Iceberg issue, since it goes through the MERGE INTO API.

After the change, we can see the merge creates a single join, as follows:

[screenshot: query plan after the upgrade, showing a single join]

Before the change, the join happened in two steps: the first step looked up only the ids needed for the join, and a second join was then performed on those ids. The performance difference comes down to the fact that, before the upgrade, the first step projected only the columns needed to determine the ids, which resulted in much smaller shuffle data overall (see the sketch below).

[screenshots: two-step query plan before the upgrade]
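In DataFrame terms, the old two-step shape was roughly the following. This is a sketch of the idea only (the real pre-upgrade plan collected _file names via a LeftSemi join for runtime file pruning); tmp_updates is a made-up source view, the target table and key columns come from the plans below.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Step 1: project only the join keys out of the source, so the first
# shuffle carries narrow rows.
join_keys = (
    spark.table("tmp_updates")  # hypothetical source view
    .select("tenant_id", "id", "filter_id")
    .distinct()
)

# Step 2: semi-join the wide target on those keys, so target rows that
# cannot match are dropped before any wide-row shuffle happens.
pruned_target = spark.table("iceberg.iceberg_db.check_logs_merged").join(
    join_keys, ["tenant_id", "id", "filter_id"], "left_semi"
)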

And here is the physical plan after the upgrade:

== Physical Plan ==
ReplaceData IcebergWrite(table=iceberg.iceberg_db.check_logs_merged, format=PARQUET)
+- Sort [tenant_id#342L ASC NULLS FIRST, sent_dt#343 ASC NULLS FIRST, classifier_type#374 ASC NULLS FIRST, triggered#344 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(tenant_id#342L, sent_dt#343, classifier_type#374, triggered#344, 4000), REPARTITION_BY_NUM, [plan_id=118]
      +- Project [id#338L, timestamp#339, synced_user_id#340L, email_details#341, tenant_id#342L, sent_dt#343, triggered#344, filter_result#345, feedback_timestamp#346, feedback_check_log_id#347L, feedback_filter_id#348L, feedback_policy_id#349, feedback_module#350, user_response#351, justification#352, allow_list_requests#353, feedback_tenant_id#354L, priority_timestamp#355, priority_check_log_id#356L, priority_filter_id#357L, priority_policy_id#358, priority_module#359, breach_prevented#360, score#361, ... 13 more fields]
         +- MergeRowsExec[id#338L, timestamp#339, synced_user_id#340L, email_details#341, tenant_id#342L, sent_dt#343, triggered#344, filter_result#345, feedback_timestamp#346, feedback_check_log_id#347L, feedback_filter_id#348L, feedback_policy_id#349, feedback_module#350, user_response#351, justification#352, allow_list_requests#353, feedback_tenant_id#354L, priority_timestamp#355, priority_check_log_id#356L, priority_filter_id#357L, priority_policy_id#358, priority_module#359, breach_prevented#360, score#361, ... 14 more fields]
            +- Sort [__row_id#336L ASC NULLS FIRST], false, 0
               +- SortMergeJoin [coalesce(triggered#299, false), isnull(triggered#299), tenant_id#297L, id#293L, filter_id#326L], [coalesce(true, false), isnull(true), tenant_id#91L, id#119L, filter_id#81L], LeftOuter, ((sent_dt#298 >= 2023-01-06) AND (sent_dt#298 <= 2023-07-05))
                  :- Sort [coalesce(triggered#299, false) ASC NULLS FIRST, isnull(triggered#299) ASC NULLS FIRST, tenant_id#297L ASC NULLS FIRST, id#293L ASC NULLS FIRST, filter_id#326L ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(triggered#299, false), isnull(triggered#299), tenant_id#297L, id#293L, filter_id#326L, 4000), ENSURE_REQUIREMENTS, [plan_id=83]
                  :     +- Project [id#293L, timestamp#294, synced_user_id#295L, email_details#296, tenant_id#297L, sent_dt#298, triggered#299, filter_result#300, feedback_timestamp#301, feedback_check_log_id#302L, feedback_filter_id#303L, feedback_policy_id#304, feedback_module#305, user_response#306, justification#307, allow_list_requests#308, feedback_tenant_id#309L, priority_timestamp#310, priority_check_log_id#311L, priority_filter_id#312L, priority_policy_id#313, priority_module#314, breach_prevented#315, score#316, ... 16 more fields]
                  :        +- Project [id#293L, timestamp#294, synced_user_id#295L, email_details#296, tenant_id#297L, sent_dt#298, triggered#299, filter_result#300, feedback_timestamp#301, feedback_check_log_id#302L, feedback_filter_id#303L, feedback_policy_id#304, feedback_module#305, user_response#306, justification#307, allow_list_requests#308, feedback_tenant_id#309L, priority_timestamp#310, priority_check_log_id#311L, priority_filter_id#312L, priority_policy_id#313, priority_module#314, breach_prevented#315, score#316, ... 14 more fields]
                  :           +- BatchScan[id#293L, timestamp#294, synced_user_id#295L, email_details#296, tenant_id#297L, sent_dt#298, triggered#299, filter_result#300, feedback_timestamp#301, feedback_check_log_id#302L, feedback_filter_id#303L, feedback_policy_id#304, feedback_module#305, user_response#306, justification#307, allow_list_requests#308, feedback_tenant_id#309L, priority_timestamp#310, priority_check_log_id#311L, priority_filter_id#312L, priority_policy_id#313, priority_module#314, breach_prevented#315, score#316, ... 14 more fields] iceberg.iceberg_db.check_logs_merged [filters=triggered = true, sent_dt >= 19363, sent_dt <= 19543] RuntimeFilters: []
                  +- *(4) Sort [coalesce(true, false) ASC NULLS FIRST, isnull(true) ASC NULLS FIRST, tenant_id#91L ASC NULLS FIRST, id#119L ASC NULLS FIRST, filter_id#81L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(true, false), isnull(true), tenant_id#91L, id#119L, filter_id#81L, 4000), ENSURE_REQUIREMENTS, [plan_id=110]
                        +- *(3) Project [headers#74, kafka_timestamp#75L, offset#76L, partition#77, topic#78, timestamp#79, check_log_id#80L, filter_id#81L, policy_id#82, module#83, breach_prevented#84, score#85, email_diff#86, breach_categories#87, action_after_no#88, subsequent_check_id#89L, expired#90, tenant_id#91L, corrupt_record#92, dt#93, id#119L, true AS __row_from_source#337]
                           +- *(3) Filter ((((isnotnull(kafka_timestamp#75L) AND (rank#166 = 1)) AND (kafka_timestamp#75L >= 1688515200)) AND (kafka_timestamp#75L <= 1688601600)) AND isnull(corrupt_record#92))
                              +- Window [rank(kafka_timestamp#75L, tiebreak#141L) windowspecdefinition(tenant_id#91L, id#119L, filter_id#81L, kafka_timestamp#75L DESC NULLS LAST, tiebreak#141L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#166], [tenant_id#91L, id#119L, filter_id#81L], [kafka_timestamp#75L DESC NULLS LAST, tiebreak#141L ASC NULLS FIRST]
                                 +- *(2) Sort [tenant_id#91L ASC NULLS FIRST, id#119L ASC NULLS FIRST, filter_id#81L ASC NULLS FIRST, kafka_timestamp#75L DESC NULLS LAST, tiebreak#141L ASC NULLS FIRST], false, 0
                                    +- Exchange hashpartitioning(tenant_id#91L, id#119L, filter_id#81L, 4000), ENSURE_REQUIREMENTS, [plan_id=101]
                                       +- *(1) Filter ((isnotnull(tenant_id#91L) AND isnotnull(id#119L)) AND isnotnull(filter_id#81L))
                                          +- *(1) Project [headers#74, kafka_timestamp#75L, offset#76L, partition#77, topic#78, timestamp#79, check_log_id#80L, filter_id#81L, policy_id#82, module#83, breach_prevented#84, score#85, email_diff#86, breach_categories#87, action_after_no#88, subsequent_check_id#89L, expired#90, tenant_id#91L, corrupt_record#92, dt#93, check_log_id#80L AS id#119L, monotonically_increasing_id() AS tiebreak#141L]
                                             +- *(1) Project [headers#74, kafka_timestamp#75L, offset#76L, partition#77, topic#78, timestamp#79, check_log_id#80L, filter_id#81L, policy_id#82, module#83, breach_prevented#84, score#85, email_diff#86, breach_categories#87, action_after_no#88, subsequent_check_id#89L, expired#90, tenant_id#91L, corrupt_record#92, dt#93]
                                                +- *(1) Filter isnull(policy_id#82)
                                                   +- BatchScan[headers#74, kafka_timestamp#75L, offset#76L, partition#77, topic#78, timestamp#79, check_log_id#80L, filter_id#81L, policy_id#82, module#83, breach_prevented#84, score#85, email_diff#86, breach_categories#87, action_after_no#88, subsequent_check_id#89L, expired#90, tenant_id#91L, corrupt_record#92, dt#93] iceberg.iceberg_db.email_check_outbound_priority [filters=policy_id IS NULL] RuntimeFilters: []

And before the upgrade:

== Physical Plan ==
ReplaceData
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      Sort [tenant_id#340L ASC NULLS FIRST, sent_dt#341 ASC NULLS FIRST, classifier_type#372 ASC NULLS FIRST, triggered#342 ASC NULLS FIRST], false, 0
      +- ShuffleQueryStage 6
         +- Exchange hashpartitioning(tenant_id#340L, sent_dt#341, classifier_type#372, triggered#342, 1000), REPARTITION_BY_NUM, [id=#775]
            +- Project [id#336L, timestamp#337, synced_user_id#338L, email_details#339, tenant_id#340L, sent_dt#341, triggered#342, filter_result#343, feedback_timestamp#344, feedback_check_log_id#345L, feedback_filter_id#346L, feedback_policy_id#347, feedback_module#348, user_response#349, justification#350, allow_list_requests#351, feedback_tenant_id#352L, priority_timestamp#353, priority_check_log_id#354L, priority_filter_id#355L, priority_policy_id#356, priority_module#357, breach_prevented#358, score#359, ... 13 more fields]
               +- MergeRowsExec[id#336L, timestamp#337, synced_user_id#338L, email_details#339, tenant_id#340L, sent_dt#341, triggered#342, filter_result#343, feedback_timestamp#344, feedback_check_log_id#345L, feedback_filter_id#346L, feedback_policy_id#347, feedback_module#348, user_response#349, justification#350, allow_list_requests#351, feedback_tenant_id#352L, priority_timestamp#353, priority_check_log_id#354L, priority_filter_id#355L, priority_policy_id#356, priority_module#357, breach_prevented#358, score#359, ... 14 more fields]
                  +- Sort [__row_id#334L ASC NULLS FIRST], false, 0
                     +- ShuffledHashJoin [coalesce(triggered#298, false), isnull(triggered#298), tenant_id#296L, id#292L, filter_id#325L], [coalesce(true, false), isnull(true), tenant_id#391L, id#118L, filter_id#381L], LeftOuter, BuildRight, ((sent_dt#297 >= 2022-12-14) AND (sent_dt#297 <= 2023-06-12))
                        :- AQEShuffleRead coalesced
                        :  +- ShuffleQueryStage 0
                        :     +- Exchange hashpartitioning(coalesce(triggered#298, false), isnull(triggered#298), tenant_id#296L, id#292L, filter_id#325L, 1000), ENSURE_REQUIREMENTS, [id=#335]
                        :        +- Project [id#292L, timestamp#293, synced_user_id#294L, email_details#295, tenant_id#296L, sent_dt#297, triggered#298, filter_result#299, feedback_timestamp#300, feedback_check_log_id#301L, feedback_filter_id#302L, feedback_policy_id#303, feedback_module#304, user_response#305, justification#306, allow_list_requests#307, feedback_tenant_id#308L, priority_timestamp#309, priority_check_log_id#310L, priority_filter_id#311L, priority_policy_id#312, priority_module#313, breach_prevented#314, score#315, ... 16 more fields]
                        :           +- Project [id#292L, timestamp#293, synced_user_id#294L, email_details#295, tenant_id#296L, sent_dt#297, triggered#298, filter_result#299, feedback_timestamp#300, feedback_check_log_id#301L, feedback_filter_id#302L, feedback_policy_id#303, feedback_module#304, user_response#305, justification#306, allow_list_requests#307, feedback_tenant_id#308L, priority_timestamp#309, priority_check_log_id#310L, priority_filter_id#311L, priority_policy_id#312, priority_module#313, breach_prevented#314, score#315, ... 14 more fields]
                        :              +- BatchScan[id#292L, timestamp#293, synced_user_id#294L, email_details#295, tenant_id#296L, sent_dt#297, triggered#298, filter_result#299, feedback_timestamp#300, feedback_check_log_id#301L, feedback_filter_id#302L, feedback_policy_id#303, feedback_module#304, user_response#305, justification#306, allow_list_requests#307, feedback_tenant_id#308L, priority_timestamp#309, priority_check_log_id#310L, priority_filter_id#311L, priority_policy_id#312, priority_module#313, breach_prevented#314, score#315, ... 14 more fields] iceberg.iceberg_db.check_logs_merged [filters=triggered = true, sent_dt >= 19340, sent_dt <= 19520] RuntimeFilters: [dynamicpruningexpression(_file#331 IN dynamicpruning#520)]
                        :                    +- Subquery dynamicpruning#520, [id=#294]
                        :                       +- AdaptiveSparkPlan isFinalPlan=true
                        :                          +- *(9) HashAggregate(keys=[_file#519#571], functions=[], output=[_file#519#571])
                        :                             +- AQEShuffleRead coalesced
                        :                                +- ShuffleQueryStage 4
                        :                                   +- Exchange hashpartitioning(_file#519#571, 1000), ENSURE_REQUIREMENTS, [id=#531]
                        :                                      +- *(6) HashAggregate(keys=[_file#519 AS _file#519#571], functions=[], output=[_file#519#571])
                        :                                         +- *(6) Project [_file#519]
                        :                                            +- *(6) SortMergeJoin [tenant_id#486L, id#482L, filter_id#515L], [tenant_id#91L, id#118L, filter_id#81L], LeftSemi
                        :                                               :- *(3) Sort [tenant_id#486L ASC NULLS FIRST, id#482L ASC NULLS FIRST, filter_id#515L ASC NULLS FIRST], false, 0
                        :                                               :  +- ShuffleQueryStage 2
                        :                                               :     +- Exchange hashpartitioning(tenant_id#486L, id#482L, filter_id#515L, 1000), ENSURE_REQUIREMENTS, [id=#285]
                        :                                               :        +- Project [id#482L, tenant_id#486L, filter_id#515L, _file#519]
                        :                                               :           +- Filter ((((((isnotnull(sent_dt#487) AND (triggered#488 <=> true)) AND (sent_dt#487 >= 2022-12-14)) AND (sent_dt#487 <= 2023-06-12)) AND isnotnull(tenant_id#486L)) AND isnotnull(id#482L)) AND isnotnull(filter_id#515L))
                        :                                               :              +- BatchScan[id#482L, timestamp#483, synced_user_id#484L, email_details#485, tenant_id#486L, sent_dt#487, triggered#488, filter_result#489, feedback_timestamp#490, feedback_check_log_id#491L, feedback_filter_id#492L, feedback_policy_id#493, feedback_module#494, user_response#495, justification#496, allow_list_requests#497, feedback_tenant_id#498L, priority_timestamp#499, priority_check_log_id#500L, priority_filter_id#501L, priority_policy_id#502, priority_module#503, breach_prevented#504, score#505, ... 14 more fields] iceberg.iceberg_db.check_logs_merged [filters=triggered = true, sent_dt >= 19340, sent_dt <= 19520] RuntimeFilters: []
                        :                                               +- *(5) Project [filter_id#81L, tenant_id#91L, id#118L]
                        :                                                  +- *(5) Filter ((((isnotnull(kafka_timestamp#75L) AND (rank#165 = 1)) AND (kafka_timestamp#75L >= 1686528000)) AND (kafka_timestamp#75L <= 1686614400)) AND isnull(corrupt_record#92))
                        :                                                     +- Window [rank(kafka_timestamp#75L, tiebreak#140L) windowspecdefinition(tenant_id#91L, id#118L, filter_id#81L, kafka_timestamp#75L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#165], [tenant_id#91L, id#118L, filter_id#81L], [kafka_timestamp#75L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST]
                        :                                                        +- *(4) Sort [tenant_id#91L ASC NULLS FIRST, id#118L ASC NULLS FIRST, filter_id#81L ASC NULLS FIRST, kafka_timestamp#75L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST], false, 0
                        :                                                           +- ShuffleQueryStage 3
                        :                                                              +- Exchange hashpartitioning(tenant_id#91L, id#118L, filter_id#81L, 1000), ENSURE_REQUIREMENTS, [id=#384]
                        :                                                                 +- *(2) Filter ((isnotnull(tenant_id#91L) AND isnotnull(id#118L)) AND isnotnull(filter_id#81L))
                        :                                                                    +- *(2) Project [kafka_timestamp#75L, filter_id#81L, tenant_id#91L, corrupt_record#92, check_log_id#80L AS id#118L, monotonically_increasing_id() AS tiebreak#140L]
                        :                                                                       +- *(2) Project [kafka_timestamp#75L, check_log_id#80L, filter_id#81L, policy_id#82, tenant_id#91L, corrupt_record#92]
                        :                                                                          +- *(2) Filter isnull(policy_id#82)
                        :                                                                             +- BatchScan[kafka_timestamp#75L, check_log_id#80L, filter_id#81L, policy_id#82, tenant_id#91L, corrupt_record#92] iceberg.iceberg_db.email_check_outbound_priority [filters=policy_id IS NULL] RuntimeFilters: []
                        +- AQEShuffleRead coalesced
                           +- ShuffleQueryStage 5
                              +- Exchange hashpartitioning(coalesce(true, false), isnull(true), tenant_id#391L, id#118L, filter_id#381L, 1000), ENSURE_REQUIREMENTS, [id=#657]
                                 +- *(8) Project [headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393, id#118L, true AS __row_from_source#335]
                                    +- *(8) Filter ((((isnotnull(kafka_timestamp#375L) AND (rank#165 = 1)) AND (kafka_timestamp#375L >= 1686528000)) AND (kafka_timestamp#375L <= 1686614400)) AND isnull(corrupt_record#392))
                                       +- Window [rank(kafka_timestamp#375L, tiebreak#140L) windowspecdefinition(tenant_id#391L, id#118L, filter_id#381L, kafka_timestamp#375L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#165], [tenant_id#391L, id#118L, filter_id#381L], [kafka_timestamp#375L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST]
                                          +- *(7) Sort [tenant_id#391L ASC NULLS FIRST, id#118L ASC NULLS FIRST, filter_id#381L ASC NULLS FIRST, kafka_timestamp#375L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST], false, 0
                                             +- AQEShuffleRead coalesced
                                                +- ShuffleQueryStage 1
                                                   +- Exchange hashpartitioning(tenant_id#391L, id#118L, filter_id#381L, 1000), ENSURE_REQUIREMENTS, [id=#351]
                                                      +- *(1) Filter ((isnotnull(tenant_id#391L) AND isnotnull(id#118L)) AND isnotnull(filter_id#381L))
                                                         +- *(1) Project [headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393, check_log_id#380L AS id#118L, monotonically_increasing_id() AS tiebreak#140L]
                                                            +- *(1) Project [headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393]
                                                               +- *(1) Filter isnull(policy_id#382)
                                                                  +- BatchScan[headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393] iceberg.iceberg_db.email_check_outbound_priority [filters=policy_id IS NULL] RuntimeFilters: []
   +- == Initial Plan ==
      Sort [tenant_id#340L ASC NULLS FIRST, sent_dt#341 ASC NULLS FIRST, classifier_type#372 ASC NULLS FIRST, triggered#342 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(tenant_id#340L, sent_dt#341, classifier_type#372, triggered#342, 1000), REPARTITION_BY_NUM, [id=#150]
         +- Project [id#336L, timestamp#337, synced_user_id#338L, email_details#339, tenant_id#340L, sent_dt#341, triggered#342, filter_result#343, feedback_timestamp#344, feedback_check_log_id#345L, feedback_filter_id#346L, feedback_policy_id#347, feedback_module#348, user_response#349, justification#350, allow_list_requests#351, feedback_tenant_id#352L, priority_timestamp#353, priority_check_log_id#354L, priority_filter_id#355L, priority_policy_id#356, priority_module#357, breach_prevented#358, score#359, ... 13 more fields]
            +- MergeRowsExec[id#336L, timestamp#337, synced_user_id#338L, email_details#339, tenant_id#340L, sent_dt#341, triggered#342, filter_result#343, feedback_timestamp#344, feedback_check_log_id#345L, feedback_filter_id#346L, feedback_policy_id#347, feedback_module#348, user_response#349, justification#350, allow_list_requests#351, feedback_tenant_id#352L, priority_timestamp#353, priority_check_log_id#354L, priority_filter_id#355L, priority_policy_id#356, priority_module#357, breach_prevented#358, score#359, ... 14 more fields]
               +- Sort [__row_id#334L ASC NULLS FIRST], false, 0
                  +- SortMergeJoin [coalesce(triggered#298, false), isnull(triggered#298), tenant_id#296L, id#292L, filter_id#325L], [coalesce(true, false), isnull(true), tenant_id#391L, id#118L, filter_id#381L], LeftOuter, ((sent_dt#297 >= 2022-12-14) AND (sent_dt#297 <= 2023-06-12))
                     :- Sort [coalesce(triggered#298, false) ASC NULLS FIRST, isnull(triggered#298) ASC NULLS FIRST, tenant_id#296L ASC NULLS FIRST, id#292L ASC NULLS FIRST, filter_id#325L ASC NULLS FIRST], false, 0
                     :  +- Exchange hashpartitioning(coalesce(triggered#298, false), isnull(triggered#298), tenant_id#296L, id#292L, filter_id#325L, 1000), ENSURE_REQUIREMENTS, [id=#141]
                     :     +- Project [id#292L, timestamp#293, synced_user_id#294L, email_details#295, tenant_id#296L, sent_dt#297, triggered#298, filter_result#299, feedback_timestamp#300, feedback_check_log_id#301L, feedback_filter_id#302L, feedback_policy_id#303, feedback_module#304, user_response#305, justification#306, allow_list_requests#307, feedback_tenant_id#308L, priority_timestamp#309, priority_check_log_id#310L, priority_filter_id#311L, priority_policy_id#312, priority_module#313, breach_prevented#314, score#315, ... 16 more fields]
                     :        +- Project [id#292L, timestamp#293, synced_user_id#294L, email_details#295, tenant_id#296L, sent_dt#297, triggered#298, filter_result#299, feedback_timestamp#300, feedback_check_log_id#301L, feedback_filter_id#302L, feedback_policy_id#303, feedback_module#304, user_response#305, justification#306, allow_list_requests#307, feedback_tenant_id#308L, priority_timestamp#309, priority_check_log_id#310L, priority_filter_id#311L, priority_policy_id#312, priority_module#313, breach_prevented#314, score#315, ... 14 more fields]
                     :           +- BatchScan[id#292L, timestamp#293, synced_user_id#294L, email_details#295, tenant_id#296L, sent_dt#297, triggered#298, filter_result#299, feedback_timestamp#300, feedback_check_log_id#301L, feedback_filter_id#302L, feedback_policy_id#303, feedback_module#304, user_response#305, justification#306, allow_list_requests#307, feedback_tenant_id#308L, priority_timestamp#309, priority_check_log_id#310L, priority_filter_id#311L, priority_policy_id#312, priority_module#313, breach_prevented#314, score#315, ... 14 more fields] iceberg.iceberg_db.check_logs_merged [filters=triggered = true, sent_dt >= 19340, sent_dt <= 19520] RuntimeFilters: [dynamicpruningexpression(_file#331 IN dynamicpruning#520)]
                     :                 +- SubqueryAdaptiveBroadcast dynamicpruning#520, 0, false, Project [_file#519], [_file#519]
                     :                    +- AdaptiveSparkPlan isFinalPlan=false
                     :                       +- Project [_file#519]
                     :                          +- SortMergeJoin [tenant_id#486L, id#482L, filter_id#515L], [tenant_id#91L, id#118L, filter_id#81L], LeftSemi
                     :                             :- Sort [tenant_id#486L ASC NULLS FIRST, id#482L ASC NULLS FIRST, filter_id#515L ASC NULLS FIRST], false, 0
                     :                             :  +- Exchange hashpartitioning(tenant_id#486L, id#482L, filter_id#515L, 1000), ENSURE_REQUIREMENTS, [id=#120]
                     :                             :     +- Project [id#482L, tenant_id#486L, filter_id#515L, _file#519]
                     :                             :        +- Filter ((((((isnotnull(sent_dt#487) AND (triggered#488 <=> true)) AND (sent_dt#487 >= 2022-12-14)) AND (sent_dt#487 <= 2023-06-12)) AND isnotnull(tenant_id#486L)) AND isnotnull(id#482L)) AND isnotnull(filter_id#515L))
                     :                             :           +- BatchScan[id#482L, timestamp#483, synced_user_id#484L, email_details#485, tenant_id#486L, sent_dt#487, triggered#488, filter_result#489, feedback_timestamp#490, feedback_check_log_id#491L, feedback_filter_id#492L, feedback_policy_id#493, feedback_module#494, user_response#495, justification#496, allow_list_requests#497, feedback_tenant_id#498L, priority_timestamp#499, priority_check_log_id#500L, priority_filter_id#501L, priority_policy_id#502, priority_module#503, breach_prevented#504, score#505, ... 14 more fields] iceberg.iceberg_db.check_logs_merged [filters=triggered = true, sent_dt >= 19340, sent_dt <= 19520] RuntimeFilters: []
                     :                             +- Project [filter_id#81L, tenant_id#91L, id#118L]
                     :                                +- Filter ((((isnotnull(kafka_timestamp#75L) AND (rank#165 = 1)) AND (kafka_timestamp#75L >= 1686528000)) AND (kafka_timestamp#75L <= 1686614400)) AND isnull(corrupt_record#92))
                     :                                   +- Window [rank(kafka_timestamp#75L, tiebreak#140L) windowspecdefinition(tenant_id#91L, id#118L, filter_id#81L, kafka_timestamp#75L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#165], [tenant_id#91L, id#118L, filter_id#81L], [kafka_timestamp#75L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST]
                     :                                      +- Sort [tenant_id#91L ASC NULLS FIRST, id#118L ASC NULLS FIRST, filter_id#81L ASC NULLS FIRST, kafka_timestamp#75L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST], false, 0
                     :                                         +- Exchange hashpartitioning(tenant_id#91L, id#118L, filter_id#81L, 1000), ENSURE_REQUIREMENTS, [id=#113]
                     :                                            +- Filter ((isnotnull(tenant_id#91L) AND isnotnull(id#118L)) AND isnotnull(filter_id#81L))
                     :                                               +- Project [kafka_timestamp#75L, filter_id#81L, tenant_id#91L, corrupt_record#92, check_log_id#80L AS id#118L, monotonically_increasing_id() AS tiebreak#140L]
                     :                                                  +- Project [kafka_timestamp#75L, check_log_id#80L, filter_id#81L, policy_id#82, tenant_id#91L, corrupt_record#92]
                     :                                                     +- Filter isnull(policy_id#82)
                     :                                                        +- BatchScan[kafka_timestamp#75L, check_log_id#80L, filter_id#81L, policy_id#82, tenant_id#91L, corrupt_record#92] iceberg.iceberg_db.email_check_outbound_priority [filters=policy_id IS NULL] RuntimeFilters: []
                     +- Sort [coalesce(true, false) ASC NULLS FIRST, isnull(true) ASC NULLS FIRST, tenant_id#391L ASC NULLS FIRST, id#118L ASC NULLS FIRST, filter_id#381L ASC NULLS FIRST], false, 0
                        +- Exchange hashpartitioning(coalesce(true, false), isnull(true), tenant_id#391L, id#118L, filter_id#381L, 1000), ENSURE_REQUIREMENTS, [id=#142]
                           +- Project [headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393, id#118L, true AS __row_from_source#335]
                              +- Filter ((((isnotnull(kafka_timestamp#375L) AND (rank#165 = 1)) AND (kafka_timestamp#375L >= 1686528000)) AND (kafka_timestamp#375L <= 1686614400)) AND isnull(corrupt_record#392))
                                 +- Window [rank(kafka_timestamp#375L, tiebreak#140L) windowspecdefinition(tenant_id#391L, id#118L, filter_id#381L, kafka_timestamp#375L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#165], [tenant_id#391L, id#118L, filter_id#381L], [kafka_timestamp#375L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST]
                                    +- Sort [tenant_id#391L ASC NULLS FIRST, id#118L ASC NULLS FIRST, filter_id#381L ASC NULLS FIRST, kafka_timestamp#375L DESC NULLS LAST, tiebreak#140L ASC NULLS FIRST], false, 0
                                       +- Exchange hashpartitioning(tenant_id#391L, id#118L, filter_id#381L, 1000), ENSURE_REQUIREMENTS, [id=#134]
                                          +- Filter ((isnotnull(tenant_id#391L) AND isnotnull(id#118L)) AND isnotnull(filter_id#381L))
                                             +- Project [headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393, check_log_id#380L AS id#118L, monotonically_increasing_id() AS tiebreak#140L]
                                                +- Project [headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393]
                                                   +- Filter isnull(policy_id#382)
                                                      +- BatchScan[headers#374, kafka_timestamp#375L, offset#376L, partition#377, topic#378, timestamp#379, check_log_id#380L, filter_id#381L, policy_id#382, module#383, breach_prevented#384, score#385, email_diff#386, breach_categories#387, action_after_no#388, subsequent_check_id#389L, expired#390, tenant_id#391L, corrupt_record#392, dt#393] iceberg.iceberg_db.email_check_outbound_priority [filters=policy_id IS NULL] RuntimeFilters: []
zohar-plutoflume commented 1 year ago

After looking into it a bit more, I think Iceberg does not rely on Spark's dynamic filtering here but implements its own via RowLevelCommandDynamicPruning. However, it seems to apply a similar candidate check:

private def isCandidate(command: RowLevelCommand): Boolean = command.condition match {
  case Some(cond) if cond != Literal.TrueLiteral => true
  case _ => false
}

I wonder what a workaround would be.
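One direction I can think of (a sketch with made-up view/column names, not something verified on this job): since the runtime file filter is dropped anyway, tighten the static predicate by collecting the exact partition values the source touches and inlining them into the ON clause, so the target scan can prune partitions at planning time. This assumes the source's dt values line up with the target's sent_dt partitions and that the list is small.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Collect the partition values the source touches (hypothetical view and
# column names; assumes a small, non-empty list).
dts = [str(r.dt) for r in spark.table("tmp_updates").select("dt").distinct().collect()]
assert dts, "no partitions to merge into"
dt_list = ", ".join(f"DATE'{d}'" for d in dts)

# Inline the partition list as a static predicate in the ON clause.
spark.sql(f"""
    MERGE INTO iceberg.iceberg_db.check_logs_merged t
    USING tmp_updates s
    ON t.tenant_id = s.tenant_id AND t.id = s.id AND t.filter_id = s.filter_id
      AND t.sent_dt IN ({dt_list})
    WHEN MATCHED THEN UPDATE SET t.score = s.score
""")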

Grumi commented 8 months ago

Hello Zohar,

I am facing the same problem after upgrading from Spark 3.3.0 to 3.3.2 and Iceberg 1.1.0 to 1.3.0. Have you been able to solve it?

Thank you, Marius

github-actions[bot] commented 2 days ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the 'not-stale' label, but commenting on the issue is preferred when possible.