pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows")) when `streaming=True` #13059

Closed jmakov closed 7 months ago

jmakov commented 10 months ago

Reproducible example

import polars

# Works if (ts_ns_to - ts_ns_from) spans about 1 month; panics for longer ranges.
# Works with `streaming=False`.
source = polars.scan_parquet("/parquet_partitioned_dataset_path/*/*/*/*.parquet") \
    .filter(
        polars.col("timestamp").is_between(ts_ns_from, ts_ns_to)
        & (polars.col("another_col") < some_threshold)) \
    .sort("timestamp")
q1 = source.select([polars.sum_horizontal([col for col in source.columns if "test" in col])/20])
q1.collect(streaming=True)

Log output

Chrome crashes when using `POLARS_VERBOSE`. These are a few of the rows printed before it does:
POLARS PREFETCH_SIZE: 48
RUN STREAMING PIPELINE
parquet -> sort -> projection -> ordered_sink
RefCell { value: [] }
STREAMING CHUNK SIZE: 2272 rows
STREAMING CHUNK SIZE: 2272 rows
STREAMING CHUNK SIZE: 2272 rows
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
STREAMING CHUNK SIZE: 2272 rows
parquet file must be read, statistics not sufficient for predicate.
parquet file must be read, statistics not sufficient for predicate.
...

Traceback:

thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 6 but the index is 6
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 9 but the index is 9
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 2 but the index is 2
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 6 but the index is 6
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 1 but the index is 1
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 2 but the index is 2
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 2 but the index is 2
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 6 but the index is 6
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 5 but the index is 5
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 1 but the index is 1
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 4 but the index is 4
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 8 but the index is 8
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 5 but the index is 5
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 9 but the index is 9
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 14 but the index is 14
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 5 but the index is 5
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 15 but the index is 15
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 5 but the index is 5
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 7 but the index is 7
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 2 but the index is 2
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 2 but the index is 2
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 9 but the index is 9
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 3 but the index is 3
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 7 but the index is 7
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-core/src/series/into.rs:15:23:
index out of bounds: the len is 6 but the index is 6
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at /home/conda/feedstock_root/build_artifacts/polars_1701471768069/work/crates/polars-arrow/src/chunk.rs:20:31:
called `Result::unwrap()` on an `Err` value: ComputeError(ErrString("Chunk require all its arrays to have an equal number of rows"))
thread '<unnamed>' panicked at crates/polars-pipe/src/executors/sinks/io.rs:232:33:
called `Result::unwrap()` on an `Err` value: Io(Os { code: 28, kind: StorageFull, message: "No space left on device" })

Issue description

The query executes fine for about 1 month of data. If more data is selected, the above panic happens (even with low_memory=True). Sometimes, even when it doesn't panic, I see a lot of data being written to /tmp/polars/sort.

Expected behavior

According to the docs, this should work.

Installed versions

```
--------Version info---------
Polars: 0.19.19
Index type: UInt32
Platform: Linux-6.1.64-1-MANJARO-x86_64-with-glibc2.38
Python: 3.10.13 | packaged by conda-forge | (main, Oct 26 2023, 18:07:37) [GCC 12.3.0]
----Optional dependencies----
adbc_driver_manager:
cloudpickle: 3.0.0
connectorx: 0.3.2
deltalake:
fsspec: 2023.12.1
gevent: 23.9.1
matplotlib: 3.8.2
numpy: 1.24.4
openpyxl:
pandas: 1.5.3
pyarrow: 14.0.1
pydantic: 1.10.13
pyiceberg:
pyxlsb:
sqlalchemy: 1.4.50
xlsx2csv:
xlsxwriter:
```

ritchie46 commented 10 months ago

It seems we cannot finish the query because we don't have enough space? The data we sort must be stored somewhere.
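
The last panic in the traceback (`No space left on device`) is consistent with this. As a rough pre-flight check, the free space at the spill location can be compared against an estimate of the data to be sorted. This is only a sketch: it assumes spill files land under the system temp directory (the issue reports `/tmp/polars/sort`), and `has_room_for_spill` and `safety_factor` are hypothetical names, not part of Polars.

```python
import shutil
import tempfile
from typing import Optional


def has_room_for_spill(required_bytes: int,
                       directory: Optional[str] = None,
                       safety_factor: float = 1.5) -> bool:
    """Return True if `directory` has at least required_bytes * safety_factor free."""
    # Assumption: spill files are written under the system temp directory.
    target = directory or tempfile.gettempdir()
    free = shutil.disk_usage(target).free
    return free >= required_bytes * safety_factor
```

For example, `has_room_for_spill(25 * 1024**3)` would ask whether roughly 25 GB (the spill size mentioned in this thread) plus headroom fits on the temp filesystem.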

jmakov commented 10 months ago

Thanks for the quick response, that makes sense. However, I wonder why /tmp/polars/sort takes so much space (e.g. 25 GB) even though I don't select the sorted column in the resulting query. In addition, the sorted column itself appears to take only 570 MB: source.select("timestamp").collect(streaming=True).estimated_size("mb") returns 570. It looks to me as if, when we sort by one column, the data of the whole DF is written somewhere and then sorted, which makes it impossible to sort larger-than-memory data. Or am I missing something?
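
One way to picture why the spill can be far larger than the sort column: an out-of-core sort generally has to carry every column still needed downstream along with the key, because rows can only be reordered as a unit. The sketch below is a generic external merge sort in plain Python, not Polars' actual implementation; `external_sort`, `_spill`, and `_read` are hypothetical names, and each row is modeled as a `(key, payload)` tuple.

```python
import heapq
import pickle
import tempfile
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[int, tuple]  # (sort key, payload columns)


def _spill(run: List[Row]):
    """Write one sorted run to a temporary file and rewind it for reading."""
    f = tempfile.TemporaryFile()
    for row in run:
        pickle.dump(row, f)  # the payload is written to disk together with the key
    f.seek(0)
    return f


def _read(f) -> Iterator[Row]:
    """Stream rows back from a spilled run, closing the file at the end."""
    try:
        while True:
            yield pickle.load(f)
    except EOFError:
        f.close()


def external_sort(rows: Iterable[Row], max_rows_in_memory: int) -> Iterator[Row]:
    """Sort rows by key while buffering at most `max_rows_in_memory` rows."""
    runs, buffer = [], []
    for row in rows:
        buffer.append(row)
        if len(buffer) >= max_rows_in_memory:
            buffer.sort(key=lambda r: r[0])
            runs.append(_spill(buffer))
            buffer = []
    if buffer:
        buffer.sort(key=lambda r: r[0])
        runs.append(_spill(buffer))
    # k-way merge of the sorted runs; one row per run is held in memory at a time.
    return heapq.merge(*(_read(f) for f in runs), key=lambda r: r[0])
```

In this sketch the disk usage scales with the width of the payload, so reducing rows to the key plus the columns actually needed before sorting shrinks the spill. Whether the query optimizer does that for a given Polars query is not something this example demonstrates.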

If that's the case, i.e. the whole DF has to be sorted in memory, how would one approach sorting larger-than-memory DFs, especially when doing e.g. sum_horizontal and needing only one resulting column (which fits into memory)? Is there a way to make sure scan_parquet reads partitions in sorted order so that sort("timestamp") wouldn't need to be called?
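
To make the second question concrete, here is a sketch of listing the partition files in a deterministic order so that each file could be processed on its own and the small per-file results concatenated. It does not change how `scan_parquet` reads files. It assumes that the three directory levels encode time, that their natural (numeric-aware) order matches `timestamp` order, and that files do not overlap in time; `natural_key` and `ordered_partition_files` are hypothetical helper names.

```python
import re
from pathlib import Path
from typing import List


def natural_key(path: Path) -> list:
    """Split each path component into text and integer chunks so '10' sorts after '2'."""
    return [
        [int(tok) if tok.isdigit() else tok for tok in re.split(r"(\d+)", part)]
        for part in path.parts
    ]


def ordered_partition_files(root: str, pattern: str = "*/*/*/*.parquet") -> List[Path]:
    """Return the partition files under `root` in natural path order."""
    return sorted(Path(root).glob(pattern), key=natural_key)
```

If the assumptions hold, only a per-file sort (or none, if each file is already sorted) would be needed instead of a global one.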