pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Can't `sink_parquet` on a sorted LazyFrame containing decimal columns #17289

Open edthrn opened 3 months ago

edthrn commented 3 months ago

Reproducible example

Given a very large dataset (~1 billion rows) stored on S3, this works fine:

import polars as pl
from datetime import datetime, timedelta

# Works as expected
pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sink_parquet(
    "/tmp/data.parquet"
)

But this doesn't:

# Does not work
pl.scan_parquet(
    "s3://.../my_dataset/**/*.parquet"
).filter(
    pl.col("date") < datetime.now() - timedelta(days=120),
).sort(
    pl.col("value")
).sink_parquet(
    "/tmp/data.parquet"
)

I get the following error:

Log output

POLARS PREFETCH_SIZE: 64
RUN STREAMING PIPELINE
[parquet -> sort -> parquet_sink]
STREAMING CHUNK SIZE: 1388 rows
STREAMING CHUNK SIZE: 1388 rows
...
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
Temporary directory path in use: /tmp
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
OOC sort started
parquet file must be read, statistics not sufficient for predicate.
OOC sort started
OOC sort started
...
OOC sort started
thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[22,10]`
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-27' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-1' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-16' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
[...]
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-7' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-30' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
[...]
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Cell In[7], line 9
      1 pl.scan_parquet(
      2     's3://.../*.parquet',
      3     storage_options=options
      4 ).filter(
      5     pl.col("date") < datetime.now() - timedelta(days=120)
      7 ).sort(
      8     pl.col("value")
----> 9 ).sink_parquet(
     10     '/tmp/data.parquet',
     11 )

File ~/venv/lib/python3.11/site-packages/polars/_utils/unstable.py:58, in unstable.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
     55 @wraps(function)
     56 def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
     57     issue_unstable_warning(f"`{function.__name__}` is considered unstable.")
---> 58     return function(*args, **kwargs)

File ~/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:2233, in LazyFrame.sink_parquet(self, path, compression, compression_level, statistics, row_group_size, data_pagesize_limit, maintain_order, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, no_optimization)
   2225 elif statistics == "full":
   2226     statistics = {
   2227         "min": True,
   2228         "max": True,
   2229         "distinct_count": True,
   2230         "null_count": True,
   2231     }
-> 2233 return lf.sink_parquet(
   2234     path=normalize_filepath(path),
   2235     compression=compression,
   2236     compression_level=compression_level,
   2237     statistics=statistics,
   2238     row_group_size=row_group_size,
   2239     data_pagesize_limit=data_pagesize_limit,
   2240     maintain_order=maintain_order,
   2241 )

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

Issue description

I stumbled upon #16603 and tried the `POLARS_ACTIVATE_DECIMAL=1` hack.

It was necessary for the first (unsorted) code sample to work, but it is apparently not sufficient for the sorted one.

I tested with both versions 0.20.31 and 1.0.0rc2: same results.

EDIT: also tested on 1.0.0, with the same results.

Expected behavior

I expected the lazy scan/filter/sort/sink to work as well as scan/filter/sink.

Installed versions

```
--------Version info---------
Polars:               0.20.31
Index type:           UInt32
Platform:             Linux-4.19.0-27-cloud-amd64-x86_64-with-glibc2.28
Python:               3.11.0 (main, May 15 2024, 19:44:29) [GCC 8.3.0]

----Optional dependencies----
adbc_driver_manager:
cloudpickle:
connectorx:
deltalake:
fastexcel:
fsspec:               2024.6.1
gevent:
hvplot:
matplotlib:
nest_asyncio:         1.6.0
numpy:                2.0.0
openpyxl:
pandas:               2.2.2
pyarrow:              16.1.0
pydantic:
pyiceberg:
pyxlsb:
sqlalchemy:           2.0.31
torch:
xlsx2csv:
xlsxwriter:
```
edthrn commented 3 months ago

I just tested with a smaller dataset, i.e. instead of scanning all ~17k files, I only scan the first 50...

And it works :thinking:

Does it mean that the problem comes from the data itself (e.g. a null value or something similar)? In that case, it's still odd that the unsorted version works as expected...

edthrn commented 3 months ago

I managed to scan/filter/sort/sink the whole dataset by processing it in batches of 500 source files.

for i, batch in enumerate(batched(s3_urls, batch_size=500)):
    pl.scan_parquet(
        batch,
    ).filter(
        pl.col("date") < datetime.now() - timedelta(days=120)
    ).sort(
        pl.col("value")
    ).sink_parquet(
        f'/tmp/data_{i}.parquet',
    )
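For reference, `itertools.batched` only exists on Python 3.12+; on the 3.11 interpreter shown in the version info above, a small helper like this (hypothetical, written to match the `batch_size=` call in the snippet) fills the gap:

```python
from itertools import islice


def batched(iterable, batch_size):
    """Yield successive tuples of at most `batch_size` items,
    like itertools.batched on Python 3.12+."""
    it = iter(iterable)
    # islice returns an empty tuple once the iterator is exhausted,
    # which ends the loop.
    while batch := tuple(islice(it, batch_size)):
        yield batch
```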

Hence the hypothesis above can be ruled out: it's not a data-value or data-type problem.

Minor problem: I now have 34 parquet files at the end of the process (from 17k source files in total), instead of a single large one.

lostmygithubaccount commented 3 months ago

I'm hitting something similar after upgrading to Polars v1.0.0 (note: I am using polars-u64-idx)

thread '<unnamed>' panicked at crates/polars-core/src/series/series_trait.rs:234:9:
`shrink_to_fit` operation not supported for dtype `decimal[15,2]`
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-4' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-3' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-6' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-8' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-9' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-10' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
---------------------------------------------------------------------------
PanicException                            Traceback (most recent call last)
Cell In[7], line 5
      1 df = pl.scan_parquet(data)
      2 (
      3     df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
      4     .head(3)
----> 5     .collect(streaming=True)
      6 )

File ~/repos/ibis/venv/lib/python3.11/site-packages/polars/lazyframe/frame.py:1942, in LazyFrame.collect(self, type_coercion, predicate_pushdown, projection_pushdown, simplify_expression, slice_pushdown, comm_subplan_elim, comm_subexpr_elim, cluster_with_columns, no_optimization, streaming, background, _eager, **_kwargs)
   1939 # Only for testing purposes atm.
   1940 callback = _kwargs.get("post_opt_callback")
-> 1942 return wrap_df(ldf.collect(callback))

PanicException: called `Result::unwrap()` on an `Err` value: "SendError(..)"

from:

df = pl.scan_parquet(data)
(
    df.sort(pl.col("l_orderkey"), pl.col("l_partkey"), pl.col("l_suppkey"))
    .head(3)
    .collect(streaming=True)
)

where `data` points to ~275 GB of Parquet files.

Interestingly, before upgrading I was hitting #17281 on this operation.

edthrn commented 3 months ago

I confirm that I still get the issue after upgrading to v1.0.0.