pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs
Other
27.89k stars 1.71k forks source link

Polars lazy mode bug and panic resolving columns properly #17427

Closed atigbadr closed 2 days ago

atigbadr commented 4 days ago

Checks

Reproducible example

import polars as pl

def add_prio_safety(df):
    """Map SAFETY_CLASS values to a priority code in a new PRIO_SAFETY column.

    Known safety classes map to 1-8; unmapped/null values fall back to 9.
    NOTE(review): ``replace`` on a string column keeps the string dtype, so
    the priorities (including the fallback 9) come out as strings — confirm
    downstream expectations.
    """
    # Priority ranking per safety class (lower number = higher priority).
    safety_priorities = {
        "IPS": 1,
        "EIP": 2,
        "IPR": 3,
        "IPI": 4,
        "QS": 5,
        "ND": 6,
        "NON": 7,
        "?": 8,
    }
    prio_expr = (
        pl.col("SAFETY_CLASS")
        .replace(safety_priorities)
        .fill_null(pl.lit(9))
        .alias("PRIO_SAFETY")
    )
    return df.with_columns(prio_expr)

def rename_col_suffix(df, suffix, col_key="", col_key_suffix=""):
    """Append ``_{suffix}`` to every column name of *df*.

    The single column named ``col_key`` (if present) is instead renamed
    with ``_{suffix}_{col_key_suffix}`` appended.
    """
    renamed = {}
    # collect_schema() on the lazy frame gives the column names without
    # materializing any data.
    for name in df.lazy().collect_schema().names():
        if name == col_key:
            renamed[name] = f"{name}_{suffix}_{col_key_suffix}"
        else:
            renamed[name] = f"{name}_{suffix}"
    return df.rename(renamed)

def add_rf_complet(df: pl.DataFrame, suffix: str = ""):
    """Add RF identifier columns built by concatenating existing columns.

    Adds three columns (each name carries the given ``suffix``):

    * ``RF{suffix}``: SYSTEM_CODE & EQUIPMENT_NUMBER & EQUIPMENT_TYPE
    * ``RF_COMPLET_WO_TRANCHE{suffix}``: RF & "/" & COMPONENT_TYPE & "/"
      & COMPONENT_NUMBER
    * ``RF_COMPLET{suffix}``: FACILITY & UNIT & DIVISION & "_" &
      RF_COMPLET_WO_TRANCHE

    Args:
        df: Input frame holding the suffixed source columns.
        suffix: Suffix identifying which set of source columns to use
            (e.g. "_TOT", "_PMRQ", "_SUB"). Default: "".

    Returns:
        The frame with the three new columns added.
    """
    # Shared building blocks, defined once instead of repeating the same
    # column lists in each concatenation (the original duplicated them).
    rf_parts = [
        pl.col(f"SYSTEM_CODE{suffix}"),
        pl.col(f"EQUIPMENT_NUMBER{suffix}"),
        pl.col(f"EQUIPMENT_TYPE{suffix}"),
    ]
    component_parts = [
        pl.lit("/"),
        pl.col(f"COMPONENT_TYPE{suffix}"),
        pl.lit("/"),
        pl.col(f"COMPONENT_NUMBER{suffix}"),
    ]
    location_parts = [
        pl.col(f"FACILITY{suffix}"),
        pl.col(f"UNIT{suffix}"),
        pl.col(f"DIVISION{suffix}"),
        pl.lit("_"),
    ]
    df = df.with_columns(
        [
            pl.concat_str(
                location_parts + rf_parts + component_parts,
                ignore_nulls=True,
            ).alias(f"RF_COMPLET{suffix}"),
            pl.concat_str(
                rf_parts + component_parts,
                ignore_nulls=True,
            ).alias(f"RF_COMPLET_WO_TRANCHE{suffix}"),
            pl.concat_str(
                rf_parts,
                ignore_nulls=True,
            ).alias(f"RF{suffix}"),
        ]
    )

    return df

