pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Polars scan_iceberg inefficiently loads Iceberg table data into memory leading to out-of-memory errors #18912

Open learningkeeda opened 1 month ago

learningkeeda commented 1 month ago

Checks

Reproducible example

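import polars as pl
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("test.table1")

# Materializes the whole table in memory
pl.scan_iceberg(table).collect()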

Log output

No response

Issue description

Expected behavior

To avoid these issues, it's best to use dictionary encoding, especially for low-cardinality columns, where the same values are repeated frequently. Dictionary encoding reduces the size of the data by storing each distinct value once and replacing every occurrence with a short code. Without it, every value is stored in full, which increases memory usage.
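To make the idea concrete, here is a minimal pure-Python sketch of dictionary encoding. It does not show how Polars or Parquet implement it; the column contents and the helper names `dictionary_encode` and `dictionary_decode` are made up for illustration.

```python
from array import array


def dictionary_encode(values):
    """Return (dictionary, codes): the unique values plus one integer code per row."""
    dictionary = []      # unique values, in first-seen order
    index_of = {}        # value -> position in `dictionary`
    codes = array("I")   # one unsigned integer code per row
    for value in values:
        if value not in index_of:
            index_of[value] = len(dictionary)
            dictionary.append(value)
        codes.append(index_of[value])
    return dictionary, codes


def dictionary_decode(dictionary, codes):
    """Rebuild the original column from the dictionary and the codes."""
    return [dictionary[code] for code in codes]


# Hypothetical low-cardinality column: 3 distinct values, 300,000 rows.
column = ["pending", "shipped", "delivered"] * 100_000
dictionary, codes = dictionary_encode(column)

# Rough payload sizes: every string stored in full vs. dictionary + codes.
plain_bytes = sum(len(v.encode("utf-8")) for v in column)
encoded_bytes = (
    sum(len(v.encode("utf-8")) for v in dictionary)
    + codes.itemsize * len(codes)
)
print(len(dictionary), plain_bytes, encoded_bytes)
```

The saving grows with the length of the repeated strings and shrinks as the number of distinct values approaches the number of rows.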

Installed versions

```
--------Version info---------
Polars:               0.20.13
Index type:           UInt32
Platform:             Linux
Python:               3.9.19

----Optional dependencies----
adbc_driver_manager:  not installed
cloudpickle:          not installed
connectorx:           not installed
deltalake:            not installed
fsspec:               not installed
gevent:               not installed
matplotlib:           not installed
numpy:                1.26.4
openpyxl:             3.1.5
pandas:               2.2.2
pyarrow:              17.0.0
pydantic:             2.6.3
pyiceberg:            0.7.0
sqlalchemy:           2.0.29
xlsxwriter:           3.2.0
```

ritchie46 commented 1 month ago

Are you sure you are not going OOM because you load into memory?

Nicolas-SB commented 1 month ago

I also noticed increased memory usage depending on how I read the data into my DataFrame.

[...] # Imports and stuff

parquet_with_hive = pl.read_parquet("s3://mybucket/mytable/col1=val1/myfile.parquet", hive_partitioning=True)
parquet_without_hive = pl.read_parquet("s3://mybucket/mytable/col1=val1/myfile.parquet", hive_partitioning=False)

iceberg_table = load_catalog("default").load_table("mytable")
iceberg = pl.scan_iceberg(iceberg_table)
iceberg = iceberg.filter(pl.col("col1") == val1).collect() # Only collect data from this one parquet file, not from other partitions

# Check that the data is indeed the same
assert_frame_equal(iceberg, parquet_with_hive)
assert_frame_equal(iceberg, parquet_without_hive)

print(parquet_with_hive.estimated_size("mb"))
print(parquet_without_hive.estimated_size("mb"))
print(iceberg.estimated_size("mb"))

With one of my test data sets it leads to the following output:

180.23824501037598
969.8529415130615
1467.6535663604736

I expected the estimated_size to be the same, since all DataFrames hold the same data. We would like to use Iceberg, but it seems that we will have to read the data using read_parquet + hive_partitioning=True instead.

learningkeeda commented 1 month ago

> Are you sure you are not going OOM because you load into memory?

Iceberg tables with low-cardinality String columns are not loaded into memory efficiently, and as a result we are getting OOM errors. Ideally, with optimizations such as dictionary encoding in place, we could extend our use of Polars to much larger datasets.

bhaskar-pv commented 1 week ago

Any update on this? I am having trouble reading about 2 GB of data from a single column of an Iceberg table whose total size is 240 GB.