pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

RecordBatch requires all its arrays to have an equal number of rows when running pipeline in streaming mode #18599

Open EpicUsaMan opened 1 month ago

EpicUsaMan commented 1 month ago

Checks

Reproducible example

I can't provide a reproducible example, because this only happens on really large frames (close to the machine's 256 GB memory limit).

Log output

thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-arrow/src/record_batch.rs:22:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("RecordBatch requires all its arrays to have an equal number of rows"))
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'polars-5' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-51' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-2' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-8' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-54' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-44' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-15' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-50' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-45' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-7' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-40' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-48' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-26' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-49' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-16' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-36' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-13' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-12' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-4' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"
thread 'polars-18' panicked at crates/polars-pipe/src/executors/sinks/io.rs:271:49:
called `Result::unwrap()` on an `Err` value: "SendError(..)"

Issue description

This error happens ONLY when running the query with streaming=True, and it started appearing after upgrading to polars 1.6.0.

My flow is: scan_parquet -> filter -> sort -> cum_sum / ewm_mean -> group_by.

(It happens almost immediately after the pipeline starts, so it looks like it fails during the scan/sort stage.)
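A minimal sketch of that pipeline shape, with hypothetical file glob, column names, and filter (not the actual failing query, which only reproduces on very large frames):

```python
import polars as pl

# Hypothetical sketch of the failing pipeline's shape; the file glob,
# column names ("ts", "group", "value") and the filter are placeholders.
lf = (
    pl.scan_parquet("data/*.parquet")      # scan_parquet
    .filter(pl.col("value") > 0)           # filter
    .sort("ts")                            # sort
    .with_columns(
        pl.col("value").cum_sum().over("group").alias("value_cumsum"),         # cum_sum
        pl.col("value").ewm_mean(alpha=0.1).over("group").alias("value_ewm"),  # ewm_mean
    )
    .group_by("group")                     # group_by
    .agg(pl.col("value_cumsum").last(), pl.col("value_ewm").last())
)

# The panic only shows up when the (old) streaming engine is used.
df = lf.collect(streaming=True)
```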

Expected behavior

Streaming should work exactly the same as the regular in-memory engine.

Installed versions

```
--------Version info---------
Polars:              1.6.0
Index type:          UInt32
Platform:            Linux-6.8.0-41-generic-x86_64-with-glibc2.39
Python:              3.12.3 (main, Jul 31 2024, 17:43:48) [GCC 13.2.0]

----Optional dependencies----
adbc_driver_manager
altair
cloudpickle          3.0.0
connectorx
deltalake
fastexcel
fsspec               2024.6.1
gevent               24.2.1
great_tables
matplotlib
nest_asyncio         1.6.0
numpy                1.26.4
openpyxl
pandas               2.2.2
pyarrow              16.1.0
pydantic
pyiceberg
sqlalchemy           2.0.31
torch                2.3.1+rocm6.0
xlsx2csv
xlsxwriter
```
ritchie46 commented 1 month ago

Without a reproducible example this isn't something we can fix, sadly. :/

Though we are making a whole new engine and removing the old streaming one, so it might get fixed implicitly.

EpicUsaMan commented 1 month ago

> Without a reproducible example this isn't something we can fix, sadly. :/
>
> Though we are making a whole new engine and removing the old streaming one, so it might get fixed implicitly.

It's actually that in streaming mode we somehow lose the error, which I described here, but I still can't prepare a minimal example: it only produces the bug on very large pipelines during streaming. Removing the .over() over columns that hold only one value fixes the problem (a hedged sketch of that pattern follows below).

So, I made another ticket: https://github.com/pola-rs/polars/issues/18600
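As an illustration only (hypothetical column names, not the actual failing query), the problematic pattern is a window over a column with a single distinct value, and the reported workaround is dropping that .over():

```python
import polars as pl

lf = pl.scan_parquet("data/*.parquet")  # hypothetical input

# Pattern that triggers the panic in streaming mode (per the report):
# a window over a column that holds only one distinct value.
failing = lf.with_columns(
    pl.col("value").cum_sum().over("constant_col").alias("value_cumsum")
)

# Workaround: since "constant_col" has a single value, the window is a
# no-op and the .over() can simply be dropped.
working = lf.with_columns(
    pl.col("value").cum_sum().alias("value_cumsum")
)
```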

Jure-BB commented 1 month ago

I encountered the same error while trying to process a Parquet file with LazyFrame (not sure if it's related to the OP's issue). It happens with and without streaming.

Data source: Overture maps - addresses (parquet format).

Parquet schema:

| column_name | column_type | null | key | default | extra |
| :--- | :--- | :--- | :--- | :--- | :--- |
| id | VARCHAR | YES | null | null | null |
| geometry | BLOB | YES | null | null | null |
| bbox | STRUCT(xmin FLOAT, xmax FLOAT, ymin FLOAT, ymax FLOAT) | YES | null | null | null |
| country | VARCHAR | YES | null | null | null |
| postcode | VARCHAR | YES | null | null | null |
| street | VARCHAR | YES | null | null | null |
| number | VARCHAR | YES | null | null | null |
| unit | VARCHAR | YES | null | null | null |
| address_levels | STRUCT("value" VARCHAR)[] | YES | null | null | null |
| version | INTEGER | YES | null | null | null |
| sources | STRUCT(property VARCHAR, dataset VARCHAR, record_id VARCHAR, update_time VARCHAR, confidence DOUBLE)[] | YES | null | null | null |
| type | VARCHAR | YES | null | null | null |

To download the test file to the current directory:

```
aws s3 cp s3://overturemaps-us-west-2/release/2024-07-22.0/theme=addresses/type=address/part-00000-a1dedcdb-edf7-42c4-aea4-87ddc4d97b65-c000.zstd.parquet ./ --no-sign-request
```

Example:

(I've trimmed full paths to just filenames)

```rust
let in_file_path = "part-00000-a1dedcdb-edf7-42c4-aea4-87ddc4d97b65-c000.zstd.parquet";

// Lazily scan the Parquet file and sort by the address columns.
let lf = LazyFrame::scan_parquet(in_file_path, ScanArgsParquet::default()).unwrap();
let lf = lf.sort(
    ["country", "postcode", "street", "number", "unit"],
    SortMultipleOptions::default(),
);

// Sink the sorted result back to Parquet with the streaming engine.
let out_file_path = "sorted.parquet";
let mut write_options = ParquetWriteOptions::default();
write_options.compression = ParquetCompression::Lz4Raw;
lf.with_streaming(true)
    .sink_parquet(out_file_path, write_options)
    .unwrap();
```

Error:

```
panicked at ...\polars-arrow-0.43.1\src\record_batch.rs:22:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("RecordBatch requires all its arrays to have an equal number of rows"))
```

OS: Windows
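For comparison with the OP's Python usage, a rough Python-API equivalent of the reproduction above (a sketch only, assuming the same downloaded file name) would be:

```python
import polars as pl

in_file_path = "part-00000-a1dedcdb-edf7-42c4-aea4-87ddc4d97b65-c000.zstd.parquet"

# Lazily scan, sort by the address columns, and sink back to Parquet.
# sink_parquet runs on the streaming engine.
(
    pl.scan_parquet(in_file_path)
    .sort(["country", "postcode", "street", "number", "unit"])
    .sink_parquet("sorted.parquet", compression="lz4")
)
```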