def merge_tot_pmrq_sub_cols(df, col_name):
    """Coalesce '{col_name}_TOT' / '_SUB' / '_PMRQ' into '{col_name}_FINAL'.

    Precedence: the TOT value if it is usable, else the SUB value if it is
    usable, else the PMRQ value. A value is "usable" when it is non-null and
    not one of the placeholder strings left behind when all concatenated
    parts were null ("_//", "//", "").

    Args:
        df (DataFrame): Input DataFrame.
        col_name (str): Original column name. Columns '{col_name}_TOT',
            '{col_name}_PMRQ' and '{col_name}_SUB' must exist.

    Returns
    -------
        DataFrame: Output DataFrame with a new column '{col_name}_FINAL'.
    """

    def _usable(name):
        # Expression: column holds a real value (not null / placeholder).
        # Factored out because the original repeated this condition
        # verbatim for both the TOT and SUB branches.
        col = pl.col(name)
        return (
            col.is_not_null()
            & (col != "_//")
            & (col != "//")
            & (col != "")
        )

    df = df.with_columns(
        pl.when(_usable(f"{col_name}_TOT"))
        .then(pl.col(f"{col_name}_TOT"))
        .when(_usable(f"{col_name}_SUB"))
        .then(pl.col(f"{col_name}_SUB"))
        .otherwise(pl.col(f"{col_name}_PMRQ"))
        .alias(f"{col_name}_FINAL")
    )
    return df

