pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

pl.col().diff() causes error when trying to .sink_parquet() #13670

Open niccolopetti opened 9 months ago

niccolopetti commented 9 months ago

Reproducible example

```python
import polars as pl

pl.LazyFrame({"a": [1, 3, 8]}).with_columns(
    pl.col("a").diff().alias("b"),
).sink_parquet("a.parquet")
pl.read_parquet("a.parquet")
```
Traceback

```
---------------------------------------------------------------------------
InvalidOperationError                     Traceback (most recent call last)
Cell In[58], line 4
      1 pl.LazyFrame( {'a': [1, 3, 8]})\
      2 .with_columns(
      3         (pl.col("a").diff()).alias("b"),
----> 4     ).sink_parquet("a.parquet")
      5 pl.read_parquet("a.parquet")

File ~/.local/lib/python3.10/site-packages/polars/lazyframe/frame.py:1991, in LazyFrame.sink_parquet(self, path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, no_optimization)
   1925 """
   1926 Evaluate the query in streaming mode and write to a Parquet file.
   1927 (...)
   1980
   1981 """
   1982 lf = self._set_sink_optimizations(
   1983     type_coercion=type_coercion,
   1984     predicate_pushdown=predicate_pushdown,
   (...)
   1988     no_optimization=no_optimization,
   1989 )
-> 1991 return lf.sink_parquet(
   1992     path=normalize_filepath(path),
   1993     compression=compression,
   1994     compression_level=compression_level,
   1995     statistics=statistics,
   1996     row_group_size=row_group_size,
   1997     data_pagesize_limit=data_pagesize_limit,
   1998     maintain_order=maintain_order,
   1999 )

InvalidOperationError: sink_Parquet(ParquetWriteOptions { compression: Zstd(None), statistics: false, row_group_size: None, data_pagesize_limit: None, maintain_order: true }) not yet supported in standard engine. Use 'collect().write_parquet()'
```

Log output

No response

Issue description

`pl.col().diff()` causes an error when calling `.sink_parquet()`, even though the query is supported in streaming mode. Below is the execution graph:

[execution graph image]

This issue resembles #9337 and #9740, but can't be fully solved via the workaround suggested there, quoted below.

My current workaround is:

```python
# df is an existing LazyFrame; `map` was later renamed `map_batches`.
df = df.map(
    lambda df: df.with_columns(
        pl.when(pl.col("a") >= 2).then(pl.col("a")).otherwise(None).alias("b"),
    ),
    streamable=True,
    schema={
        "a": pl.Int64,
        "b": pl.Int64,
    },
)
```

It works in streaming mode, but requires a full schema definition, which can be huge in practical cases.

Originally posted by @elephantum in https://github.com/pola-rs/polars/issues/9337#issuecomment-1586798879
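One way to avoid spelling out a huge schema by hand is to derive the output schema from the LazyFrame's own schema. A minimal, untested sketch, assuming `diff()` preserves the column's dtype (it does for integer columns) and using `map_batches`, the newer name for `map`:

```python
import polars as pl

lf = pl.LazyFrame({"a": [1, 3, 8]})

# Build the output schema from the input schema instead of writing it out:
# "b" gets the same dtype as "a", since diff() keeps the integer dtype.
out_schema = {**lf.schema, "b": lf.schema["a"]}

# Caveat: with streamable=True the function runs once per batch, so diff()
# restarts (emits a null) at every batch boundary on large inputs.
lf.map_batches(
    lambda df: df.with_columns(pl.col("a").diff().alias("b")),
    streamable=True,
    schema=out_schema,
).sink_parquet("a.parquet")
```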

Expected behavior

Running the same query with the streaming engine via `collect(streaming=True)` works, so I would expect `sink_parquet` to work too:

```python
pl.LazyFrame({"a": [1, 3, 8]}).with_columns(
    pl.col("a").diff().alias("b"),
).collect(streaming=True)
```

this produces the expected output:

```
shape: (3, 2)
┌─────┬──────┐
│ a   ┆ b    │
│ --- ┆ ---  │
│ i64 ┆ i64  │
╞═════╪══════╡
│ 1   ┆ null │
│ 3   ┆ 2    │
│ 8   ┆ 5    │
└─────┴──────┘
```
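Until sinking is supported, the fallback the error message itself points to is to materialize the result (optionally with the streaming engine) and write it afterwards, at the cost of holding the full result in memory:

```python
import polars as pl

# Compute with the streaming engine, but materialize before writing.
pl.LazyFrame({"a": [1, 3, 8]}).with_columns(
    pl.col("a").diff().alias("b"),
).collect(streaming=True).write_parquet("a.parquet")
```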

Installed versions

```
--------Version info---------
Polars:               0.20.3
Index type:           UInt32
Platform:             Linux-6.6.6-76060606-generic-x86_64-with-glibc2.35
Python:               3.10.13 (main, Sep 11 2023, 13:44:35) [GCC 11.2.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:          2.2.1
connectorx:
deltalake:
fsspec:               2023.9.2
gevent:
hvplot:
matplotlib:
numpy:                1.26.3
openpyxl:             3.1.2
pandas:               1.5.3
pyarrow:              13.0.0
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:           2.0.16
xlsx2csv:
xlsxwriter:
```
ritchie46 commented 9 months ago

The query is not supported in FULL streaming mode. Only partially.

niccolopetti commented 9 months ago

> The query is not supported in FULL streaming mode. Only partially.

Thanks for the answer. Are there any workarounds to accomplish this? Is it planned to be supported in full streaming mode?

For my real use case I was thinking of computing this column outside of streaming mode, with a select of only that value and the primary key, then processing the other columns in streaming fashion, and finally joining the two .parquet files in streaming mode (a sketch of that plan follows below). Any better solutions?
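A minimal sketch of that plan, assuming a unique primary-key column named "id" and an input file "input.parquet" (both hypothetical), and that the streaming engine can sink the final join:

```python
import polars as pl

lf = pl.scan_parquet("input.parquet")  # hypothetical input

# 1. Non-streaming pass over just the key and the diff column.
lf.select("id", pl.col("a").diff().alias("b")).collect().write_parquet("diff.parquet")

# 2. Streaming pass for the remaining columns.
lf.drop("a").sink_parquet("rest.parquet")

# 3. Streaming join of the two files on the key.
pl.scan_parquet("rest.parquet").join(
    pl.scan_parquet("diff.parquet"), on="id"
).sink_parquet("joined.parquet")
```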

niccolopetti commented 9 months ago


Update: the other solution I had thought of, based on precomputing column b outside of streaming mode and then doing

```python
pl.concat(
    [
        pl.LazyFrame({"a": [1, 3, 8]}),
        pl.LazyFrame({"b": [2, 4, 9]}),
    ],
    how="horizontal",
).sink_parquet("a.parquet")
```

also doesn't work, always giving:

```
InvalidOperationError: sink_Parquet(ParquetWriteOptions { compression: Zstd(None), statistics: false, row_group_size: None, data_pagesize_limit: None, maintain_order: true }) not yet supported in standard engine. Use 'collect().write_parquet()'
```
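One direction I haven't tried (untested sketch): emulate the horizontal concat with a row-count join, since joins are handled by the streaming engine. Whether this particular plan streams end to end is an open question:

```python
import polars as pl

# Emulate the horizontal concat by giving both frames a row index
# and joining on it (untested sketch).
left = pl.LazyFrame({"a": [1, 3, 8]}).with_row_count("row_nr")
right = pl.LazyFrame({"b": [2, 4, 9]}).with_row_count("row_nr")
left.join(right, on="row_nr").drop("row_nr").sink_parquet("a.parquet")
```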

Any ideas on how to do this? @ritchie46