pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Regression in latest polars, query OOMs #16118

Open knl opened 2 months ago

knl commented 2 months ago


Reproducible example

Unfortunately, I can't provide a minimal example: I work with large amounts of data that I can't share, and the problem only shows up at that scale.

Log output

No response

Issue description

I noticed that after upgrading to polars 0.20.24, my large query started getting killed by the OOM killer on a 2 TB machine. Previously, the query ran fine and used less than 200 GB. The runs look like this:

[image not available]

The query looks like this:

```python
import polars as pl

# `read_parquet` and `date` are defined elsewhere in the reporter's code (not shown);
# `read_parquet` presumably returns a pl.LazyFrame.

h_types = (
    pl.from_dicts([
        {'st': 'FOPT',   'tt': 'RETAIL', 'h_type': 'direct'},
        {'st': 'FOPT',   'tt': 'LAST',   'h_type': 'direct2'},
        {'st': 'FIND',   'tt': 'LAST',   'h_type': 'indirect'},
        {'st': 'FIND',   'tt': 'EX',     'h_type': 'external'},
        {'st': 'FIND_M', 'tt': 'EX',     'h_type': 'multi_external'},
    ])
    .with_columns(
        pl.col('st').cast(pl.Categorical),
        pl.col('tt').cast(pl.Categorical),
    )
    .lazy()
)

def assign_h_type(df: pl.LazyFrame) -> pl.LazyFrame:
    # Left join keeps every row; (st, tt) pairs missing from h_types become 'passive'
    return df.join(
        h_types,
        on=['st', 'tt'],
        how='left'
    ).with_columns(pl.col('h_type').fill_null('passive').cast(pl.Categorical))

data = (
    read_parquet(f'/parquet/rawdata/orders/date={date}/', partitioning='hive')
    .filter(
        pl.col('d').str.starts_with('omm') &
        (pl.col('class_') == 'Created') &
        (pl.col('o_num').is_not_null())
    )
    .select(
        pl.col('o_num').cast(pl.UInt64),
        pl.col('id_'),
        pl.col('d').cast(pl.Categorical),
        pl.col('app_').cast(pl.Categorical),
    )
    .unique()
    .sort('o_num')
    .with_columns(
        (pl.col('o_num') // 1e14 - 9).alias('sid').cast(pl.UInt8),
    )
    .join(
        read_parquet(f'/parquet/rawdata/entry/date={date}/', partitioning='hive')
        .filter(
            pl.col('d').str.starts_with('omm') &
            pl.col('blocked').is_null() &
            pl.col('tsn').is_not_null()
            #pl.col('app_').str.starts_with('h_')
        )
        .select(
            pl.col('oid_').alias('id_'),
            pl.col('d').cast(pl.Categorical),
            pl.col('app_').cast(pl.Categorical),
            pl.col('tsn').cast(pl.UInt64),
            pl.col('st').cast(pl.Categorical),
            pl.col('tt').cast(pl.Categorical),
            pl.col('success_'),
            # raw string so that \w is passed to the regex engine unchanged
            pl.col('portf').str.strip_suffix('_XK').str.extract(r"^(\w*ZZZ(_BM)?)", 1).fill_null('EO').cast(pl.Categorical),
        )
        .pipe(assign_h_type)
        .drop('st', 'tt'),
        on=['id_', 'd', 'app_'],
        how='inner',
    )
    .select(
        'sid',
        'tsn',
        'o_num',
        'h_type',
        'success_',
        'portf',
    )
    .sort('sid', 'tsn')
    .cache()
)

opps = (
    data
    .filter(pl.col('h_type') == 'direct')
    .select(
        'sid',
        'tsn',
        'portf',
    )
    .unique()
    .sort('sid', 'tsn')
    .collect(streaming=True)
)
```

If, per https://github.com/pola-rs/polars/issues/15795, I put `.collect()` before the `.filter` in `opps`, I get an OOM even on 0.20.16.

Expected behavior

I would expect recent versions to finish without running out of memory.

Installed versions

ritchie46 commented 2 months ago

Is there a minimal example that shows a memory increase? I need something with synthetic data to be able to understand what happens. It doesn't have to OOM; it just has to be a similar query.