pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Streaming pipeline runs out of memory #15771

Open dhruvyy opened 6 months ago

dhruvyy commented 6 months ago

Checks

Reproducible example

Not possible to provide.

Log output

No response

Issue description

I have a pipeline that is entirely streaming-compliant, as confirmed by running the .explain(streaming=True) method. When large datasets are run through this pipeline, the process is simply killed due to OOM, which by definition should not happen.

Data points

Questions

  1. Is there a native way to profile how much memory a specific Polars function consumes?
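As a stopgap that does not rely on any Polars-specific API, a minimal sketch can measure the peak resident set size (RSS) of the whole process around a single call using Python's standard resource module (Unix only). The helper names peak_rss_mib and measure_peak are hypothetical. Because this is a process-wide high-water mark, it includes memory allocated by the Rust engine, but it cannot attribute usage to individual operations inside a plan.

```python
import resource
import sys
from typing import Callable, TypeVar

T = TypeVar("T")


def peak_rss_mib() -> float:
    """Peak resident set size of this process so far, in MiB.

    ru_maxrss is reported in kilobytes on Linux but in bytes on macOS.
    """
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
    return peak / divisor


def measure_peak(fn: Callable[[], T]) -> tuple[T, float]:
    """Run fn() and return (result, growth of the process peak RSS in MiB).

    The value is a high-water mark for the whole process, so it only shows
    growth beyond the largest peak reached earlier in the same process.
    """
    before = peak_rss_mib()
    result = fn()
    after = peak_rss_mib()
    return result, after - before


# Hypothetical usage, where lf is a LazyFrame built by the pipeline:
# df, grew_by_mib = measure_peak(lambda: lf.collect(streaming=True))
```

Running each suspect step in a fresh process gives the cleanest numbers, since an earlier peak in the same process hides smaller later ones.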

Expected behavior

The pipeline should run end to end (albeit slowly) and not run out of memory, especially since the streaming plan confirms that everything is streaming-compliant.

--- STREAMING
UNION
  PLAN 0:
     SELECT [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag"), col("intermediary_jurisdictions"), col("intermediate_country_score"), col("intermediary_jurisdictions_high_risk_flag"), col("max_date"), col("benford_digit"), col("ending_digit"), col("month"), col("week"), col("threedays"), col("pipeline_to_date"), col("date"), col("swift_transaction_category"), col("originator").struct.field_by_name(account_number)().alias("focal_counterparty_id"), col("beneficiary").struct.field_by_name(account_number)().alias("counterparty_account_id"), String(Originator).alias("role"), String(C).alias("credit_or_debit"), col("originator").alias("focal_counterparty"), col("beneficiary").alias("counterparty"), col("owning_account_id").alias("local_account")] FROM
       WITH_COLUMNS:
       [.when([(col("__POLARS_CSER_13645683119572449170")) & (col("transaction_sub_type").is_in([Series]))]).then(String(fi_to_fi_customer_credit_transfer)).otherwise(.when([(col("__POLARS_CSER_13645683119572449170")) & (col("transaction_sub_type").is_in([Series]))]).then(String(cover_payments)).otherwise(.when([(col("__POLARS_CSER_13645683119572449170")) & (col("transaction_sub_type").is_in([Series]))]).then(String(fi_credit_transfer)).otherwise(null.strict_cast(String)))).alias("swift_transaction_category"), [(col("transaction_type")) == (String(SWIFT))].alias("__POLARS_CSER_13645683119572449170")]
         SELECT [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag"), col("intermediary_jurisdictions"), col("intermediate_country_score"), col("intermediary_jurisdictions_high_risk_flag"), col("max_date"), col("__POLARS_CSER_15815706651934126073").str.slice([0, 1]).strict_cast(Int32).alias("benford_digit"), col("__POLARS_CSER_15815706651934126073").str.slice([-1, null]).strict_cast(Int32).alias("ending_digit"), col("canonical_datetime").dt.to_string().alias("month"), [(col("__POLARS_CSER_10562999114576139691")) / (7)].floor().strict_cast(Int32).alias("week"), [(col("__POLARS_CSER_10562999114576139691")) / (3)].floor().strict_cast(Int32).alias("threedays"), 2020-12-31.alias("pipeline_to_date"), col("canonical_datetime").dt.to_string().alias("date"), col("instructed_amount").abs().strict_cast(String).alias("__POLARS_CSER_15815706651934126073"), [(col("max_date").cast(Datetime(Microseconds, None))) - 
(col("canonical_datetime"))].dt.total_days().alias("__POLARS_CSER_10562999114576139691")] FROM
           WITH_COLUMNS:
           [2020-12-31.alias("max_date")]
             SELECT [col("owning_account_id"), col("transaction_id"), col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag"), col("intermediary_jurisdictions"), col("intermediate_country_score"), col("intermediary_jurisdictions_high_risk_flag")] FROM
              INNER JOIN:
              LEFT PLAN ON: [col("owning_account_id"), col("transaction_id")]
                 SELECT [col("beneficiary_jurisdiction"), col("owning_account_id"), col("transaction_id"), col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag")] FROM
                  LEFT JOIN:
                  LEFT PLAN ON: [col("beneficiary_jurisdiction")]
                     SELECT [col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("owning_account_id"), col("transaction_id"), col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag")] FROM
                      LEFT JOIN:
                      LEFT PLAN ON: [col("originator_jurisdiction")]
                         WITH_COLUMNS:
                         [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediate_bank_jurisdictions").list.length().alias("intermediary_jurisdictions_count")]
                           WITH_COLUMNS:
                           [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator").struct.field_by_name(country)().str.strip_chars([null]).replace([String(), null]).coalesce([col("originator").struct.field_by_name(bic)().str.slice([4, 2])]).alias("originator_jurisdiction"), col("beneficiary").struct.field_by_name(country)().str.strip_chars([null]).replace([String(), null]).coalesce([col("beneficiary").struct.field_by_name(bic)().str.slice([4, 2])]).alias("beneficiary_jurisdiction"), col("intermediate_banks").eval().alias("intermediate_bank_jurisdictions"), col("beneficiary_correspondents").eval().alias("beneficiary_correspondent_jurisdictions"), col("instructing_parties").eval().alias("instructing_jurisdictions"), col("ordering_parties").eval().alias("ordering_jurisdictions")]
                             WITH_COLUMNS:
                             [String(CASH).alias("transaction_type")]

                                Parquet SCAN 2 files: first file: /home/user/pipeline/data/test_data/data-0-8f374fa9-d449-4c49-b624-5089d3278a31-0.parquet
                                PROJECT 26/28 COLUMNS
                      RIGHT PLAN ON: [col("originator_jurisdiction")]
                         SELECT [col("jurisdiction").alias("originator_jurisdiction"), col("country_score").alias("originator_country_score"), col("high_risk_country_flag").alias("originator_high_risk_country_flag")] FROM
                           SELECT [col("high_risk_country_flag"), col("country_score"), col("jurisdiction")] FROM
                             WITH_COLUMNS:
                             [col("fatf_increased_monitoring_grey_list").str.uppercase().coalesce([col("fatf_call_for_action_black_list").str.uppercase(), String(0)]).alias("high_risk_country_flag"), col("overall_score").alias("country_score"), col("iso_code").alias("jurisdiction")]
                               SELECT [col("fatf_increased_monitoring_grey_list"), col("fatf_call_for_action_black_list"), col("overall_score"), col("iso_code")] FROM

                                  Csv SCAN seeds/jurisdiction_scores.csv
                                  PROJECT 4/40 COLUMNS
                      END LEFT JOIN
                  RIGHT PLAN ON: [col("beneficiary_jurisdiction")]
                     SELECT [col("jurisdiction").alias("beneficiary_jurisdiction"), col("country_score").alias("beneficiary_country_score"), col("high_risk_country_flag").alias("beneficiary_high_risk_country_flag")] FROM
                       SELECT [col("high_risk_country_flag"), col("country_score"), col("jurisdiction")] FROM
                         WITH_COLUMNS:
                         [col("fatf_increased_monitoring_grey_list").str.uppercase().coalesce([col("fatf_call_for_action_black_list").str.uppercase(), String(0)]).alias("high_risk_country_flag"), col("overall_score").alias("country_score"), col("iso_code").alias("jurisdiction")]
                           SELECT [col("fatf_increased_monitoring_grey_list"), col("fatf_call_for_action_black_list"), col("overall_score"), col("iso_code")] FROM

                              Csv SCAN seeds/jurisdiction_scores.csv
                              PROJECT 4/40 COLUMNS
                  END LEFT JOIN
              RIGHT PLAN ON: [col("owning_account_id"), col("transaction_id")]
                 WITH_COLUMNS:
                 [col("intermediary_jurisdictions").map_list().alias("intermediate_country_score"), col("intermediary_jurisdictions").map_list().alias("intermediary_jurisdictions_high_risk_flag")]
                   SELECT [col("owning_account_id"), col("transaction_id"), col("intermediary_jurisdictions")] FROM
                     WITH_COLUMNS:
                     [col("intermediate_bank_jurisdictions").str.split([String(,)]).alias("intermediary_jurisdictions")]
                       WITH_COLUMNS:
                       [col("intermediate_bank_jurisdictions").str.concat_horizontal([col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions")]).alias("intermediary_jurisdictions")]
                         SELECT [col("owning_account_id"), col("transaction_id"), col("intermediate_bank_jurisdictions").list.join([String(,)]), col("beneficiary_correspondent_jurisdictions").list.join([String(,)]), col("instructing_jurisdictions").list.join([String(,)]), col("ordering_jurisdictions").list.join([String(,)])] FROM
                           WITH_COLUMNS:
                           [col("owning_account_id"), col("transaction_id"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions")]
                             WITH_COLUMNS:
                             [col("owning_account_id"), col("transaction_id"), col("intermediate_banks").eval().alias("intermediate_bank_jurisdictions"), col("beneficiary_correspondents").eval().alias("beneficiary_correspondent_jurisdictions"), col("instructing_parties").eval().alias("instructing_jurisdictions"), col("ordering_parties").eval().alias("ordering_jurisdictions")]

                                Parquet SCAN 2 files: first file: /home/user/pipeline/data/test_data/data-0-8f374fa9-d449-4c49-b624-5089d3278a31-0.parquet
                                PROJECT 6/28 COLUMNS
              END INNER JOIN
  PLAN 1:
     SELECT [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag"), col("intermediary_jurisdictions"), col("intermediate_country_score"), col("intermediary_jurisdictions_high_risk_flag"), col("max_date"), col("benford_digit"), col("ending_digit"), col("month"), col("week"), col("threedays"), col("pipeline_to_date"), col("date"), col("swift_transaction_category"), col("beneficiary").struct.field_by_name(account_number)().alias("focal_counterparty_id"), col("originator").struct.field_by_name(account_number)().alias("counterparty_account_id"), String(Beneficiary).alias("role"), String(D).alias("credit_or_debit"), col("beneficiary").alias("focal_counterparty"), col("originator").alias("counterparty"), col("owning_account_id").alias("local_account")] FROM
       WITH_COLUMNS:
       [.when([(col("__POLARS_CSER_13645683119572449170")) & (col("transaction_sub_type").is_in([Series]))]).then(String(fi_to_fi_customer_credit_transfer)).otherwise(.when([(col("__POLARS_CSER_13645683119572449170")) & (col("transaction_sub_type").is_in([Series]))]).then(String(cover_payments)).otherwise(.when([(col("__POLARS_CSER_13645683119572449170")) & (col("transaction_sub_type").is_in([Series]))]).then(String(fi_credit_transfer)).otherwise(null.strict_cast(String)))).alias("swift_transaction_category"), [(col("transaction_type")) == (String(SWIFT))].alias("__POLARS_CSER_13645683119572449170")]
         SELECT [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag"), col("intermediary_jurisdictions"), col("intermediate_country_score"), col("intermediary_jurisdictions_high_risk_flag"), col("max_date"), col("__POLARS_CSER_15815706651934126073").str.slice([0, 1]).strict_cast(Int32).alias("benford_digit"), col("__POLARS_CSER_15815706651934126073").str.slice([-1, null]).strict_cast(Int32).alias("ending_digit"), col("canonical_datetime").dt.to_string().alias("month"), [(col("__POLARS_CSER_10562999114576139691")) / (7)].floor().strict_cast(Int32).alias("week"), [(col("__POLARS_CSER_10562999114576139691")) / (3)].floor().strict_cast(Int32).alias("threedays"), 2020-12-31.alias("pipeline_to_date"), col("canonical_datetime").dt.to_string().alias("date"), col("instructed_amount").abs().strict_cast(String).alias("__POLARS_CSER_15815706651934126073"), [(col("max_date").cast(Datetime(Microseconds, None))) - 
(col("canonical_datetime"))].dt.total_days().alias("__POLARS_CSER_10562999114576139691")] FROM
           WITH_COLUMNS:
           [2020-12-31.alias("max_date")]
             SELECT [col("owning_account_id"), col("transaction_id"), col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag"), col("intermediary_jurisdictions"), col("intermediate_country_score"), col("intermediary_jurisdictions_high_risk_flag")] FROM
              INNER JOIN:
              LEFT PLAN ON: [col("owning_account_id"), col("transaction_id")]
                 SELECT [col("beneficiary_jurisdiction"), col("owning_account_id"), col("transaction_id"), col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag"), col("beneficiary_country_score"), col("beneficiary_high_risk_country_flag")] FROM
                  LEFT JOIN:
                  LEFT PLAN ON: [col("beneficiary_jurisdiction")]
                     SELECT [col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("owning_account_id"), col("transaction_id"), col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediary_jurisdictions_count"), col("originator_country_score"), col("originator_high_risk_country_flag")] FROM
                      LEFT JOIN:
                      LEFT PLAN ON: [col("originator_jurisdiction")]
                         WITH_COLUMNS:
                         [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator_jurisdiction"), col("beneficiary_jurisdiction"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions"), col("intermediate_bank_jurisdictions").list.length().alias("intermediary_jurisdictions_count")]
                           WITH_COLUMNS:
                           [col("amount"), col("base_amount"), col("beneficiary"), col("beneficiary_address_country"), col("beneficiary_bank"), col("beneficiary_correspondents"), col("beneficiary_residence_country"), col("canonical_datetime"), col("crediting_party"), col("currency"), col("debiting_party"), col("event_type"), col("instructed_amount"), col("instructing_parties"), col("intermediate_banks"), col("last_update_datetime"), col("ordering_parties"), col("originator"), col("originator_address_country"), col("originator_bank"), col("originator_residence_country"), col("owning_account_id"), col("transaction_id"), col("transaction_sub_type"), col("transaction_type"), col("uetr"), col("originator").struct.field_by_name(country)().str.strip_chars([null]).replace([String(), null]).coalesce([col("originator").struct.field_by_name(bic)().str.slice([4, 2])]).alias("originator_jurisdiction"), col("beneficiary").struct.field_by_name(country)().str.strip_chars([null]).replace([String(), null]).coalesce([col("beneficiary").struct.field_by_name(bic)().str.slice([4, 2])]).alias("beneficiary_jurisdiction"), col("intermediate_banks").eval().alias("intermediate_bank_jurisdictions"), col("beneficiary_correspondents").eval().alias("beneficiary_correspondent_jurisdictions"), col("instructing_parties").eval().alias("instructing_jurisdictions"), col("ordering_parties").eval().alias("ordering_jurisdictions")]
                             WITH_COLUMNS:
                             [String(CASH).alias("transaction_type")]

                                Parquet SCAN 2 files: first file: /home/user/pipeline/data/test_data/data-0-8f374fa9-d449-4c49-b624-5089d3278a31-0.parquet
                                PROJECT 26/28 COLUMNS
                      RIGHT PLAN ON: [col("originator_jurisdiction")]
                         SELECT [col("jurisdiction").alias("originator_jurisdiction"), col("country_score").alias("originator_country_score"), col("high_risk_country_flag").alias("originator_high_risk_country_flag")] FROM
                           SELECT [col("high_risk_country_flag"), col("country_score"), col("jurisdiction")] FROM
                             WITH_COLUMNS:
                             [col("fatf_increased_monitoring_grey_list").str.uppercase().coalesce([col("fatf_call_for_action_black_list").str.uppercase(), String(0)]).alias("high_risk_country_flag"), col("overall_score").alias("country_score"), col("iso_code").alias("jurisdiction")]
                               SELECT [col("fatf_increased_monitoring_grey_list"), col("fatf_call_for_action_black_list"), col("overall_score"), col("iso_code")] FROM

                                  Csv SCAN seeds/jurisdiction_scores.csv
                                  PROJECT 4/40 COLUMNS
                      END LEFT JOIN
                  RIGHT PLAN ON: [col("beneficiary_jurisdiction")]
                     SELECT [col("jurisdiction").alias("beneficiary_jurisdiction"), col("country_score").alias("beneficiary_country_score"), col("high_risk_country_flag").alias("beneficiary_high_risk_country_flag")] FROM
                       SELECT [col("high_risk_country_flag"), col("country_score"), col("jurisdiction")] FROM
                         WITH_COLUMNS:
                         [col("fatf_increased_monitoring_grey_list").str.uppercase().coalesce([col("fatf_call_for_action_black_list").str.uppercase(), String(0)]).alias("high_risk_country_flag"), col("overall_score").alias("country_score"), col("iso_code").alias("jurisdiction")]
                           SELECT [col("fatf_increased_monitoring_grey_list"), col("fatf_call_for_action_black_list"), col("overall_score"), col("iso_code")] FROM

                              Csv SCAN seeds/jurisdiction_scores.csv
                              PROJECT 4/40 COLUMNS
                  END LEFT JOIN
              RIGHT PLAN ON: [col("owning_account_id"), col("transaction_id")]
                 WITH_COLUMNS:
                 [col("intermediary_jurisdictions").map_list().alias("intermediate_country_score"), col("intermediary_jurisdictions").map_list().alias("intermediary_jurisdictions_high_risk_flag")]
                   SELECT [col("owning_account_id"), col("transaction_id"), col("intermediary_jurisdictions")] FROM
                     WITH_COLUMNS:
                     [col("intermediate_bank_jurisdictions").str.split([String(,)]).alias("intermediary_jurisdictions")]
                       WITH_COLUMNS:
                       [col("intermediate_bank_jurisdictions").str.concat_horizontal([col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions")]).alias("intermediary_jurisdictions")]
                         SELECT [col("owning_account_id"), col("transaction_id"), col("intermediate_bank_jurisdictions").list.join([String(,)]), col("beneficiary_correspondent_jurisdictions").list.join([String(,)]), col("instructing_jurisdictions").list.join([String(,)]), col("ordering_jurisdictions").list.join([String(,)])] FROM
                           WITH_COLUMNS:
                           [col("owning_account_id"), col("transaction_id"), col("intermediate_bank_jurisdictions"), col("beneficiary_correspondent_jurisdictions"), col("instructing_jurisdictions"), col("ordering_jurisdictions")]
                             WITH_COLUMNS:
                             [col("owning_account_id"), col("transaction_id"), col("intermediate_banks").eval().alias("intermediate_bank_jurisdictions"), col("beneficiary_correspondents").eval().alias("beneficiary_correspondent_jurisdictions"), col("instructing_parties").eval().alias("instructing_jurisdictions"), col("ordering_parties").eval().alias("ordering_jurisdictions")]

                                Parquet SCAN 2 files: first file: /home/user/pipeline/data/test_data/data-0-8f374fa9-d449-4c49-b624-5089d3278a31-0.parquet
                                PROJECT 6/28 COLUMNS
              END INNER JOIN
END UNION  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

Installed versions

```
--------Version info---------
Polars: 0.20.22-rc.1
Index type: UInt32
Platform: Linux-5.15.146.1-microsoft-standard-WSL2-x86_64-with-glibc2.35
Python: 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake: 0.16.4
fastexcel:
fsspec:
gevent:
hvplot:
matplotlib:
nest_asyncio: 1.6.0
numpy: 1.26.4
openpyxl:
pandas: 2.2.2
pyarrow: 15.0.2
pydantic: 1.10.15
pyiceberg:
pyxlsb:
sqlalchemy:
xlsx2csv:
xlsxwriter: None
```
buggtb commented 6 months ago

For a bit more context, here is what I found from some more testing.

I'm running the Polars code from the above plan on a Mac with 96GB of RAM and a 50M-row input set.

I watch OSX hold it in RAM up to a certain point; then it spills over to the swap partition, which is 90GB in size, and when swap hits 90GB, OSX just kills the process.

I've also set the Parquet scan to low_memory=True and cache=False to try to limit RAM usage on the input side. No obvious difference. I also dialed the streaming chunk size down to about 1000 rows, and it still blows up.

buggtb commented 6 months ago

The inner join is the killer. However, following https://github.com/pola-rs/polars/issues/14201 and casting the join key columns does stop it from blowing up. Further down, we do a few selects and then a pl.concat over two of these frames, and that also blows up, whereas sinking each frame to disk instead of concatenating them seems to avoid it.

Still on RAM usage: I'm curious, because when it fails it doesn't seem to spill much to disk; instead it mostly fills up swap until it hits 90GB and gets killed.

buggtb commented 6 months ago

So I doubled the input size again, and something like this:

```python
df1 = pl.scan_parquet("output_data/tmpbase.parquet").cast({"owning_account_id": pl.Categorical})
df2 = pl.scan_parquet("output_data/tmpintermediate.parquet").cast({"transaction_id": pl.Categorical})
return df1.join(
    df2,
    on=["owning_account_id", "transaction_id"],
    how="inner",
)

# ...

result[table].sink_ipc(f"output_data/{table}")
```

goes boom. I'm not sure what's up, but I can't keep an inner join streamable and within the confines of the RAM limits.

buggtb commented 6 months ago

Switching the inner join to a left join on the same dataset doesn't seem to use more than about 20% of available RAM, so I'm surprised an inner join uses 90GB of swap and blows up.

```python
df1 = pl.scan_parquet("output_data/tmpbase.parquet", low_memory=True, cache=False).cast({"owning_account_id": pl.Categorical})
df2 = pl.scan_parquet("output_data/tmpintermediate.parquet", low_memory=True, cache=False).cast({"transaction_id": pl.Categorical})
df1.join(
    df2,
    on=["owning_account_id", "transaction_id"],
    how="left",
).filter(pl.col("transaction_id").is_not_null()).sink_parquet(
    "output_data/table.parquet", maintain_order=False, row_group_size=1000
)
```

This is where I'm at now: a left join followed by a filter to remove the extra records, since it's supposed to be an inner join. It currently sits at about 40% of my RAM rather than OOMing.

Baukebrenninkmeijer commented 3 months ago

@buggtb Thanks for this thread; it was helpful, since I was dealing with the same problem: an inner join in streaming mode filling all memory and then being killed. A left join seems to have fixed the problem for now, but I'm also unclear why this was happening.

gtebbutt commented 3 months ago

I've run into a very similar issue with streaming mode running out of memory, but on a much simpler query:

```python
pl.scan_parquet("./data/*.parquet", cache=False, low_memory=True).filter(
    pl.col("id").is_in(include_ids)
).sink_parquet(output_path)
```

It's scanning about 110GB, across ~2000 parquet files ranging in size from a few MB to 1GB, on a machine with 64GB RAM. It gets most of the way through before the memory spikes significantly and then crashes.

I didn't see any noticeable difference in behaviour with or without low_memory, but setting POLARS_MAX_THREADS=12 (down from the machine default of 24) allows it to run to completion. I'm relatively new to Polars, so I'm not sure how expected or well known this behaviour is, but I'm guessing it's hitting a cluster of the larger files and decompressing too many in parallel?

Baukebrenninkmeijer commented 3 months ago

@gtebbutt this seems to be lazy but not streaming, meaning that in the end it will still try to load all matching files into memory. Could you add `.collect(streaming=True)` before `sink_parquet` and try again?

pbower commented 2 months ago

I get the same issue. This may not be required since it runs in Rust (no GC), but I know that in Python a manual garbage collection is needed when streaming over Arrow datasets, otherwise memory overflows. Is something similar needed here when using the Python Polars library?

ncbarta commented 2 months ago

> I watch OSX stick it into RAM until a certain point and then it flips it over to the swap partition which is 90GB in size and when it hits 90GB OSX just kills it.

I observe the same behavior on my device when doing:

```python
pl.scan_csv('source.csv').sink_parquet('dest.parquet')
```

I don't know why 90GB is so special.