pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

`.with_row_count()` causing: sink_parquet not yet supported in standard engine. Use `'collect().write_parquet()'` #9740

Closed cmdlineluser closed 2 months ago

cmdlineluser commented 1 year ago


Issue description

The following example works without the `.with_row_count()` call; adding it triggers the panic below.

I tried `show_graph(streaming=True)` as mentioned in https://github.com/pola-rs/polars/issues/6603#issuecomment-1411546372, which prints:

Cannot combine 'streaming' with 'common_subplan_elimination'. CSE will be turned off.

I'm not sure how to tell from the graph whether something is unsupported.
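One way to check from the plan text instead of the graph (a sketch, assuming a Polars version where `LazyFrame.explain(streaming=True)` is available): the printout wraps the parts of the plan that can run in the streaming engine in a `STREAMING` section, so anything left outside that section is unsupported.

```python
import polars as pl

lf = pl.LazyFrame({"a": [1, 2, 3]}).with_row_count()

# Nodes inside the "STREAMING" section of the printout can run in the
# streaming engine; nodes outside it fall back to the standard engine,
# which is what makes sink_parquet bail out.
print(lf.explain(streaming=True))
```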

Reproducible example

```python
import datetime
import polars as pl

df_lab = pl.LazyFrame(
    {'ID_1': [1, 1, 1, 2, 3],
     'ID_2': [1, 1, 2, 1, 1],
     'Timestamp': [datetime.datetime(1984, 5, 11, 14, 30),
      datetime.datetime(1984, 5, 11, 15, 30),
      datetime.datetime(1990, 12, 11, 9, 10),
      datetime.datetime(1975, 1, 8, 23, 23),
      datetime.datetime(1984, 5, 11, 14, 30)],
     'Event': [0, 1, 0, 1, 1]}
)

df_event = pl.LazyFrame(
    {'ID_1': [1, 1, 1, 2, 3],
     'ID_2': [1, 1, 1, 1, 1],
     'Timestamp_Lab': [datetime.datetime(1984, 5, 11, 14, 0),
      datetime.datetime(1984, 5, 11, 14, 15),
      datetime.datetime(1984, 5, 11, 15, 0),
      datetime.datetime(1975, 1, 8, 20, 0),
      datetime.datetime(1984, 5, 11, 14, 0)],
     'Result': [1, 2, 3, 4, 5]}
)

df_lab.with_row_count().join(df_event, on=['ID_1', 'ID_2']).sink_parquet('moo.parquet')

# thread '<unnamed>' panicked at 'sink_parquet not yet supported in standard engine. 
# Use 'collect().write_parquet()'', 
# /Users/user/git/polars/polars/polars-lazy/src/physical_plan/planner/lp.rs:153:28
# PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
```
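For completeness, the fallback the panic message itself suggests does run, at the cost of materializing the whole result in memory first:

```python
# Fallback from the panic message: collect eagerly, then write.
(
    df_lab
    .with_row_count()
    .join(df_event, on=['ID_1', 'ID_2'])
    .collect()
    .write_parquet('moo.parquet')
)
```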

Expected behavior

No exception.

Installed versions

```
Polars: 0.18.5
```
cmdlineluser commented 1 year ago

So I realized that row_nr wasn't actually needed later on in the query.

Removing it, I got a bit further: this runs with `.collect(streaming=True)` but raises the same `sink_parquet` exception:

```python
import polars as pl
import datetime

df_lab = pl.LazyFrame(
    {
        "ID_1": [1, 1, 1, 2, 3],
        "ID_2": [1, 1, 1, 1, 1],
        "Timestamp_Lab": [
            datetime.datetime(1984, 5, 11, 14, 0),
            datetime.datetime(1984, 5, 11, 14, 15),
            datetime.datetime(1984, 5, 11, 15, 0),
            datetime.datetime(1975, 1, 8, 20, 0),
            datetime.datetime(1984, 5, 11, 14, 0),
        ],
        "Hemoglobin": [None, 14.0, 13.0, 10.0, 11.0],
        "Leukocytes": [123.0, None, 123.0, 50.0, 110.0],
        "Platelets": [50, 50, 50, 110, 50],
    }
)

df_event = pl.LazyFrame(
    {
        "ID_1": [1, 1, 1, 2, 3],
        "ID_2": [1, 1, 2, 1, 1],
        "Timestamp": [
            datetime.datetime(1984, 5, 11, 14, 30),
            datetime.datetime(1984, 5, 11, 15, 30),
            datetime.datetime(1990, 12, 11, 9, 10),
            datetime.datetime(1975, 1, 8, 23, 23),
            datetime.datetime(1984, 5, 11, 14, 30),
        ],
        "Event": [0, 1, 0, 1, 1],
    }
)

df = df_lab.join(df_event, on=["ID_1", "ID_2"])

out = (
    df.with_columns(
        pl.when(pl.col("Timestamp") > pl.col("Timestamp_Lab")).then(
            pl.col("Timestamp_Lab", "Hemoglobin", "Leukocytes", "Platelets")
        )
    )
    .groupby("Timestamp")
    .agg(
        pl.col("ID_1", "ID_2", "Event").first(),
        pl.col("Hemoglobin", "Leukocytes", "Platelets"),
    )
)

print(out.collect(streaming=True))
# shape: (3, 7)
# ┌─────────────────────┬──────┬──────┬───────┬──────────────────────┬────────────────────────┬────────────────┐
# │ Timestamp           ┆ ID_1 ┆ ID_2 ┆ Event ┆ Hemoglobin           ┆ Leukocytes             ┆ Platelets      │
# │ ---                 ┆ ---  ┆ ---  ┆ ---   ┆ ---                  ┆ ---                    ┆ ---            │
# │ datetime[μs]        ┆ i64  ┆ i64  ┆ i64   ┆ list[f64]            ┆ list[f64]              ┆ list[i64]      │
# ╞═════════════════════╪══════╪══════╪═══════╪══════════════════════╪════════════════════════╪════════════════╡
# │ 1984-05-11 14:30:00 ┆ 1    ┆ 1    ┆ 0     ┆ [null, 14.0, … 11.0] ┆ [123.0, null, … 110.0] ┆ [50, 50, … 50] │
# │ 1975-01-08 23:23:00 ┆ 2    ┆ 1    ┆ 1     ┆ [10.0]               ┆ [50.0]                 ┆ [110]          │
# │ 1984-05-11 15:30:00 ┆ 1    ┆ 1    ┆ 1     ┆ [null, 14.0, 13.0]   ┆ [123.0, null, 123.0]   ┆ [50, 50, 50]   │
# └─────────────────────┴──────┴──────┴───────┴──────────────────────┴────────────────────────┴────────────────┘

out.sink_parquet("moo.parquet")
# thread '<unnamed>' panicked at 'sink_parquet not yet supported in standard engine. 
# Use 'collect().write_parquet()'', 
# /Users/user/git/polars/polars/polars-lazy/src/physical_plan/planner/lp.rs:153:28
# PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
```

It seems that the list columns are the problem?
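If so, a minimal reduction should show it (a sketch, not verified here; `repro.parquet` is a throwaway path): a bare groupby that aggregates into a list column, sunk straight to parquet.

```python
import polars as pl

# Smallest plan that aggregates into a list column and sinks it.
lf = pl.LazyFrame({"key": [1, 1, 2], "val": [1.0, 2.0, 3.0]})
lf.groupby("key").agg(pl.col("val")).sink_parquet("repro.parquet")
# If list columns were the blocker, this should hit the same panic;
# if it succeeds, the blocker is elsewhere in the plan
# (e.g. the when/then over multiple columns).
```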

cmdlineluser commented 1 year ago

Interestingly, I found `.map(streamable=True)` in https://github.com/pola-rs/polars/issues/9337#issuecomment-1586798879, which allows the `.sink_parquet()` from the previous comment to succeed.

```python
out = (
    df.with_columns(
        pl.when(pl.col("Timestamp") > pl.col("Timestamp_Lab")).then(
            pl.col("Timestamp_Lab", "Hemoglobin", "Leukocytes", "Platelets")
        )
    )
    .map(
        lambda df: df.groupby("Timestamp").agg(
            pl.col("ID_1", "ID_2", "Event").first(),
            pl.col("Hemoglobin", "Leukocytes", "Platelets"),
        ),
        streamable=True,
        schema={
            "Timestamp": pl.Datetime("us"),
            "ID_1": pl.Int64,
            "ID_2": pl.Int64,
            "Event": pl.Int64,
            "Hemoglobin": pl.List(pl.Float64),
            "Leukocytes": pl.List(pl.Float64),
            "Platelets": pl.List(pl.Int64),
        },
    )
)

out.sink_parquet("moo.parquet")
```

Not exactly sure what the difference is.
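One caveat worth noting (my reading of the documented contract for `streamable=True`, not something verified in the engine source): the function must produce the same result whether it runs on the full frame or on independent batches, and a groupby inside the map only satisfies that if no group spans two batches. A quick sanity check on this small example:

```python
# Compare the eager result against the streaming one; if batching had
# split a Timestamp group, the streamed frame would contain duplicate
# group keys and this check would fail.
eager = out.collect().sort("Timestamp")
streamed = out.collect(streaming=True).sort("Timestamp")
assert eager.frame_equal(streamed)
```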

The final step was supposed to be a `.list.to_struct()` + `.unnest()`, but it seems `.list.to_struct()` isn't supported by the streaming engine yet.
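For reference, a sketch of that final step applied after materializing instead (default struct field naming assumed; only `Hemoglobin` is shown, the other list columns would be handled the same way):

```python
final = (
    out.collect()
    .with_columns(
        # Expand each list into a struct, then spread the struct's
        # fields into top-level columns.
        pl.col("Hemoglobin").list.to_struct()
    )
    .unnest("Hemoglobin")
)
```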

I guess it's similar to the initial `.with_row_count()`: generating counts/lengths in this kind of streaming mode is actually quite a complex problem.
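In the meantime, if the row numbers are only needed in the final output, one workaround (a sketch; the result is materialized at the end, so only the final sink loses the streaming memory benefit) is to stream everything else and number the rows afterwards:

```python
(
    df_lab
    .join(df_event, on=["ID_1", "ID_2"])
    .collect(streaming=True)   # the join itself can stream
    .with_row_count()          # number rows on the materialized result
    .write_parquet("moo.parquet")
)
```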