def add_rf_complet_and_mi(
    df, df_1, df_2, df_3, df_4, df_5
):
    """
    Add RF_COMPLET / RF_COMPLET_SANS_TRANCHE / RF and the related
    MODELE_INDUSTRIEL and SAFETY_CLASS columns.

    The TOT, PMRQ and SUB variants of each attribute are gathered through
    successive left joins onto the reference tables, coalesced into a
    single '*_FINAL' column (precedence TOT > SUB > PMRQ), and the
    intermediate join columns are dropped and renamed.

    Args:
        df (dataframe): main input dataframe.
        df_1 (dataframe): TIDWOTSK data (work-order task / equipment info).
        df_2 (dataframe): TIDECPMS data (PMRQ and e_code relation).
        df_3 (dataframe): TIDECHDR data (equipment header info).
        df_4 (dataframe): TIDECEQN data (modele_industriel and
            safety-class info).
        df_5 (dataframe): TIDPMOVR data (substitution info).

    Returns
    -------
        dataframe: output dataframe with the merged columns added.
    """
    ###################
    # ADD RF FROM TOT #
    ###################
    # Normalize empty-string RF values to null.
    df = df.with_columns(pl.col("RF").replace("", None))

    # rename columns to prepare join
    df_1 = rename_col_suffix(df_1, suffix="TOT")

    # JOIN main df and TIDWOTSK data
    df = df.join(
        df_1,
        left_on=["NUMERO_OT", "NUMERO_TOT"],
        right_on=["WORK_ORDER_NBR_TOT", "WORK_ORDER_TASK_TOT"],
        how="left",
    )

    # add rf_complet by concatenation : facility & unit & division & "_" & system_code & equipment_number
    # & equipment_type & "/" & component_type & "/" & component_number
    df = add_rf_complet(df, suffix="_TOT")

    #################
    # ADD PMRQ INFO #
    #################

    # GET TIDECPMS data : PMRQ and e_code relation

    # JOIN main df and TIDECPMS data
    df = df.join(
        df_2,
        left_on=["PMRQ_SITE"],
        right_on=["PMRQ"],
        how="left",
    )

    # rename dataframe before join
    df_3_pmrq = rename_col_suffix(df_3, suffix="PMRQ")

    # Keep a copy of E_CODE_PMRQ: the join below consumes the right_on
    # column, so the copy lets us restore the name afterwards.
    df_3_pmrq = df_3_pmrq.with_columns(
        pl.col("E_CODE_PMRQ").alias("E_CODE_PMRQ_copy")
    )

    # JOIN main df and TIDECHDR data

    df = df.join(
        df_3_pmrq, left_on=["OWNER_CODE"], right_on=["E_CODE_PMRQ"], how="left"
    )

    # Restore the column name consumed by the join above.
    df = df.rename({"E_CODE_PMRQ_copy": "E_CODE_PMRQ"})
    # add rf_complet by concatenation : facility & unit & division & "_" & system_code & equipment_number
    # & equipment_type & "/" & component_type & "/" & component_number
    df = add_rf_complet(df, suffix="_PMRQ")

    # rename columns to prepare join

    df_4 = df_4.rename(
        {"E_CODE": "E_CODE_TIDECEQN", "EQUIP_REVISION": "EQUIP_REVISION_TIDECEQN"}
    )
    df_4_pmrq = rename_col_suffix(df_4, suffix="PMRQ")

    # JOIN main df and TIDECEQN data : modele_industriel and classe_surete info
    df = df.join(
        df_4_pmrq,
        left_on=["E_CODE_PMRQ", "EQUIP_REVISION_PMRQ"],
        right_on=["E_CODE_TIDECEQN_PMRQ", "EQUIP_REVISION_TIDECEQN_PMRQ"],
        how="left",
    )

    ################
    # ADD SUB INFO #
    ################

    # rename columns to prepare join
    df_5 = rename_col_suffix(df_5, suffix="TIDPMOVR")

    # JOIN main df and TIDPMOVR data
    df = df.join(
        df_5,
        left_on=["PM_ID_NUMBER", "PM_RQ_NUMBER", "NUMERO_TOT"],
        right_on=[
            "PM_ID_NUMBER_TIDPMOVR",
            "PM_RQ_NUMBER_TIDPMOVR",
            "WORK_ORDER_TASK_TIDPMOVR",
        ],
        how="left",
    )

    # rename column e_code_sub from e_code found in table TIDPMOVR

    df = df.rename({"E_CODE_TIDPMOVR": "E_CODE_SUB"})

    # rename columns of df_3 to prepare join to get substituted values
    df_3_sub = rename_col_suffix(
        df_3, suffix="SUB", col_key="E_CODE", col_key_suffix="TIDECHDR"
    )

    # JOIN main df and TIDECHDR (equipment info - for substituted values this time)
    df = df.join(
        df_3_sub,
        left_on=["E_CODE_SUB"],
        right_on=["E_CODE_SUB_TIDECHDR"],
        how="left",
    )

    # add rf_complet_sub
    df = add_rf_complet(df, suffix="_SUB")

    # change column names of table TIDECEQN for join with sub values
    df_4_sub = rename_col_suffix(df_4, suffix="SUB")

    # JOIN main df and TIDECEQN (modele_industriel and classe_surete info - for substituted values this time)
    df = df.join(
        df_4_sub,
        left_on=["E_CODE_SUB", "EQUIP_REVISION_SUB"],
        right_on=["E_CODE_TIDECEQN_SUB", "EQUIP_REVISION_TIDECEQN_SUB"],
        how="left",
    )

    ###############################
    # MERGE PMRQ, SUB & PMID INFO #
    ###############################

    # add final columns that merge original and sub columns
    # (TOT > SUB > PMRQ precedence, see merge_tot_pmrq_sub_cols)

    cols_to_merge = [
        "E_CODE",
        "FACILITY",
        "UNIT",
        "RF_COMPLET",
        "RF_COMPLET_WO_TRANCHE",
        "RF",
        "MODELE_INDUSTRIEL",
        "SAFETY_CLASS",
    ]

    for col in cols_to_merge:
        df = merge_tot_pmrq_sub_cols(df, col)

    ###########################
    # DROP NOT NEEDED COLUMNS #
    ###########################

    # drop columns redundant due to joins
    columns_to_drop = [
        "WORK_ORDER_NBR_TOT",
        "WORK_ORDER_TASK_TOT",
        "PM_ID_NUMBER_TIDPMREF",
        "OWNER_CODE",
        "E_CODE_TIDECEQN_PMRQ",
        "OWNER_CODE_TIDPMREF",
        "E_CODE_PMID_TIDECHDR",
        "EQUIP_REVISION_TIDECEQN_PMRQ",
        "PM_ID_NUMBER_TIDPMOVR",
        "PM_RQ_NUMBER_TIDPMOVR",
        "WORK_ORDER_TASK_TIDPMOVR",
        "E_CODE_SUB_TIDECHDR",
        "E_CODE_TIDECEQN_SUB",
        "E_CODE_TIDECEQN_PMID",
        "EQUIP_REVISION_TIDECEQN_SUB",
        "EQUIP_REVISION_TIDECEQN_PMID",
        "EQUIP_REVISION_PMRQ_right",
        "EQUIP_REVISION_SUB_right",
        "E_CODE_SUB_right",
    ]

    # Also drop every per-suffix variant of the attribute columns.
    for suffix in ["TOT", "PMRQ", "SUB"]:
        # for suffix in ["tot", "pmrq", "pmid", "sub"]:
        for col in [
            "E_CODE",
            "EQUIP_REVISION",
            "FACILITY",
            "UNIT",
            "DIVISION",
            "SYSTEM_CODE",
            "EQUIPMENT_NUMBER",
            "EQUIPMENT_TYPE",
            "COMPONENT_TYPE",
            "COMPONENT_NUMBER",
            "RF_COMPLET_WO_TRANCHE",
            "RF",
            "MODELE_INDUSTRIEL",
            "SAFETY_CLASS",
        ]:
            columns_to_drop.append(f"{col}_{suffix}")
    # strict=False: silently skip names that are not present.
    df = df.drop(columns_to_drop, strict=False)
    ##################
    # RENAME COLUMNS #
    ##################

    # we rename columns with correct values for downstream function,
    # keep previous values with suffix '_OLD'
    df = df.rename(
        {
            "SITE": "SITE_OLD",
            "TRANCHE": "TRANCHE_OLD",
            "RF": "RF_OLD",
            "FACILITY_FINAL": "SITE",
            "UNIT_FINAL": "TRANCHE",
            "RF_FINAL": "RF",
            "RF_COMPLET_FINAL": "RF_COMPLET",
            "RF_COMPLET_WO_TRANCHE_FINAL": "RF_COMPLET_SANS_TRANCHE",
            "MODELE_INDUSTRIEL_FINAL": "MODELE_INDUSTRIEL",
            "SAFETY_CLASS_FINAL": "SAFETY_CLASS",
        }
    )

    ###################
    # ADD prio_safety #
    ###################

    df = add_prio_safety(df)

    ##############
    # SAVE STATS #
    ##############

    return df

