pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Streaming and non-streaming joins don't give the same output #19040

Open etiennebacher opened 3 hours ago

etiennebacher commented 3 hours ago


Reproducible example

import polars as pl

df_A = pl.LazyFrame({
  "K": [True, False],
  "A": [1, 1]
})

df_B = pl.LazyFrame({
  "K": [True],
  "B": [1]
})

df_C = pl.LazyFrame({
  "K": [True],
  "C": [1]
})

df_ABC = df_A.join(
    df_B, how = 'full', on = ["K"], coalesce = True
  ).join(
    df_C, how = 'full', on = ["K"], coalesce = True
  ).with_columns(
    B = pl.col('B')
  )

file_abc = 'file_abc.csv'
file_abc_collect = 'file_abc_collect.csv'

df_ABC.sink_csv(file_abc)
df_ABC.collect().write_csv(file_abc_collect)

df_read = pl.read_csv(file_abc)
df_read_c = pl.read_csv(file_abc_collect)

print(df_read.height) # returns 1, but should be 2
print(df_read_c.height) # returns 2

Log output

RUN STREAMING PIPELINE
[df -> callback -> hstack -> parquet_sink, df -> callback -> generic_join_build, df -> generic_join_build]
join parallel: true
join parallel: true
FULL join dataframes finished
FULL join dataframes finished
file < 128 rows, no statistics determined
no. of chunks: 1 processed by: 1 threads.
file < 128 rows, no statistics determined
no. of chunks: 1 processed by: 1 threads.
1
2

Issue description

In the example above, three LazyFrames are joined. If the resulting LazyFrame is written to a file via sink_csv() (or sink_parquet()), the result differs from the one obtained by first calling collect() and then write_csv(): with sink_csv(), some rows are missing from the output.

If the final with_columns() call is removed, sink_csv() and collect() + write_csv() give the same output.

Originally reported in https://github.com/pola-rs/r-polars/issues/1246 (cc @Columbus240)

Expected behavior

The number of rows should be the same whether the output is written with sink_csv() or with collect() + write_csv().

Installed versions

```
--------Version info---------
Polars:               1.8.2
Index type:           UInt32
Platform:             Linux-6.8.0-45-generic-x86_64-with-glibc2.39
Python:               3.12.3 (main, Sep 11 2024, 14:17:37) [GCC 13.2.0]

----Optional dependencies----
adbc_driver_manager
altair
cloudpickle
connectorx
deltalake
fastexcel
fsspec
gevent
great_tables
matplotlib
nest_asyncio
numpy
openpyxl
pandas
pyarrow
pydantic
pyiceberg
sqlalchemy
torch
xlsx2csv
xlsxwriter
```

cmdlineluser commented 2 hours ago

I think the difference can also be seen without sinking, by comparing collect() with collect(streaming=True):

import polars as pl

df1 = pl.LazyFrame({"a": ["foo"], "b": [1]})
df2 = pl.LazyFrame({"a": ["bar"], "b": [2]})

q = df1.join(df2, how="full", on=["a", "b"]).with_columns(c = 1)

q.collect()
# shape: (2, 5)
# ┌──────┬──────┬─────────┬─────────┬─────┐
# │ a    ┆ b    ┆ a_right ┆ b_right ┆ c   │
# │ ---  ┆ ---  ┆ ---     ┆ ---     ┆ --- │
# │ str  ┆ i64  ┆ str     ┆ i64     ┆ i32 │
# ╞══════╪══════╪═════════╪═════════╪═════╡
# │ null ┆ null ┆ bar     ┆ 2       ┆ 1   │
# │ foo  ┆ 1    ┆ null    ┆ null    ┆ 1   │
# └──────┴──────┴─────────┴─────────┴─────┘

q.collect(streaming=True)
# shape: (1, 5)
# ┌──────┬──────┬─────────┬─────────┬─────┐
# │ a    ┆ b    ┆ a_right ┆ b_right ┆ c   │
# │ ---  ┆ ---  ┆ ---     ┆ ---     ┆ --- │
# │ str  ┆ i64  ┆ str     ┆ i64     ┆ i32 │
# ╞══════╪══════╪═════════╪═════════╪═════╡
# │ null ┆ null ┆ bar     ┆ 2       ┆ 1   │
# └──────┴──────┴─────────┴─────────┴─────┘
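To see why two rows is the correct answer, here is a plain-Python sketch of full outer join semantics (an illustration, not Polars' implementation): every left row is kept, matching rows are paired, and right rows that never matched are appended with the left side missing. The function name `full_outer_join` and the list-of-dicts representation are assumptions for illustration, and the sketch assumes the key columns contain no nulls.

```python
def full_outer_join(left, right, keys):
    """Hypothetical helper: pair rows (dicts) of two tables as a full outer join does.

    Returns (left_row, right_row) tuples; the side without a match is None.
    Assumes the key columns contain no nulls.
    """
    def key_of(row):
        return tuple(row[k] for k in keys)

    # Index right-hand rows by key so each left row can find its matches.
    right_index = {}
    for i, row in enumerate(right):
        right_index.setdefault(key_of(row), []).append(i)

    matched_right = set()
    out = []
    for l_row in left:
        matches = right_index.get(key_of(l_row), [])
        for i in matches:
            out.append((l_row, right[i]))
            matched_right.add(i)
        if not matches:
            out.append((l_row, None))  # a left row without a partner is kept

    # Right rows that never matched are kept too, with the left side missing.
    for i, r_row in enumerate(right):
        if i not in matched_right:
            out.append((None, r_row))
    return out


df1 = [{"a": "foo", "b": 1}]
df2 = [{"a": "bar", "b": 2}]
rows = full_outer_join(df1, df2, keys=["a", "b"])
print(len(rows))  # 2: one unmatched row from each side
for pair in rows:
    print(pair)
```

Since neither key pair ("foo", 1) nor ("bar", 2) appears on the other side, a full join must emit both rows, which matches the non-streaming `collect()` result; the streaming result above drops the left-only row.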