apache/arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] Define a Dataset protocol based on Substrait and C Data Interface #37504

Open wjones127 opened 1 year ago

wjones127 commented 1 year ago

Describe the enhancement requested

Based on discussion in the 2023-08-30 Arrow community meeting. This is a continuation of https://github.com/apache/arrow/pull/35568 and https://github.com/apache/arrow/issues/33986.

We'd like to have a protocol for sharing unmaterialized datasets that:

  1. Can be consumed as one or more streams of Arrow data
  2. Can have projections and filters pushed down to the scanner

This would provide an extensible connection between scanners and query engines. Data formats might include Iceberg, Delta Lake, Lance, and PyArrow datasets (Parquet, JSON, CSV). Query engines could include DuckDB, DataFusion, Polars, PyVelox, PySpark, Ray, and Dask. Such a connection would let end users employ their preferred query engine to load any supported dataset. From their perspective, usage might look like:

from deltalake import DeltaTable
table = DeltaTable("path/to/table")

import duckdb
duckdb.sql("SELECT y FROM table WHERE x > 3")

The protocol is largely invisible to the user. Behind the scenes, duckdb would call __arrow_scanner__() on table to get a scannable object. It would then push down the column selection ['y'] and the filter x > 3 to the scanner, and get the resulting data stream as input to the query.
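For concreteness, the consumer-side flow could look roughly like the following (the to_substrait_expression helper is hypothetical, shown only to illustrate translating the engine's predicate into the protocol's filter type):

scanner = table.__arrow_scanner__()

# Translate the engine's predicate into a Substrait extended expression
# bound against the scanner's schema (assumed helper).
filter_expr = to_substrait_expression("x > 3", schema=scanner.get_schema())

# Push down the column selection and filter; the returned capsule wraps
# an ArrowArrayStream the engine consumes as the query input.
stream = scanner.get_stream(columns=["y"], filter=filter_expr)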

Shape of the protocol

The overall shape would look roughly like:

from abc import ABC, abstractmethod

class AbstractArrowScannable(ABC):
    @abstractmethod
    def __arrow_scanner__(self) -> "AbstractArrowScanner":
        ...

class AbstractArrowScanner(ABC):
    @abstractmethod
    def get_schema(self) -> capsule[ArrowSchema]:
        ...

    @abstractmethod
    def get_stream(
        self,
        columns: list[str],
        filter: SubstraitExpression,
    ) -> capsule[ArrowArrayStream]:
        ...

    @abstractmethod
    def get_partitions(self, filter: SubstraitExpression) -> list["AbstractArrowScanner"]:
        ...

Data and schema are returned as C Data Interface objects (see: #35531). Predicates are passed as Substrait extended expressions.
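As a rough producer-side sketch, a scanner backed by a PyArrow dataset might look like the following (the substrait_to_pyarrow_expr helper is assumed, and error handling is omitted):

import pyarrow.dataset as pads

class PyArrowDatasetScanner:
    def __init__(self, dataset: pads.Dataset):
        self._dataset = dataset

    def get_schema(self):
        # Export the schema as an ArrowSchema capsule via the PyCapsule interface.
        return self._dataset.schema.__arrow_c_schema__()

    def get_stream(self, columns, filter):
        # Decode the Substrait expression into a pyarrow compute expression
        # (assumed helper) and let the dataset scanner apply the pushdown.
        pa_filter = substrait_to_pyarrow_expr(filter)
        scanner = self._dataset.scanner(columns=columns, filter=pa_filter)
        # RecordBatchReader exports an ArrowArrayStream capsule.
        return scanner.to_reader().__arrow_c_stream__()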

Component(s)

Python

paleolimbot commented 1 year ago

Is schema negotiation outside the scope of this protocol? If get_schema() contains a Utf8View, for example, is it the consumer's responsibility to do the cast, or can the consumer pass a schema with Utf8View columns as Utf8 to get_stream() (or another method)?

wjones127 commented 1 year ago

Is schema negotiation outside the scope of this protocol?

I think we can include that. I'd like to design that as part of the PyCapsule API first, so we match the semantics there.
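If it mirrors the PyCapsule stream export semantics, one possibility (just a sketch, not a settled design) would be an optional requested_schema argument on get_stream:

    def get_stream(
        self,
        columns: list[str],
        filter: SubstraitExpression,
        requested_schema=None,  # ArrowSchema capsule; producer casts if it can
    ) -> capsule[ArrowArrayStream]:
        ...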

wjones127 commented 9 months ago

Haven't had time to work on this, but wanted to note that a current pain point for users of the dataset API is that there are no table statistics the caller can access, which leads to bad join orders. Some mentions of this here:

https://twitter.com/mim_djo/status/1740542585410814393 https://github.com/delta-io/delta-rs/issues/1838

pitrou commented 7 months ago

Are we sure a blocking API like this would be palatable for existing execution engines such as Acero, DuckDB... ?

Of course, at worst the various method/function calls can be offloaded to a dedicated thread pool.

wjones127 commented 7 months ago

Are we sure a blocking API like this would be palatable?

Are you referring to the fact they would have to acquire the GIL to call these methods? Or something else?

Ideally all these methods are brief.

Though I haven't discussed this in depth with implementors of query engines. I'd be curious for their thoughts.

pitrou commented 7 months ago

Are we sure a blocking API like this would be palatable?

Are you referring to the fact they would have to acquire the GIL to call these methods? Or something else?

No, to the fact that these functions are synchronous.

Ideally all these methods are brief.

I'm not sure. get_partitions will typically have to walk a filesystem, which can be long-ish especially on large datasets or remote filesystems.

paleolimbot commented 7 months ago

Perhaps get_partitions(...) -> Iterable[AbstractArrowScanner] would do it? Not sure if anybody is interested in asyncio for this but an async iterator might work.

pitrou commented 7 months ago

An Iterable would probably be better indeed. It would not solve the async use case directly, but it would at least allow producing results without blocking on the entire filesystem walk.
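A lazy, generator-based sketch of that variant (SinglePartitionScanner and _list_fragments are placeholders, purely illustrative):

    def get_partitions(self, filter: SubstraitExpression) -> Iterable[AbstractArrowScanner]:
        # Yield a scanner per matching partition as it is discovered,
        # instead of materializing the full list after walking the whole filesystem.
        for fragment in self._list_fragments(filter):
            yield SinglePartitionScanner(fragment)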