pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

`sink_parquet()` ignores `row_group_size` parameter #13092

Open DrMaphuse opened 11 months ago

DrMaphuse commented 11 months ago

Checks

Reproducible example

```python
import polars as pl
import pyarrow.parquet as pq

df = pl.DataFrame(["a"] * 1_000_000).lazy()

df.sink_parquet("test.parquet", row_group_size=100)

# Read the metadata from a local file
metadata = pq.read_metadata("test.parquet")
metadata.row_group(0)
```

```
<pyarrow._parquet.RowGroupMetaData object at 0x7fac3a298180>
  num_columns: 1
  num_rows: 15625
  total_byte_size: 2006
```

Log output

No response

Issue description

The `row_group_size` parameter of `sink_parquet()` does not appear to have any effect.

Incidentally, the default row groups can also sometimes lead to an inflated footer size (I've seen up to 50GB), which causes issues with some Parquet readers that limit the footer size to 16MB.

Expected behavior

Row group sizes should match the value passed via the `row_group_size` argument. In the example above, 1,000,000 rows with `row_group_size=100` should produce row groups of 100 rows each, not 15,625.

Installed versions

```
--------Version info---------
Polars:               0.19.19
Index type:           UInt32
Platform:             Linux-5.15.0-86-generic-x86_64-with-glibc2.35
Python:               3.9.7 | packaged by conda-forge | (default, Sep 29 2021, 19:20:46) [GCC 9.4.0]
----Optional dependencies----
adbc_driver_manager:
cloudpickle:          2.2.1
connectorx:
deltalake:
fsspec:               2023.6.0
gevent:
matplotlib:           3.6.3
numpy:                1.25.2
openpyxl:             3.1.2
pandas:               2.0.3
pyarrow:              12.0.1
pydantic:             1.10.6
pyiceberg:
pyxlsb:
sqlalchemy:           1.4.39
xlsx2csv:             0.8.1
xlsxwriter:           3.1.4
```
stas-sl commented 11 months ago

https://github.com/pola-rs/polars/issues/10108

ByteNybbler commented 10 months ago

Seems like the parquet file sink is using a batched writer under the hood. If I'm understanding them correctly, Polars' batched parquet writers don't currently have any row group size setting; instead they write row groups sized according to the DataFrames they receive, which in this case come from the streaming engine.

If the parquet BatchedWriter could take a `row_group_size` parameter, that would be very useful in general and would also resolve this issue. I'm not sure what the best way to implement that would be, though. An intermediate buffer that accumulates rows into a DataFrame might work, but it could be imprecise and would risk running out of memory for very large row group sizes.
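For illustration, here is a rough user-space sketch of that buffering idea, written against pyarrow's `ParquetWriter` rather than Polars' internal batched writer; the function name, `TARGET_ROWS`, and the `batches` iterable are illustrative, not existing Polars API. It accumulates incoming DataFrames until the target row count is reached and then flushes them as a single row group.

```python
import polars as pl
import pyarrow.parquet as pq

TARGET_ROWS = 100_000  # desired row group size (illustrative)

def write_with_buffered_row_groups(batches, path, arrow_schema):
    """Accumulate polars DataFrames and flush them as row groups of
    roughly TARGET_ROWS rows each (the last group may be smaller)."""
    writer = pq.ParquetWriter(path, arrow_schema)
    buffer, buffered_rows = [], 0
    try:
        for batch in batches:
            buffer.append(batch)
            buffered_rows += batch.height
            if buffered_rows >= TARGET_ROWS:
                table = pl.concat(buffer).to_arrow()
                # A single write_table call with row_group_size >= len(table)
                # produces exactly one row group.
                writer.write_table(table, row_group_size=buffered_rows)
                buffer, buffered_rows = [], 0
        if buffer:
            # Flush whatever is left as a final, smaller row group.
            table = pl.concat(buffer).to_arrow()
            writer.write_table(table, row_group_size=buffered_rows)
    finally:
        writer.close()
```

As noted above, peak memory grows with the requested row group size, since a full group has to be buffered before it can be written.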

douglas-raillard-arm commented 6 months ago

I hit this issue and dug in a bit: the arrow writer works slightly differently. Writing a RecordBatch does not by itself create a row group; that is left to a `.flush()` method. It looks like it wouldn't be very hard to adapt the Polars code to behave the same way, or at least to split `write()` into a `partial_write()` and a `flush()` operation, so that `write()` keeps its current semantics.
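To make the suggestion concrete, here is a hypothetical sketch of what that split could look like; `BufferedBatchedWriter` and its methods are made-up names for illustration, not the actual Polars or arrow API.

```python
import polars as pl

class BufferedBatchedWriter:
    """Wraps a writer whose write(df) call emits one row group per call,
    and defers cutting the row group to an explicit flush()."""

    def __init__(self, inner):
        self._inner = inner   # existing batched writer (one row group per write)
        self._pending = []    # batches buffered since the last flush

    def partial_write(self, df: pl.DataFrame) -> None:
        # Buffer the batch; no row group is created yet.
        self._pending.append(df)

    def flush(self) -> None:
        # Only here is a row group actually cut, mirroring the arrow
        # behaviour described above.
        if self._pending:
            self._inner.write(pl.concat(self._pending))
            self._pending = []

    def write(self, df: pl.DataFrame) -> None:
        # write() keeps its current semantics: one row group per call.
        self.partial_write(df)
        self.flush()
```

With such a split, the sink could call `partial_write()` for each streaming batch and only call `flush()` once enough rows for the requested `row_group_size` have accumulated.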

ghuls commented 5 months ago

Still the same issue with Polars 0.20.31.

NilsJPWerner commented 1 month ago

Having the same issue on Polars 1.9.0.

I am trying to convert a CSV to a Parquet file with only one chunk for performance purposes, using streaming to limit peak memory usage.

The same options work for `write_parquet`, so this is a bit surprising.

```python
In [3]: pl.scan_csv(
   ...:     "~/Downloads/input.csv",
   ...:     schema=schema,
   ...:     infer_schema=False,
   ...: ).sink_parquet(
   ...:     "~/Downloads/output.parquet",
   ...:     compression="zstd",
   ...:     row_group_size=sys.maxsize,
   ...:     data_page_size=sys.maxsize,
   ...: )
   ...:
   ...: pl.read_parquet("~/Downloads/output.parquet").n_chunks()
Out[3]: 243

In [4]: pl.scan_csv(
   ...:     "~/Downloads/input.csv",
   ...:     schema=schema,
   ...:     infer_schema=False,
   ...: ).collect().write_parquet(
   ...:     "~/Downloads/output.parquet",
   ...:     compression="zstd",
   ...:     row_group_size=sys.maxsize,
   ...:     data_page_size=sys.maxsize,
   ...: )

In [5]: pl.read_parquet("~/Downloads/output.parquet").n_chunks()
Out[5]: 1
```

douglas-raillard-arm commented 1 month ago

I actually had a go at fixing it. Making the API more like arrow-rs was not very hard, but I stopped at the point where I needed to rewrite the row group metadata generation logic to amend an existing set of metadata rather than generate a whole new one. Otherwise the row group metadata reflects only the last chunk added to it, which of course makes it completely wrong.