pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Anti, Semi, and Outer joins do not work with sink_parquet. Inner, Left and Cross joins do just fine #13995

cdmoye closed this issue 10 months ago

cdmoye commented 10 months ago


Reproducible example

```python
import polars as pl
import polars.exceptions

if __name__ == "__main__":
    lf1 = pl.LazyFrame({'a': [1,2]})
    lf2 = pl.LazyFrame({'a': [1,3]})
    try:
        lf1.join(lf2, on='a', how='inner').sink_parquet('test.parquet')
        print('inner works')
    except polars.exceptions.InvalidOperationError:
        print('inner does not work')
    try:
        lf1.join(lf2, on='a', how='left').sink_parquet('test.parquet')
        print('left works')
    except polars.exceptions.InvalidOperationError:
        print('left does not work')
    try:
        lf1.join(lf2, on='a', how='outer').sink_parquet('test.parquet')
        print('outer works')
    except polars.exceptions.InvalidOperationError:
        print('outer does not work')
    try:
        lf1.join(lf2, on='a', how='semi').sink_parquet('test.parquet')
        print('semi works')
    except polars.exceptions.InvalidOperationError:
        print('semi does not work')
    try:
        lf1.join(lf2, on='a', how='anti').sink_parquet('test.parquet')
        print('anti works')
    except polars.exceptions.InvalidOperationError:
        print('anti does not work')
    try:
        lf1.join(lf2, on='a', how='cross').sink_parquet('test.parquet')
        print('cross works')
    except polars.exceptions.InvalidOperationError:
        print('cross does not work')
```

Log output

```
RUN STREAMING PIPELINE
df -> generic_join_build
RefCell { value: [df -> placeholder -> parquet_sink] }
RUN STREAMING PIPELINE
df -> generic_join_build
RefCell { value: [df -> placeholder -> parquet_sink] }
RUN STREAMING PIPELINE
df -> cross_join_sink
RefCell { value: [df -> placeholder -> parquet_sink] }
```

Issue description

When certain join types (outer, semi, and anti) between two LazyFrames are followed by `sink_parquet`, an `InvalidOperationError` is raised stating that the operation is not yet supported in the standard engine.

The code sample above gives the following stdout:

```
inner works
left works
outer does not work
semi does not work
anti does not work
cross works
```

Expected behavior

`sink_parquet` should work regardless of the join type.

Installed versions

```
--------Version info---------
Polars:               0.20.5
Index type:           UInt32
Platform:             Linux-6.1.72-96.166.amzn2023.x86_64-x86_64-with-glibc2.34
Python:               3.11.7 | packaged by conda-forge | (main, Dec 23 2023, 14:43:09) [GCC 12.3.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fsspec:
gevent:
hvplot:
matplotlib:
numpy:                1.26.3
openpyxl:
pandas:
pyarrow:
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:
xlsx2csv:
xlsxwriter:
```
ritchie46 commented 10 months ago

We know this. No need for an issue for everything that is not implemented.

cdmoye commented 10 months ago

> We know this. No need for an issue for everything that is not implemented.

Thanks for the response, I love the tool!

It's hard for users to know what's simply not yet implemented and what's implemented but has a bug in it. If the docs for either `sink_parquet` or `join` had mentioned this, I wouldn't have opened the issue.

```
polars.exceptions.InvalidOperationError: sink_Parquet(ParquetWriteOptions { compression: Zstd(None), statistics: false, row_group_size: None, data_pagesize_limit: None, maintain_order: true }) not yet supported in standard engine. Use 'collect().write_parquet()'
```

That exception doesn't specify *what* is not yet supported in the engine, which made it unclear to me whether it was the join or some other part of my expressions that wasn't supported. Once I narrowed it down to the join, I checked the docs and saw no mention of it. So I tried other join types, and they worked fine; hence my assumption that it was a bug.

So, while there's certainly no need for an issue for everything that is not implemented, if issue creation is to be avoided, it should be made clearer what is known not to be implemented, particularly when only some variants of an operation (here, a few join types) are unsupported.

cmdlineluser commented 10 months ago

Can anybody confirm whether `.profile()` is a way to detect which parts of a query are supported by the streaming engine?

```python
lf1.join(lf2, on='a', how='semi').profile(streaming=True)
# (shape: (1, 1)
#  ┌─────┐
#  │ a   │
#  │ --- │
#  │ i64 │
#  ╞═════╡
#  │ 1   │
#  └─────┘,
#  shape: (4, 3)
#  ┌──────────────┬───────┬──────┐
#  │ node         ┆ start ┆ end  │
#  │ ---          ┆ ---   ┆ ---  │
#  │ str          ┆ u64   ┆ u64  │
#  ╞══════════════╪═══════╪══════╡
#  │ optimization ┆ 0     ┆ 55   │
#  │ STREAMING    ┆ 47    ┆ 213  │
#  │              ┆       ┆      │
#  │ STREAMING    ┆ 55    ┆ 156  │
#  │              ┆       ┆      │
#  │ join(a)      ┆ 254   ┆ 1046 │ # <- does this indicate the `join` is not supported?
#  └──────────────┴───────┴──────┘)
lf1.join(lf2, on='a', how='left').profile(streaming=True)
# (shape: (2, 1)
#  ┌─────┐
#  │ a   │
#  │ --- │
#  │ i64 │
#  ╞═════╡
#  │ 1   │
#  │ 2   │
#  └─────┘,
#  shape: (2, 3)
#  ┌──────────────┬───────┬─────┐
#  │ node         ┆ start ┆ end │
#  │ ---          ┆ ---   ┆ --- │
#  │ str          ┆ u64   ┆ u64 │
#  ╞══════════════╪═══════╪═════╡
#  │ optimization ┆ 0     ┆ 2   │
#  │ STREAMING    ┆ 2     ┆ 307 │
#  │              ┆       ┆     │
#  └──────────────┴───────┴─────┘)
```