pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Support streaming from non-local files. #9345

Open elephantum opened 1 year ago

elephantum commented 1 year ago

Problem description

When I try to process a huge file from S3 in a streaming way, I get:

sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

Minimal reproducible example is:

import polars as pl
df = pl.scan_parquet("s3://rimrep-data-public/091-aims-sst/test-50-64-spatialpart/part.0.parquet")
df.sink_parquet("part.0.parquet")

I wish streaming processing worked with remote files, because that is exactly the situation in which you have to deal with huge data.

ritchie46 commented 1 year ago

Yes, we have that on the radar. The first thing we want to do is add streaming support for scan_pyarrow_dataset.
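
For reference, a minimal sketch of that path as it exists today (the bucket path and column name are hypothetical; the plan still runs on the in-memory engine rather than the streaming one):

import polars as pl
import pyarrow.dataset as ds

# Hypothetical bucket path; pyarrow handles the S3 access while polars
# pushes projections and simple predicates down into the scanner.
dataset = ds.dataset("s3://my-bucket/huge-dataset/", format="parquet")

result = (
    pl.scan_pyarrow_dataset(dataset)
    .select("sst")
    .filter(pl.col("sst") > 20.0)
    .collect()
)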

elephantum commented 1 year ago

Also relevant for this issue: https://github.com/fsspec/filesystem_spec/blob/master/fsspec/parquet.py#L17, specifically the function fsspec.parquet.open_parquet_file.

It seems that NVIDIA introduced this functionality some time ago: https://developer.nvidia.com/blog/optimizing-access-to-parquet-data-with-fsspec/

On the surface it seems compatible with the functionality required for LazyParquetReader. I imagine it might be possible to do similar magic on the Python side and populate the LogicalPlan with reads of data chunks through fsspec.
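
To illustrate, a rough sketch of how fsspec.parquet.open_parquet_file can hand polars a file-like object that only fetches the byte ranges it needs (the bucket path and column names are hypothetical, and this goes through the eager reader rather than the streaming engine):

import polars as pl
from fsspec.parquet import open_parquet_file

# open_parquet_file prefetches only the footer plus the byte ranges of the
# requested columns instead of downloading the whole object.
with open_parquet_file(
    "s3://my-bucket/part.0.parquet",   # hypothetical path
    columns=["sst", "time"],           # hypothetical columns
) as f:
    df = pl.read_parquet(f, columns=["sst", "time"])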

bcambel commented 1 year ago

@elephantum perhaps this article can also do the job until @ritchie46 completes the work.

Btw @ritchie46, such a great library! Thanks a lot! Greetings, B

jackmead515 commented 1 year ago

Maybe a bit of an add-on, but it would help if the feature weren't limited to string paths and we had the ability to pass in an open buffer to a file or an in-memory data source, like:

import io
import polars as pl

stream = io.BytesIO()
pl.scan_parquet(stream)

# or

with open('file.parquet', 'rb') as f:
    pl.scan_parquet(f)

# or because s3fs is awesome... (s3 being an s3fs.S3FileSystem instance)

with s3.open('s3://bucket/etc/etc/', 'rb') as f:
    pl.scan_parquet(f)

# or finally, like with a Flask request stream

pl.read_parquet(request.stream)

Talk about the IO and memory savings... that would be impressive.

I mean, yes, we have tools like s3fs-fuse where I can read straight from S3 using a scan, but still.

svaningelgem commented 1 year ago

@jackmead515: this is already possible for scanning. As long as it is a Python byte stream, you can do that.

Please do correct me if I'm wrong, but I'm already using this in production ;-). I would very much like the sink_* methods to do the same, but that, alas, isn't possible yet.
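
A minimal sketch of that pattern, assuming a polars version whose scan_parquet accepts file-like objects (the file name is hypothetical):

import io
import polars as pl

# Load the raw Parquet bytes into an in-memory buffer (hypothetical file),
# then scan lazily from the buffer instead of from a path string.
with open("file.parquet", "rb") as f:
    buffer = io.BytesIO(f.read())

lazy = pl.scan_parquet(buffer)
df = lazy.head(5).collect()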

jackmead515 commented 1 year ago

@svaningelgem No kidding? I just reread the documentation. I guess read_parquet() actually does define:

string, Path, BinaryIO, BytesIO, bytes

in the function signature... But since read_parquet isn't lazy, I suspect it will still try to read all the data during that function call. I was assuming the scan_* calls are purely lazy and would significantly reduce memory because they process the data in chunks.
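
A short sketch of the eager/lazy contrast being discussed (the file name and column are hypothetical):

import polars as pl

# Eager: read_parquet materializes the (selected) data immediately.
eager_df = pl.read_parquet("data.parquet", columns=["a"])

# Lazy: scan_parquet only builds a plan; the projection and predicate are
# pushed down to the reader when collect() runs.
lazy_df = (
    pl.scan_parquet("data.parquet")
    .select("a")
    .filter(pl.col("a") > 0)
    .collect()
)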

cdmoye commented 10 months ago

FWIW, the original issue reported by @elephantum seems to no longer be a problem:

import polars as pl
df = pl.scan_parquet('s3://<my bucket>/<my key that is a parquet file>')
df.sink_parquet('test.parquet')
pl.read_parquet('test.parquet')

shows me the dataframe I expect.

This is using polars 0.20.5

deanm0000 commented 10 months ago

@cdmoye it's a bit vague, but there is still no support for sinking to remote object stores, only for reading from them.
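
A possible workaround sketch until remote sinks land (the bucket names are hypothetical, and the full result is still staged on local disk):

import polars as pl
import s3fs

# Stream the query into a local Parquet file with the streaming engine...
pl.scan_parquet("s3://source-bucket/input.parquet").sink_parquet("local.parquet")

# ...then upload the result to the destination object store with s3fs.
fs = s3fs.S3FileSystem()
fs.put("local.parquet", "s3://dest-bucket/output.parquet")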