apache / iceberg-python

Apache PyIceberg
https://py.iceberg.apache.org/
Apache License 2.0

Expose PyIceberg table as PyArrow Dataset #30

Open Fokko opened 11 months ago

Fokko commented 11 months ago

Feature Request / Improvement

Migrated from https://github.com/apache/iceberg/issues/7598:

Hi, I've been looking at what we can do to make PyArrow Datasets extensible to various table formats and consumable by various compute engines (including DuckDB, Polars, DataFusion, Dask). I've written up my observations here: https://docs.google.com/document/d/1r56nt5Un2E7yPrZO9YPknBN4EDtptpx-tqOZReHvq1U/edit?usp=sharing

What this means for PyIceberg's API

Currently, integration with engines like DuckDB means filters and projections have to be specified up front, rather than pushed down from the query:

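from pyiceberg.expressions import GreaterThanOrEqual

# `table` is a PyIceberg table that was loaded from a catalog beforehand.
con = table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_duckdb(table_name="distant_taxi_trips")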

Ideally, we can export the table as a dataset, register it in DuckDB (or some other engine), and then filters and projections can be pushed down as the engine sees fit. The following would then perform equivalently to the above, but would be more user-friendly:

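import duckdb

# to_pyarrow_dataset() is the API proposed in this issue; it does not exist yet.
dataset = table.to_pyarrow_dataset()
con = duckdb.connect()
# DuckDB's register() takes the view name first, then the Python object.
con.register("distant_taxi_trips", dataset)
con.sql("""SELECT VendorID, tpep_pickup_datetime, tpep_dropoff_datetime
    FROM distant_taxi_trips
    WHERE trip_distance >= 10.0""")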

Query engine

Other

RahulDubey391 commented 9 months ago

Hi @Fokko , I can have a look into the issue!

stefnba commented 8 months ago

Hi, is there any update on this topic? Thanks.

jwills commented 7 months ago

Yo, just chiming in that we would love this for dbt-duckdb use cases-- thanks!

jwills commented 7 months ago

(If this is a thing I can add, please lmk- I can be surprisingly useful)

Fokko commented 7 months ago

Hey @jwills I think many folks are looking forward to this, so it would be great if you would be willing to spend time on getting this in 🙌

jwills commented 7 months ago

sg @Fokko, will dive in here

Fokko commented 7 months ago

Awesome, let me know if there are any questions. Happy to provide context

jwills commented 7 months ago

@Fokko okay I'm read in here; is the best approach atm something like this comment from the original issue?

I looked at the code in PyIceberg again and I remembered an idea I had that I never tested. Right now, the implementation eagerly loads a table for every file-level projection and concats them. Would it be possible instead to create a pyarrow dataset for every file and return a union dataset that combines them? I've never touched these lower level features of PyArrow datasets before so this idea is all based on hazy recollection of source code reading from long ago.

If this is something PyArrow supports today (unioning datasets with different projection plans that produce the same final schema, without materializing a table), then it could be the easiest way to achieve the "pyiceberg returns a dataset that is compatible with iceberg schema evolution", at least for copy-on-write workloads.

Are there any dragons here or downsides to attempting this that would make it not worthwhile to attempt?

stefnba commented 7 months ago

Maybe this helps.

A PyArrow Dataset can be initiated from a list of file paths:

Create a FileSystemDataset from explicitly given files. The files must be located on the same filesystem given by the filesystem parameter. Note that in contrary of construction from a single file, passing URIs as paths is not allowed.

More info here.

To make it work, the corresponding cloud filesystem, e.g. pyarrow.fs.S3FileSystem has to be specified, see here.

Fokko commented 7 months ago

Hey @jwills, having a union dataset feels like a step in the right direction to me; however, I don't think it will really help when it comes to performance.

Loading the files through PyArrow is very slow at the moment. The biggest issue there is that we aren't able to do the schema evolution in pure Arrow. That's why we materialize to a table, do all the changes needed to the schema, and then concat all the tables in the end. This is very costly to do in Python. The main issue here is that Arrow does not support fetching schemas or filtering through field IDs, which are the basis of Iceberg.

A cleaner option would be to have the Arrow dataset expose a protocol that we can implement. This was suggested a while ago, but they were very reluctant about this and wanted to do everything through Substrait.

jwills commented 7 months ago

@Fokko agreed, the union approach seems like a perf killer. Will noodle on this a bit more-- thanks for the context here!

Fokko commented 7 months ago

Just for context, I don't know if it helps: I was recently experimenting with pushing the union of the tables into Arrow, including all the schema evolution. This would prevent PyIceberg from doing this itself, which is slow. The idea was to create an empty table with the requested schema and then union all the Parquet files onto it, using the new option in concat tables to automatically do schema evolution. The missing part there is that Arrow cannot reorder struct fields :(

jwills commented 7 months ago

That is helpful, thank you.

One other option I was considering on my side, given that I have access to https://github.com/duckdb/duckdb_iceberg : using PyIceberg to fetch the metadata for an Iceberg table (like the path and which snapshot to read), but then delegating the actual reading to the Iceberg scan operation built into DuckDB (which looks to me like it bypasses the Arrow issues entirely).

Do you have thoughts on that approach, or is it outside of your wheelhouse?
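A rough sketch of how that delegation could look. The helper below is hypothetical and only builds the SQL; it assumes the DuckDB iceberg extension's `iceberg_scan` function is given the path to the table's current metadata file, which PyIceberg exposes as `table.metadata_location`. The example path is made up.

```python
def iceberg_scan_sql(metadata_location: str) -> str:
    """Build a DuckDB query that reads an Iceberg table via iceberg_scan."""
    # Escape single quotes so the path is a valid SQL string literal.
    escaped = metadata_location.replace("'", "''")
    return f"SELECT * FROM iceberg_scan('{escaped}')"


# Made-up metadata path; in practice this would be table.metadata_location.
query = iceberg_scan_sql("s3://bucket/warehouse/taxi/metadata/v3.metadata.json")
print(query)

# With DuckDB and the iceberg extension available, usage would be roughly:
#   con = duckdb.connect()
#   con.sql("INSTALL iceberg")
#   con.sql("LOAD iceberg")
#   con.sql(iceberg_scan_sql(table.metadata_location))
```

In this setup PyIceberg only resolves the catalog entry, and DuckDB does the planning and reading, so filters and projections in the SQL are handled by DuckDB itself.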

Fokko commented 7 months ago

I'm always in for creative solutions. I think that would work well; my colleague also did something similar: https://gist.github.com/kainoa21/f3d01c607fce2741cef22683048a22a3 which is a really nifty trick!

wonnor-pro commented 7 months ago

Hi team, do we have an update on this? We are really excited about this feature.

mfatihaktas commented 6 months ago

Just to note, we would also love this feature. It would allow us to support Iceberg read/write in Ibis.

TiansuYu commented 3 weeks ago

A PyArrow Dataset can be initiated from a list of file paths:

Create a FileSystemDataset from explicitly given files. The files must be located on the same filesystem given by the filesystem parameter. Note that in contrary of construction from a single file, passing URIs as paths is not allowed.

I have the opposite idea in mind: a PyArrow representation / dataset protocol should be something like a MemoryBuffer that offers a set of APIs available to every table implementation.

Other query engines can then load the dataset via this "InMemoryDataset" (that's kind of my mental model for Arrow) as an intermediary (not sure we want to expose it directly like this; it could be something that's hidden and low level).