pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Problem with join operations followed by sink_csv on LazyFrame #15157

Open · PhilippJunk opened this issue 7 months ago

PhilippJunk commented 7 months ago


Reproducible example

import os

import numpy as np
import polars as pl

rng = np.random.default_rng()

test_dir = "./test_sink"
os.makedirs(test_dir, exist_ok=True)

DATA_SIZE = 10_000

def joins_only(df1, df2):
    overlap_df = df1.join(df2, on=["a", "b"], how="inner")
    size_overlap = overlap_df.select(pl.len()).collect().item()
    print(f"The size of overlap is: {size_overlap}")
    df1 = df1.join(overlap_df, on=["a", "b"], how="anti")
    return df1

dummy_df_1 = pl.DataFrame(
    {
        "a": rng.normal(size=DATA_SIZE),
        "b": rng.normal(size=DATA_SIZE),
        "c": np.full(DATA_SIZE, "a"),
    }
).lazy()

dummy_df_2 = pl.DataFrame(
    {
        "a": rng.normal(size=DATA_SIZE),
        "b": rng.normal(size=DATA_SIZE),
        "c": np.full(DATA_SIZE, "b"),
    }
).lazy()

removed_overlaps = joins_only(dummy_df_1, dummy_df_2)

removed_overlaps.collect().write_csv(os.path.join(test_dir, "write.csv"))
removed_overlaps.sink_csv(os.path.join(test_dir, "sink.csv")) # <- This line raises an error


The same problem exists if the overlap is not empty:

# does it matter that overlap is empty?
dummy_df_3 = pl.concat([dummy_df_1, dummy_df_2])
removed_overlaps = joins_only(dummy_df_1, dummy_df_3)

removed_overlaps.collect().write_csv(os.path.join(test_dir, "write.csv"))
removed_overlaps.sink_csv(os.path.join(test_dir, "sink.csv")) # <- This line raises an error as well

I originally noticed this after an additional concat operation, which does not error, but silently omits some of the data:

def joins_and_concat(df1, df2):
    overlap_df = df1.join(df2, on=["a", "b"], how="inner")
    df1 = df1.join(overlap_df, on=["a", "b"], how="anti")
    return pl.concat([df1, df2])

concat = joins_and_concat(dummy_df_1, dummy_df_2)
concat.sink_csv(os.path.join(test_dir, "sink.csv"))  # <- no error, but part of output is missing
concat.collect().write_csv(os.path.join(test_dir, "write.csv"))

Log output

join parallel: true
INNER join dataframes finished
join parallel: false
join parallel: false
INNER join dataframes finished
ANTI join dataframes finished
join parallel: false
INNER join dataframes finished
join parallel: false
join parallel: false
INNER join dataframes finished
ANTI join dataframes finished
Traceback (most recent call last):
  File "/home/philipp/work/others/2024-03-19_polars_concat_bug/report.py", line 50, in <module>
    removed_overlaps.sink_csv(os.path.join(test_dir, "join_sink.csv"))
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 134, in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/_utils/deprecation.py", line 134, in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/_utils/unstable.py", line 59, in wrapper
    return function(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/philipp/miniforge3/envs/polars/lib/python3.12/site-packages/polars/lazyframe/frame.py", line 2404, in sink_csv
    return lf.sink_csv(
           ^^^^^^^^^^^^
polars.exceptions.InvalidOperationError: sink_Csv(CsvWriterOptions { include_bom: false, include_header: true, batch_size: 1024, maintain_order: true, serialize_options: SerializeOptions { date_format: None, time_format: None, datetime_format: None, float_precision: None, separator: 44, quote_char: 34, null: "", line_terminator: "\n", quote_style: Necessary } }) not yet supported in standard engine. Use 'collect().write_parquet()'

Issue description

sink_csv does not behave as expected after join operations on LazyFrames. In some cases it errors; in other cases it silently produces different results than collect().write_csv().

Expected behavior

df.sink_csv(file) and df.collect().write_csv(file) should produce identical output.

Installed versions

```
--------Version info---------
Polars:              0.20.16
Index type:          UInt32
Platform:            Linux-6.5.0-25-generic-x86_64-with-glibc2.35
Python:              3.12.2 | packaged by conda-forge | (main, Feb 16 2024, 20:50:58) [GCC 12.3.0]
----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:
gevent:
hvplot:
matplotlib:
numpy:               1.26.4
openpyxl:
pandas:
pyarrow:
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:
xlsx2csv:
xlsxwriter:
```
ritchie46 commented 7 months ago

As the error states, we don't support streaming for the full query yet, so sink_csv raises that error. This is expected behavior.

In the future we might handle the collect() -> write fallback ourselves, but for now this is by design.

We are working on supporting streaming for more of our queries; it is an ongoing process.
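
Until then, a possible user-side workaround (a sketch only; the helper name is made up and not part of the Polars API) is to attempt the streaming sink and fall back to collecting and writing when the engine rejects the plan:

import polars as pl

def sink_or_write_csv(lf: pl.LazyFrame, path: str) -> None:
    """Hypothetical helper: try the streaming sink, fall back to collect + write."""
    try:
        # Streaming path: only works if the whole plan is supported
        # by the streaming engine.
        lf.sink_csv(path)
    except pl.exceptions.InvalidOperationError:
        # Fallback: materialize the full result in memory, then write it.
        lf.collect().write_csv(path)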

PhilippJunk commented 7 months ago

Thanks for clarifying. Is it also expected behavior that the third example runs without an error, but silently omits everything from the join?

alkment commented 4 months ago

It would be great if an error could be raised when trying to sink a query that's not fully supported instead of generating an incorrect result.

Unsurprisingly this affects sink_parquet too.
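
Until such an error exists, one defensive option (a sketch; the helper name and paths are made up) is to compare the row count of the sunk file against the collected result, which would catch the silently dropped rows in the examples above:

import polars as pl

def sink_csv_checked(lf: pl.LazyFrame, path: str) -> None:
    """Hypothetical guard: sink, then verify the row count against collect()."""
    lf.sink_csv(path)
    expected_rows = lf.select(pl.len()).collect().item()
    written_rows = pl.scan_csv(path).select(pl.len()).collect().item()
    if written_rows != expected_rows:
        raise ValueError(
            f"sink_csv wrote {written_rows} rows, but collect() yields {expected_rows}"
        )

This obviously runs the query twice, so it is only useful as a temporary sanity check.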

sclamons commented 3 months ago

EDIT: This might be different as there's not even a join here, just a concat. Should this be its own issue? Or is it the same underlying problem?

I also ran across this problem and worked up a minimal example. The streaming engine is probably the biggest draw of polars for me, so I'd really love to see this fixed.


Minimal example

df1 = pl.LazyFrame({"Name": ["A"], "X": [1]})
df2 = pl.LazyFrame({"Name": ["B"], "X": [2]})
merged_df = pl.concat([df1, df2], how="align")
merged_df.sink_csv("sunk.csv")
merged_df.collect().write_csv("written.csv")

Contents of sunk.csv:

Name,X
B,2

Contents of written.csv:

Name,X
A,1
B,2

Interestingly, this bug goes away if you omit how="align" from the concat. In that case, sunk.csv is identical to written.csv.
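
For reference, a sketch of the same frames with the default (vertical) concat, which streams correctly per the observation above (file names are placeholders):

# df1 and df2 as in the minimal example above.
merged_vertical = pl.concat([df1, df2])  # no how="align"
merged_vertical.sink_csv("sunk_vertical.csv")                 # contains both rows
merged_vertical.collect().write_csv("written_vertical.csv")   # identical output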

sclamons commented 3 months ago

EDIT: This example doesn't take the same logical path—it's actually equivalent (I think) to my above example without how="align", so no surprise it works.

One more detail -- the Rust interface doesn't replicate the issue with the same logical setup:

use polars::prelude::*;

fn main() {

    let sink_df = make_concat_lazy_example();
    sink_df.sink_csv("out/sunk.csv", CsvWriterOptions::default());

    let write_df = make_concat_lazy_example();
    let mut write_file = std::fs::File::create("out/written.csv").unwrap();
    CsvWriter::new(&mut write_file).finish(&mut write_df.collect().unwrap());
}

fn make_concat_lazy_example() -> LazyFrame {
    let df1 = df![
        "Name" => ["A"],
        "X"    => [1]
    ].unwrap().lazy();
    let df2 = df![
        "Name" => ["B"],
        "X"    => [2]
    ].unwrap().lazy();

    let merged_df = concat(
        [df1, df2], 
        UnionArgs::default()
    ).unwrap();
    merged_df
}

This produces identical CSVs containing both rows.

cmdlineluser commented 3 months ago

@sclamons align is a wrapper around full joins:

https://github.com/pola-rs/polars/blob/64b45a89813ceb25532d2678f9e9d5b85659b35e/py-polars/polars/functions/eager.py#L163

It looks like coalesce() is causing the issue on the streaming engine; without it, both rows are present.

(df1.join(df2, how="full", on=["Name", "X"], suffix="_PL_CONCAT_RIGHT")
    .with_columns(
       pl.coalesce([name, f"{name}_PL_CONCAT_RIGHT"])
       for name in ["Name", "X"]
    )
    .collect(streaming=True)
)

# shape: (1, 4)
# ┌──────┬─────┬──────────────────────┬───────────────────┐
# │ Name ┆ X   ┆ Name_PL_CONCAT_RIGHT ┆ X_PL_CONCAT_RIGHT │
# │ ---  ┆ --- ┆ ---                  ┆ ---               │
# │ str  ┆ i64 ┆ str                  ┆ i64               │
# ╞══════╪═════╪══════════════════════╪═══════════════════╡
# │ B    ┆ 2   ┆ B                    ┆ 2                 │
# └──────┴─────┴──────────────────────┴───────────────────┘
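
For comparison, a sketch of the same full join without the coalesce step (reusing df1 and df2 from the minimal example); per the observation above, the streaming engine then returns both rows:

(df1.join(df2, how="full", on=["Name", "X"], suffix="_PL_CONCAT_RIGHT")
    .collect(streaming=True)
)

# Expected: shape (2, 4), with the A row and the B row each padded with nulls
# in the columns coming from the other side of the full join.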

I think your Rust example is just doing a default vertical concat, so it's not equivalent to the Python repro?

sclamons commented 3 months ago

@cmdlineluser Yes, you're right—the Rust example isn't taking the how="align" path, so no surprise it works.