# Synthetic (anonymized/hashed) single-row reproduction data for each frame.
df_data =  {'SYSTEME_ELEMENTAIRE': ['10259518993288137799'], 'PROCEDURE_END': ['3717447046043311022'], 'NUM_EQU_BIG': ['11710319551364327107'], 'ANNEE': [30], 'NB_PDM': [2], 'TYPE_PMRQ_SITE': ['10116380667568548973'], 'NUMERO_OT': ['17840858055815323787'], 'FAMILLE_EQUIPEMENT_TOT': ['4305432402955241627'], 'HR_BTE_STD': [None], 'JOURS_SITE': [2190], 'RF': ['12429358722875805624'], 'DELTARELATIF_JOURS_SITE_JOURS_PALIER': [0.825], 'DELTA_FREQUENCE': [-0.1375], 'LIBELLE_ACTIVITE_CORRIGE': ['5692331372148487298'], 'SYSTEME_ELEME_TOT': ['10259518993288137799'], 'NUMERO_OTM': ['17840858055815323787'], 'ID': ['12995454962688269568'], 'ID_DF': [474773], 'ARRET': ['2840857409195221889'], 'FREQSITE': [0.16666666666666666], 'LIBELLE_TOT': ['13739374131666219563'], 'STATUT_PMRQ': ['10713894897579468908'], 'TYPE_PMRQ_PALIER': ['10116380667568548973'], 'DN_ROBINET': [None], 'MAILLE_GESTION': ['646788285261384707'], 'RAPPORT_FREQUENCE_SITE_PALIER': [0.547945205479452], 'FREQPALIER': [0.30416666666666664], 'VOIE': ['3717447046043311022'], 'PMRQ_SITE': ['13058443681754841415'], 'PROVENANCE': ['8832189020569722148'], 'JOURS_PALIER': [1200], 'TYPE_TOT': ['706592231814243521'], 'EXCLUSION': ['9225005290826826710'], 'LIBELLE_ACTIVITE': ['933838592733991669'], 'TRANCHE': [1], 'NUMERO_EQUIPEMENT': ['13488033529883711274'], 'PMRQ_PALIER': ['3389348587797839655'], 'FREQUENCE_SITE': ['2873171927416060184'], 'TYPE_GESTE_MAINTENANCE': ['5956551079191982252'], 'SITE': ['11203314010027630067'], 'TYPE_ARRET': ['10383623603468822009'], 'CODE_BTE': ['3717447046043311022'], 'NUMERO_TOT': ['12667853719307402080'], 'DISCIPLINE_TOT': ['7103309410951340401'], 'RDU': ['17378103479390767212'], 'FREQUENCE_PALIER': ['4177858838454549957'], 'TYPE_ROBINET': ['3717447046043311022'], 'DISCIPLINE_OT': ['7103309410951340401'], 'NOM_EQUIPEMENT': ['16076728382483653473'], 'HR_BTE_LOCALES': [None], 'PUISSANCE': [1300], 'PALIER': ['13917492298881635150'], 'TECHNIQUE_END': ['3717447046043311022']}
df_1_data =  {'WORK_ORDER_NBR': ['16044451887719335171'], 'WORK_ORDER_TASK': ['4438342574969621960'], 'WORK_ORDER_TYPE': ['1946467547790703037'], 'FACILITY': ['16491980263625684958'], 'UNIT': ['7569191480599717718'], 'DIVISION': ['3717447046043311022'], 'SYSTEM_CODE': ['1844251584742556006'], 'EQUIPMENT_NUMBER': ['16065166508435221904'], 'EQUIPMENT_TYPE': ['1123847502349723702'], 'COMPONENT_TYPE': ['3717447046043311022'], 'COMPONENT_NUMBER': ['3717447046043311022'], 'E_CODE': ['15111364629475924547'], 'EQUIP_REVISION': ['5597443177314721229'], 'MODELE_INDUSTRIEL': ['3717447046043311022'], 'SAFETY_CLASS': ['15621912796529646110']}
df_2_data =  {'PM_IDENTIFIER': ['11675224185897931296'], 'PM_ID_NUMBER': ['231603847244906558'], 'PM_RQ_NUMBER': ['13341616531854961667'], 'OWNER_CODE': ['6942795238955349321'], 'PMRQ': ['13805924151917160826']}
df_3_data =  {'FACILITY': ['507015596643481126'], 'UNIT': ['9225005290826826710'], 'DIVISION': ['3717447046043311022'], 'SYSTEM_CODE': ['17029355182952006822'], 'EQUIPMENT_NUMBER': ['4217269385066705779'], 'EQUIPMENT_TYPE': ['7858962460223325788'], 'COMPONENT_TYPE': ['3717447046043311022'], 'COMPONENT_NUMBER': ['3717447046043311022'], 'REVISION_STATUS': ['1725357529781714295'], 'E_CODE': ['16719398411140702957'], 'EQUIP_REVISION': ['5597443177314721229']}
df_4_data =  {'E_CODE': ['18185221358599604455'], 'EQUIP_REVISION': ['5597443177314721229'], 'MODELE_INDUSTRIEL': ['7403472235573633466'], 'SAFETY_CLASS': ['15621912796529646110']}
df_5_data =  {'E_CODE': ['953248819759467992'], 'PM_ID_NUMBER': ['1236084142282258250'], 'PM_RQ_NUMBER': ['1497819091941839941'], 'WORK_ORDER_TASK': ['3636567706313226194']}
df_6_data =  {'Code arret complet - base source': ['3234782187912179067'], 'Code arret complet - revise': ['9323118806241067325']}
df_7_data =  {'Code PMT': ['9323118806241067325'], 'Code arret - revise': ['9323118806241067325'], 'Annee - revise': ['72259354551516523']}

