pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

`.merge_sorted()` not working with `streaming=True` #16015

Open cmdlineluser opened 2 weeks ago

cmdlineluser commented 2 weeks ago

Reproducible example

import polars as pl

lf_a = pl.LazyFrame({"key": [1, 2, 3], "value": ["a", "c", "e"]})
lf_b = pl.LazyFrame({"key": [1, 2, 3], "value": ["b", "d", "f"]})

df = lf_a.merge_sorted(lf_b, key="key").collect(streaming=True)
print(df)

Log output

UNION: union is run in parallel
RUN STREAMING PIPELINE
[df -> union -> ordered_sink, df -> union -> ordered_sink]

Issue description

I'm not sure if this is expected to work on the streaming engine or not.

I just noticed a difference in output when testing a query:

lf_a.merge_sorted(lf_b, key="key").collect()
# shape: (6, 2)
# ┌─────┬───────┐
# │ key ┆ value │
# │ --- ┆ ---   │
# │ i64 ┆ str   │
# ╞═════╪═══════╡
# │ 1   ┆ a     │
# │ 1   ┆ b     │
# │ 2   ┆ c     │
# │ 2   ┆ d     │
# │ 3   ┆ e     │
# │ 3   ┆ f     │
# └─────┴───────┘
lf_a.merge_sorted(lf_b, key="key").collect(streaming=True)
# shape: (6, 2)
# ┌─────┬───────┐
# │ key ┆ value │
# │ --- ┆ ---   │
# │ i64 ┆ str   │
# ╞═════╪═══════╡
# │ 1   ┆ a     │
# │ 2   ┆ c     │
# │ 3   ┆ e     │
# │ 1   ┆ b     │
# │ 2   ┆ d     │
# │ 3   ┆ f     │
# └─────┴───────┘

Expected behavior

Produce the same output as `streaming=False`.
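
To make the expected ordering concrete, here is a minimal pure-Python sketch of the semantics, not Polars' actual implementation. It assumes `merge_sorted` behaves like a stable two-way merge on the key; the helper names `merge_sorted_rows` and `concat_rows` are hypothetical and exist only for illustration. The rows mirror the frames in the reproducible example.

```python
import heapq

# Rows mirror the two frames in the reproducible example: (key, value).
rows_a = [(1, "a"), (2, "c"), (3, "e")]
rows_b = [(1, "b"), (2, "d"), (3, "f")]


def merge_sorted_rows(left, right):
    """Interleave two key-sorted row lists into one key-sorted list."""
    # heapq.merge is stable: on equal keys, rows from `left` come first.
    return list(heapq.merge(left, right, key=lambda row: row[0]))


def concat_rows(left, right):
    """Plain concatenation, with no interleaving by key."""
    return left + right


merged = merge_sorted_rows(rows_a, rows_b)
concatenated = concat_rows(rows_a, rows_b)
```

Here `merged` has the row order shown for `collect()`, while `concatenated` has the row order shown for `collect(streaming=True)`, which suggests the streaming result is a plain union of the two inputs rather than a merge.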

Installed versions

```
--------Version info---------
Polars: 0.20.23
Index type: UInt32
Platform: macOS-13.6.1-arm64-arm-64bit
Python: 3.12.2 (main, Feb 6 2024, 20:19:44) [Clang 15.0.0 (clang-1500.1.0.2.5)]
----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:
gevent:
hvplot:
matplotlib:
nest_asyncio:
numpy: 1.26.4
openpyxl:
pandas: 2.2.1
pyarrow: 15.0.2
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:
xlsx2csv:
xlsxwriter:
```

ritchie46 commented 2 weeks ago

Ah, I know what is happening. We should block the union from running on the streaming engine, because I implemented a hack. A better fix would be to remove the hack and properly implement DataFrame functions that take multiple arguments.