pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs
Other
30.4k stars 1.97k forks source link

scan_parquet collect streaming not streaming even though explain indicates streaming #16376

Open Dekermanjian opened 6 months ago

Dekermanjian commented 6 months ago

Checks

Reproducible example

Unfortunately, it is not quite straight forward to provide a copy-pasteable example. A faithful simulation of the problem requires several parquet files that are very large in size.

Log output

No response

Issue description

I am using scan_parquet() to scan several parquet files that exist in a directory. The data is ~ 35M records. Before collecting the lazy frame, I am trying to filter it down. This is the resulting lazyframe.explain() output:

--- STREAMING

  Parquet SCAN 8 files: first file: adl://HIDDEN/HIDDEN/FILENAME.parquet
  PROJECT */7 COLUMNS
  SELECTION: [([(col("patient_id")) == (Series[id])]) & ([(col("_input_file_modification_date").strict_cast(Date)) == (2024-05-21)])]  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

This suggests that the filtering should work in streaming fashion. The result of the above should result in a dataframe with ~17K records. I keep running out of memory on a 64GB RAM machine. I have tried setting pl.Config.set_streaming_chunk_size() to a low value of 100 but I am still running out of memory. I am really lost and not sure what is going on. I have tried several versions of polars including the latest as of today version, but all of them ran out of memory.

Any suggestions/help is appreciated.

Expected behavior

I expect to filter the data in streaming fashion avoiding out of memory crashes.

Installed versions

``` --------Version info--------- Polars: 0.20.14 Index type: UInt32 Platform: Linux-5.15.0-1057-azure-x86_64-with-glibc2.31 Python: 3.11.9 (main, Apr 19 2024, 16:48:06) [GCC 11.2.0] ----Optional dependencies---- adbc_driver_manager: cloudpickle: 2.2.1 connectorx: deltalake: 0.17.3 fastexcel: fsspec: 2023.5.0 gevent: hvplot: matplotlib: 3.8.4 numpy: 1.26.4 openpyxl: 3.1.2 pandas: 2.2.2 pyarrow: 15.0.2 pydantic: pyiceberg: pyxlsb: sqlalchemy: 2.0.30 xlsx2csv: 0.8.2 xlsxwriter: . ```
ritchie46 commented 5 months ago

Can you show log output?

Dekermanjian commented 5 months ago

@ritchie46 thank you for your response. I am experiencing something weird where the log output in a bash shell only says: Terminated

If I run it in a Jupyter notebook then it just says that the kernel crashed.

After trying to trouble shoot it yesterday, I think that the streaming may actually be working but the data is just too large. The number of records in the table is 3,066,869,440. The problem is that this is a delta table that I am trying to read using scan_parquet() so that I can utilize streaming, but doing so makes the data become astronomically large because it is reading in all historical versions of the data.

Do you have any suggestions or advice as to how I can go about streaming this large parquet files using polars? Is this beyond the scope of polars?

ritchie46 commented 5 months ago

Currently we are not there yet. We are wroking on a new engine to be able to cope with that. Please give us some time. ;)

Dekermanjian commented 5 months ago

Thank you @ritchie46, I am excited to see the new engine when it is ready and available!

Samreay commented 4 months ago

Just wondering if there's been any update to the streaming functionality, as I saw several other scan_parquet issues merged in, in the past twelve months or so, and am hoping improvements to stream might have made it in :)

jacksongoode commented 4 months ago

It seems like streaming does not actually stream in a LazyFrame from a parquet source.

swtb3 commented 1 month ago

Any update on this, is there an associated feature or ticket we can monitor?