# Round-trip the data through parquet because that is how the original
# pipeline ingests it (pl.scan_parquet below).
pl.DataFrame(df_data).write_parquet("df.parquet")
pl.DataFrame(df_1_data).write_parquet("df_1.parquet")
pl.DataFrame(df_2_data).write_parquet("df_2.parquet")
pl.DataFrame(df_3_data).write_parquet("df_3.parquet")
pl.DataFrame(df_4_data).write_parquet("df_4.parquet")
pl.DataFrame(df_5_data).write_parquet("df_5.parquet")
pl.DataFrame(df_6_data).write_parquet("df_6.parquet")
pl.DataFrame(df_7_data).write_parquet("df_7.parquet")

# Scan lazily: the bug only manifests in lazy mode.
df = pl.scan_parquet("df.parquet")
df_1 = pl.scan_parquet("df_1.parquet")
df_2 = pl.scan_parquet("df_2.parquet")
df_3 = pl.scan_parquet("df_3.parquet")
df_4 = pl.scan_parquet("df_4.parquet")
df_5 = pl.scan_parquet("df_5.parquet")
df_6 = pl.scan_parquet("df_6.parquet")
df_7 = pl.scan_parquet("df_7.parquet")

# Build the full join/rename/drop pipeline.
df = add_rf_complet_and_mi(
    df, df_1, df_2, df_3, df_4, df_5
)
df = df.with_columns(
    pl.concat_str(
        [pl.col("SITE"), pl.col("ARRET")],
        separator="-",
    ).alias("ARRET_SITE")
)

df_6 = df_6.rename(
    {
        "Code arret complet - base source": "ARRET_SITE",
        "Code arret complet - revise": "ARRET_NEW",
    }
)

