pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Implement streaming for scan_pyarrow_dataset #11419

Open lmocsi opened 8 months ago

lmocsi commented 8 months ago

Description

According to the streaming API documentation (https://pola-rs.github.io/polars/user-guide/concepts/streaming/#when-is-streaming-available), streaming currently supports scan_csv, scan_parquet, and scan_ipc. It would be nice if it supported scan_pyarrow_dataset as well (or if scan_parquet supported partitioning='hive').

ritchie46 commented 8 months ago

Yes, I would welcome this. I would like a function that accepts Polars expressions representing predicates, the names of the projected columns, and slice information, and that then produces a Python generator.

ldacey commented 8 months ago

Yeah, a big use case for me is to use a pyarrow scanner to filter a dataset (certain rows and subset of columns) from location A (GCS) and save it to location B (ABFS) without ever reading the data as a pyarrow Table.

deanm0000 commented 8 months ago

> Yeah, a big use case for me is to use a pyarrow scanner to filter a dataset (certain rows and subset of columns) from location A (GCS) and save it to location B (ABFS) without ever reading the data as a pyarrow Table.

You can do this now without Polars. See https://arrow.apache.org/docs/python/dataset.html and go all the way down to the section "Writing large amounts of data".

ldacey commented 8 months ago

> > Yeah, a big use case for me is to use a pyarrow scanner to filter a dataset (certain rows and subset of columns) from location A (GCS) and save it to location B (ABFS) without ever reading the data as a pyarrow Table.
>
> You can do this now without Polars. See https://arrow.apache.org/docs/python/dataset.html and go all the way down to the section "Writing large amounts of data".

Yeah - I do it without polars today. All of my other tasks (Airflow) are using Polars to read the data into a LazyFrame to transform it, filter it, drop duplicates, etc. I was just saying it would be cool to be able to read a dataset and then stream the output with polars, similar to how a scanner works in pyarrow.

For example, this worked for a 450-million-row dataset and allowed me to repartition the data monthly instead of daily. Is there currently a way to do this with Polars, replacing the projection with df.with_columns(..) or applying other LazyFrame transformations/filters?

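import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

# `dataset` (a pyarrow Dataset) and `gcs` (a filesystem object) are defined elsewhere.
# Keep every existing column and derive an integer month_id from the timestamp.
projection = {col: ds.field(col) for col in dataset.schema.names}
projection["month_id"] = pc.strftime(ds.field("timestamp_utc"), format="%Y%m").cast(pa.int32())
# The scanner yields record batches, so the data is never read as a full Table.
scanner = dataset.scanner(columns=projection)
ds.write_dataset(
    data=scanner,
    base_dir="...",
    filesystem=gcs,
    format="parquet",
    partitioning=["month_id"],
    partitioning_flavor="hive",
)

deanm0000 commented 8 months ago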

pyarrow's dataset writer can open all of the destination files (or, if there are too many, a large subset of them) simultaneously and write to them as it traverses the source. Polars doesn't have that capability yet, so you'd have to sink_parquet to each destination of your hive manually. In other words, even if/when Polars streams from a pyarrow dataset, it'd be a separate feature to have it sink to multiple parquet files as a purpose-built dataset writer. It seems the direction Polars is going is to have native dataset readers and writers rather than to rely on pyarrow at all.

lmocsi commented 8 months ago

> pyarrow's dataset writer can open all of the destination files (or, if there are too many, a large subset of them) simultaneously and write to them as it traverses the source. Polars doesn't have that capability yet, so you'd have to sink_parquet to each destination of your hive manually. In other words, even if/when Polars streams from a pyarrow dataset, it'd be a separate feature to have it sink to multiple parquet files as a purpose-built dataset writer. It seems the direction Polars is going is to have native dataset readers and writers rather than to rely on pyarrow at all.

Is this capability (writing multiple files simultaneously) on the Polars to-do list, or should I request it?

deanm0000 commented 8 months ago

I don't think there's a to-do list apart from the issues list. I suppose it can't hurt to create the issue but I'd frame it more along the lines of writing datasets including hive partitioning natively rather than focusing on the underlying parallelism that would accompany it. Just my 2 cents. I don't really speak for the main devs of the project.

arnaud-vennin commented 6 months ago

@lmocsi Did you have the opportunity to create the issue related to this topic? I would also be interested in having a streaming mode for scan_pyarrow_dataset.

For now, I use a workaround described in this article: getting all the table partitions on S3 into a list and then calling the scan_parquet function on them, so that streaming mode can be used.

One issue I had is that if one of the source tables is updated during the pipeline run, the run crashes.

barak1412 commented 5 months ago

@arnaud-vennin Did you manage to run your code on a private S3 cloud rather than AWS? Currently I can't get streaming to work.

jmakov commented 5 months ago

Not related to the issue itself, but wondering why you went with arrow dataset instead of parquet dataset.

barak1412 commented 5 months ago

@jmakov look at my ticket here.

lmocsi commented 3 months ago

> Not related to the issue itself, but wondering why you went with arrow dataset instead of parquet dataset.

Because at the time of submitting the issue, scan_parquet did not support hive partitioning, I guess.

arnaud-vennin commented 2 weeks ago

@barak1412 I'm sorry, I missed the notification related to your question. Currently we are using scan_parquet(), which now accepts partitioning.

barak1412 commented 2 weeks ago

@arnaud-vennin Thanks. Unfortunately there is still no streaming option for scan_pyarrow_dataset, so, for example, working with an HDFS metastore is still very limited (I hope it will be prioritized soon, after the streaming engine redesign).