spark-redshift-community / spark-redshift

Performant Redshift data source for Apache Spark
Apache License 2.0

spark-redshift 5.0.3 rewrites the user query into multiple subqueries, causing very low performance #96

Open jackwang2 opened 2 years ago

jackwang2 commented 2 years ago

Here is one of the subqueries, as logged by RedshiftRelation:

21/11/09 06:38:23 INFO RedshiftRelation: SELECT "delivery_ingested_at", "timestamp_at_delivery", "sdk_user_agent", "pub_account_id", "pub_app_id", "placement_id", "platform", "platform_version", "placement_type", "placement_size", "placement_is_flat_cpm_enabled", "placement_flat_cpm", "placement_cpm_floor", "is_header_bidding", "ad_type", "ad_size", "supply_name", "hbp_ordinal_view", "flat_cpm_model_type", "do_not_track", "settlement_price", "rev_share", "hosting_cost", "publisher_payout_type", "device_id_source", "country", "incentivized" FROM (
              SELECT delivery_ingested_at,
                     impression_ingested_at,
                     hbp_win_ingested_at,
                     s2s_win_ingested_at,

                     timestamp_at_delivery,
                     timestamp_at_impression,
                     timestamp_at_hbp_win,
                     timestamp_at_s2s_win,

                     winner_account_id                                                            AS dsp_account_id,
                     winner_id                                                                    AS rtb_connection_id,
                     winning_seat                                                                 AS seat_id,

                     sdk_user_agent,
                     pub_account_id,
                     pub_app_object_id                                                            AS pub_app_id,
                     placement_id,
                     deal_id,

                     dev_platform                                                                 AS platform,
                     dev_platform_version                                                         AS platform_version,
                     placement_type,
                     placement_size,
                     placement_is_flat_cpm_enabled,
                     placement_flat_cpm,
                     placement_cpm_floor,
                     is_header_bidding,

                     ad_type,
                     ad_size,
                     supply_name,

                     ordinal_view_at_hbp_win                                                      AS hbp_ordinal_view,
                     flat_cpm_model_type,

                     dev_do_not_track                                                             AS do_not_track,

                     settlement_price,
                     rev_share,
                     hosting_cost,
                     publisher_payout_type,

                     viewed,
                     completed_view,
                     ad_clicked,

                     settlement_price_at_hbp_win,

                     geoip_country_code_at_delivery,
                     geoip_country_code_at_impression,

                     incentivized_at_delivery,
                     incentivized_at_impression,

                     tpat_ingested_at,
                     start_at_tpat,

                     coalesce(dev_id_source_at_delivery, dev_id_source_at_impression)             AS device_id_source,
                     coalesce(geoip_country_code_at_delivery, geoip_country_code_at_impression)   AS country,
                     coalesce(incentivized_at_delivery, incentivized_at_impression)               AS incentivized,
                     CASE
                     WHEN pub_app_object_id IN (
                          SELECT pub_app_id FROM tpat_publisher
                       ) THEN TRUE
                     WHEN LOWER(REGEXP_SUBSTR(sdk_user_agent, \'(?<=;)[-0-9a-zA-Z]+(?=/)\', 1, 1, \'ip\')) IN (
                         SELECT mediator FROM tpat_mediator
                       ) THEN TRUE
                     ELSE FALSE
                   END AS tpat_flag

                FROM edsp_transactions

               WHERE NOT is_test
                 AND timestamp_at_delivery IS NOT NULL
                 AND (
                         (   delivery_ingested_at >= \'2021-11-09T04:00:00.000Z\'
                             AND delivery_ingested_at <  \'2021-11-09T05:00:00.000Z\'
                         ) OR (
                                 timestamp_at_impression IS NOT NULL
                             AND GREATEST(impression_ingested_at, delivery_ingested_at) >= \'2021-11-09T04:00:00.000Z\'
                             AND GREATEST(impression_ingested_at, delivery_ingested_at) <  \'2021-11-09T05:00:00.000Z\'
                             AND COALESCE(is_test_at_impression, FALSE) = FALSE
                         ) OR (
                                 timestamp_at_s2s_win  IS NOT NULL
                             AND GREATEST(s2s_win_ingested_at, delivery_ingested_at) >= \'2021-11-09T04:00:00.000Z\'
                             AND GREATEST(s2s_win_ingested_at, delivery_ingested_at) <  \'2021-11-09T05:00:00.000Z\'
                             AND COALESCE(is_test_at_s2s_win, FALSE) = FALSE
                         ) OR (
                                 tpat_ingested_at IS NOT NULL
                             AND hbp_win_ingested_at IS NOT NULL
                             AND GREATEST(tpat_ingested_at, hbp_win_ingested_at, delivery_ingested_at) >= \'2021-11-09T04:00:00.000Z\'
                             AND GREATEST(tpat_ingested_at, hbp_win_ingested_at, delivery_ingested_at) <  \'2021-11-09T05:00:00.000Z\'
                             AND COALESCE(is_test_at_tpat, FALSE) = FALSE
                         ) OR (
                                 timestamp_at_hbp_win     IS NOT NULL
                             AND timestamp_at_impression  IS NOT NULL
                             AND GREATEST(hbp_win_ingested_at, impression_ingested_at, delivery_ingested_at) >= \'2021-11-09T04:00:00.000Z\'
                             AND GREATEST(hbp_win_ingested_at, impression_ingested_at, delivery_ingested_at) <  \'2021-11-09T05:00:00.000Z\'
                             AND COALESCE(is_test_at_impression, FALSE) = FALSE
                         ) OR (
                                 tpat_ingested_at       IS NOT NULL
                             AND GREATEST(tpat_ingested_at, delivery_ingested_at) >= \'2021-11-09T04:00:00.000Z\'
                             AND GREATEST(tpat_ingested_at, delivery_ingested_at) <  \'2021-11-09T05:00:00.000Z\'
                             AND COALESCE(is_test_at_tpat, FALSE) = FALSE
                         )
                     )
        ) WHERE "delivery_ingested_at" IS NOT NULL AND "timestamp_at_delivery" IS NOT NULL AND "delivery_ingested_at" >= \'2021-11-09 04:00:00.0\' AND "delivery_ingested_at" < \'2021-11-09 05:00:00.0\'

However, our original query is:

              SELECT delivery_ingested_at,
                     impression_ingested_at,
                     hbp_win_ingested_at,
                     s2s_win_ingested_at,

                     timestamp_at_delivery,
                     timestamp_at_impression,
                     timestamp_at_hbp_win,
                     timestamp_at_s2s_win,

                     winner_account_id                                                            AS dsp_account_id,
                     winner_id                                                                    AS rtb_connection_id,
                     winning_seat                                                                 AS seat_id,

                     sdk_user_agent,
                     pub_account_id,
                     pub_app_object_id                                                            AS pub_app_id,
                     placement_id,
                     deal_id,

                     dev_platform                                                                 AS platform,
                     dev_platform_version                                                         AS platform_version,
                     placement_type,
                     placement_size,
                     placement_is_flat_cpm_enabled,
                     placement_flat_cpm,
                     placement_cpm_floor,
                     is_header_bidding,

                     ad_type,
                     ad_size,
                     supply_name,

                     ordinal_view_at_hbp_win                                                      AS hbp_ordinal_view,
                     flat_cpm_model_type,

                     dev_do_not_track                                                             AS do_not_track,

                     settlement_price,
                     rev_share,
                     hosting_cost,
                     publisher_payout_type,

                     viewed,
                     completed_view,
                     ad_clicked,

                     settlement_price_at_hbp_win,

                     geoip_country_code_at_delivery,
                     geoip_country_code_at_impression,

                     incentivized_at_delivery,
                     incentivized_at_impression,

                     tpat_ingested_at,
                     start_at_tpat,

                     coalesce(dev_id_source_at_delivery, dev_id_source_at_impression)             AS device_id_source,
                     coalesce(geoip_country_code_at_delivery, geoip_country_code_at_impression)   AS country,
                     coalesce(incentivized_at_delivery, incentivized_at_impression)               AS incentivized,
                     CASE
                     WHEN pub_app_object_id IN (
                          SELECT pub_app_id FROM tpat_publisher
                       ) THEN TRUE
                     WHEN LOWER(REGEXP_SUBSTR(sdk_user_agent, '(?<=;)[-0-9a-zA-Z]+(?=/)', 1, 1, 'ip')) IN (
                         SELECT mediator FROM tpat_mediator
                       ) THEN TRUE
                     ELSE FALSE
                   END AS tpat_flag

                FROM edsp_transactions

               WHERE NOT is_test
                 AND timestamp_at_delivery IS NOT NULL
                 AND (
                         (   delivery_ingested_at >= '2021-11-09T04:00:00.000Z'
                             AND delivery_ingested_at <  '2021-11-09T05:00:00.000Z'
                         ) OR (
                                 timestamp_at_impression IS NOT NULL
                             AND GREATEST(impression_ingested_at, delivery_ingested_at) >= '2021-11-09T04:00:00.000Z'
                             AND GREATEST(impression_ingested_at, delivery_ingested_at) <  '2021-11-09T05:00:00.000Z'
                             AND COALESCE(is_test_at_impression, FALSE) = FALSE
                         ) OR (
                                 timestamp_at_s2s_win  IS NOT NULL
                             AND GREATEST(s2s_win_ingested_at, delivery_ingested_at) >= '2021-11-09T04:00:00.000Z'
                             AND GREATEST(s2s_win_ingested_at, delivery_ingested_at) <  '2021-11-09T05:00:00.000Z'
                             AND COALESCE(is_test_at_s2s_win, FALSE) = FALSE
                         ) OR (
                                 tpat_ingested_at IS NOT NULL
                             AND hbp_win_ingested_at IS NOT NULL
                             AND GREATEST(tpat_ingested_at, hbp_win_ingested_at, delivery_ingested_at) >= '2021-11-09T04:00:00.000Z'
                             AND GREATEST(tpat_ingested_at, hbp_win_ingested_at, delivery_ingested_at) <  '2021-11-09T05:00:00.000Z'
                             AND COALESCE(is_test_at_tpat, FALSE) = FALSE
                         ) OR (
                                 timestamp_at_hbp_win     IS NOT NULL
                             AND timestamp_at_impression  IS NOT NULL
                             AND GREATEST(hbp_win_ingested_at, impression_ingested_at, delivery_ingested_at) >= '2021-11-09T04:00:00.000Z'
                             AND GREATEST(hbp_win_ingested_at, impression_ingested_at, delivery_ingested_at) <  '2021-11-09T05:00:00.000Z'
                             AND COALESCE(is_test_at_impression, FALSE) = FALSE
                         ) OR (
                                 tpat_ingested_at       IS NOT NULL
                             AND GREATEST(tpat_ingested_at, delivery_ingested_at) >= '2021-11-09T04:00:00.000Z'
                             AND GREATEST(tpat_ingested_at, delivery_ingested_at) <  '2021-11-09T05:00:00.000Z'
                             AND COALESCE(is_test_at_tpat, FALSE) = FALSE
                         )
                     )

Could you let us know how to disable this feature, please? Thanks.

jsleight commented 2 years ago

Which part is being rewritten? It does seem to be wrapping your query in a SELECT <cols> FROM (<your_query>) WHERE <filters>, but I don't think that will really impact Redshift performance. (I'm by no means a Redshift expert, though, so I could be wrong on this part.)

For reference, here is the source code that builds the Redshift UNLOAD statement: https://github.com/spark-redshift-community/spark-redshift/blob/master/src/main/scala/io/github/spark_redshift_community/spark/redshift/RedshiftRelation.scala#L173
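
To make the wrapping discussed above concrete, here is a minimal Python sketch of how a statement shaped like the one in the log could be assembled: the user query becomes a parenthesized subquery, the outer SELECT lists only the columns Spark requested, and pushed-down filters are appended as a WHERE clause. The function names (`escape_for_unload`, `build_wrapped_query`) and the sample query are hypothetical and exist only for illustration. This is not the connector's actual Scala code, and it assumes the result is later embedded in a single-quoted UNLOAD literal, which would explain the `\'` sequences in the logged statement.

```python
def escape_for_unload(sql: str) -> str:
    """Escape backslashes and single quotes so the SQL can sit inside
    a single-quoted literal (assumption: UNLOAD('...') style embedding)."""
    return sql.replace("\\", "\\\\").replace("'", "\\'")


def build_wrapped_query(user_query: str, columns: list, filters: list) -> str:
    """Wrap user_query as a subquery, keeping only the requested columns
    and appending already-rendered filter predicates (hypothetical helper)."""
    # Outer column names are double-quoted, as in the logged statement.
    column_list = ", ".join('"' + name + '"' for name in columns)
    where_clause = ""
    if filters:
        rendered = " AND ".join(escape_for_unload(f) for f in filters)
        where_clause = " WHERE " + rendered
    subquery = "(" + escape_for_unload(user_query) + ")"
    return "SELECT " + column_list + " FROM " + subquery + where_clause


# Hypothetical, heavily shortened stand-in for the query in this issue.
original = (
    "SELECT delivery_ingested_at, dev_platform AS platform "
    "FROM edsp_transactions "
    "WHERE delivery_ingested_at >= '2021-11-09T04:00:00.000Z'"
)
wrapped = build_wrapped_query(
    original,
    ["delivery_ingested_at", "platform"],
    ['"delivery_ingested_at" IS NOT NULL'],
)
print(wrapped)
```

In this sketch the inner query text is unchanged apart from quote escaping, so the database still receives a single statement per read. Only the projection and the extra predicates are added around it.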