df_7 = df_7.rename(
    {"Code arret - revise": "ARRET_NEW", "Annee - revise": "ANNEE_NEW"}
)
# Workaround reported to make this run in streaming mode:
# df = df.cast(df.collect_schema())
df = df.join(df_6, how="left", on="ARRET_SITE")
df = df.join(
    df_7.select(["ARRET_NEW", "ANNEE_NEW"]), how="left", on="ARRET_NEW"
)
df = df.drop("ARRET_SITE", strict=False)
print(df.explain())
# collect() panics here in lazy mode (ColumnNotFound) — see issue body.
print(df.collect().head())

Log output

pyo3_runtime.PanicException: called `Result::unwrap()` on an `Err` value: ColumnNotFound(ErrString(""))

Issue description

In lazy mode, Polars fails to resolve columns properly against the schema, especially when a combination of rename and drop operations is used.

The code works in non lazy mode. The code also works when using df = df.cast(df.collect_schema()) at some point before the final LoC

Everything is in the code.

Data is written to parquet and scanned since this is where the original data is scanned from in our case.

Expected behavior

Lazy mode should be able to resolve columns properly.

Installed versions

``` --------Version info--------- Polars: 1.0.0 Index type: UInt32 Platform: Linux-3.10.0-1160.114.2.el7.x86_64-x86_64-with-glibc2.28 Python: 3.11.8 (main, Apr 12 2024, 16:17:28) [GCC 8.5.0 20210514 (Red Hat 8.5.0-20)] ----Optional dependencies---- adbc_driver_manager: cloudpickle: connectorx: 0.3.2 deltalake: 0.18.1 fastexcel: 0.10.4 fsspec: 2024.6.1 gevent: great_tables: hvplot: 0.10.0 matplotlib: nest_asyncio: 1.6.0 numpy: 1.26.4 openpyxl: 3.1.5 pandas: 2.2.2 pyarrow: 16.1.0 pydantic: 2.7.4 pyiceberg: sqlalchemy: 2.0.31 torch: xlsx2csv: 0.8.2 xlsxwriter: ```
cmdlineluser commented 4 days ago

There should be a way to reduce the example down.

It does appear to be projection_pushdown related:

>>> df.collect(projection_pushdown=False)
shape: (1, 73)
┌──────────────────┬──────────────────┬──────────────────┬───────┬───┬──────────────┬─────────────┬───────────┬───────────┐
│ SYSTEME_ELEMENTA ┆ PROCEDURE_END    ┆ NUM_EQU_BIG      ┆ ANNEE ┆ … ┆ SAFETY_CLASS ┆ PRIO_SAFETY ┆ ARRET_NEW ┆ ANNEE_NEW │
│ IRE              ┆ ---              ┆ ---              ┆ ---   ┆   ┆ ---          ┆ ---         ┆ ---       ┆ ---       │
│ ---              ┆ str              ┆ str              ┆ i64   ┆   ┆ str          ┆ str         ┆ str       ┆ str       │
│ str              ┆                  ┆                  ┆       ┆   ┆              ┆             ┆           ┆           │
╞══════════════════╪══════════════════╪══════════════════╪═══════╪═══╪══════════════╪═════════════╪═══════════╪═══════════╡
│ 1025951899328813 ┆ 3717447046043311 ┆ 1171031955136432 ┆ 30    ┆ … ┆ null         ┆ 9           ┆ null      ┆ null      │
│ 7799             ┆ 022              ┆ 7107             ┆       ┆   ┆              ┆             ┆           ┆           │
└──────────────────┴──────────────────┴──────────────────┴───────┴───┴──────────────┴─────────────┴───────────┴───────────┘
cmdlineluser commented 4 days ago

Minimal repro:

import polars as pl

# Minimal reproduction: a rename that re-uses a just-vacated name
# ('A' -> 'C', then 'B' -> 'A'), sandwiched between no-op
# drop([], strict=False) calls, makes collect() panic. Per the earlier
# comment, this appears to be projection-pushdown related
# (collect(projection_pushdown=False) succeeds).
(pl.LazyFrame({'A': [1]})
   .with_columns(B = 2)
   .drop([], strict=False)
   .rename({'A': 'C', 'B': 'A'})
   .drop([], strict=False)
   .collect()
)
# pyo3_runtime.PanicException: 
# called `Result::unwrap()` on an `Err` value: ColumnNotFound(ErrString("C"))