apache / arrow

Apache Arrow is a multi-language toolbox for accelerated data interchange and in-memory processing
https://arrow.apache.org/
Apache License 2.0

[Python] Should RecordBatchStreamReader/Writer be AsyncIterable? #20858

Open asfimport opened 5 years ago

asfimport commented 5 years ago

Filing this issue after a discussion today with @xhochy about how to implement streaming pyarrow http services. I had attempted to use both Flask and aiohttp's streaming interfaces because they seemed familiar, but no dice. I have no idea how hard this would be to add – supporting all the asynciterable primitives in JS was non-trivial.

Reporter: Paul Taylor / @trxcllnt

Note: This issue was originally created as ARROW-4283. Please see the migration documentation for further details.

asfimport commented 5 years ago

Antoine Pitrou / @pitrou: There are two cases here:

asfimport commented 5 years ago

Paul Taylor / @trxcllnt: @pitrou Thanks for the feedback.

I want to clarify: my Python skills aren't sharp and I'm not familiar with the pyarrow API or Python's asyncio/async-iterable primitives, so filter my comments through the lens of a beginner.

The little experience I do have is using the RecordBatchStreamReader to read from stdin (via sys.stdin.buffer) and named file descriptors (via os.fdopen()). Since Python's so friendly (and I have no idea how the Python IO primitives work), I thought maybe I could pass aiohttp's Request.stream to the RecordBatchStreamReader constructor, and quickly learned that no, I can't ;).
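
For reference, a minimal sketch of the blocking pattern that does work today, assuming an Arrow IPC stream is being piped to the process on stdin:

```python
import sys
import pyarrow.ipc as ipc

# Blocking read of an Arrow IPC stream from stdin.
# Each iteration yields a pyarrow.RecordBatch.
reader = ipc.open_stream(sys.stdin.buffer)
for batch in reader:
    print(batch.num_rows)
```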

In the JS implementation we have two main entry points for reading RecordBatch streams:

  1. a static RecordBatchReader.from(source), which accepts heterogeneous source types and returns a RecordBatchReader for the underlying Arrow type (file, stream, or JSON) and conforms to sync/async semantics of the source input type
  2. methods that create through/transform streams from the RecordBatchReader and RecordBatchWriter, for use with node's native stream primitives

Each link in the streaming pipeline is a sort of transform stream, and a significant amount of effort went into supporting all the different node/browser IO primitives, so I understand if that's too much to ask at this point.

As an alternative, would it be possible to add a method that accepts a Python byte stream and returns a zero-copy AsyncIterable of RecordBatches? Or maybe add an example in the python/ipc docs page of how to do that?
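
For illustration, a rough sketch of one possible workaround, assuming the blocking reads can simply be pushed to a worker thread (this is not a pyarrow API, it does not guarantee zero-copy, and it needs Python 3.9+ for asyncio.to_thread):

```python
import asyncio
import pyarrow.ipc as ipc

_EOS = object()  # end-of-stream sentinel

def _next_batch(reader):
    # Runs in a worker thread; translate StopIteration into a sentinel,
    # since StopIteration cannot be carried through an asyncio Future.
    try:
        return reader.read_next_batch()
    except StopIteration:
        return _EOS

async def iter_batches(raw):
    """Async-iterate RecordBatches from a blocking file-like object `raw`
    (a pipe, a socket file, sys.stdin.buffer, ...)."""
    reader = await asyncio.to_thread(ipc.open_stream, raw)
    while True:
        batch = await asyncio.to_thread(_next_batch, reader)
        if batch is _EOS:
            return
        yield batch
```

Usage would then be `async for batch in iter_batches(sys.stdin.buffer): ...`.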

asfimport commented 1 year ago

Rémi Dettai / @rdettai: Hi! I would like to revive this thread. We have a similar use case where we need an async interface in PyArrow for IPC streams.

RecordBatchStreamReader: this is too high-level; you need to read from your data source in Python (using await something.read()) and then construct a record batch out of the data (perhaps with a BufferReader).

To be able to construct the record batch, we need to know how many bytes to read, and getting that information means parsing the IPC message metadata as it arrives.

The big issue here is that PyArrow doesn't seem to expose the right primitives for that, in particular parsing the metadata.
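
Today the simplest fallback seems to be buffering the whole payload before parsing it, which defeats streaming. A sketch, assuming an async source exposing a read() coroutine (e.g. an aiohttp request stream):

```python
import pyarrow as pa
import pyarrow.ipc as ipc

async def read_all_batches(stream):
    # Do the async I/O up front, then parse synchronously.
    # The entire payload sits in memory before a single batch is available.
    data = await stream.read()
    reader = ipc.open_stream(pa.BufferReader(data))
    return reader.read_all()  # returns a pyarrow.Table
```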

I believe that asyncio is quickly gaining in popularity, and, since Arrow is an exchange format, it will end up being used in a lot of use cases like the one mentioned by [~paul.e.taylor], where async is very valuable.

asfimport commented 1 year ago

Weston Pace / @westonpace: Things have changed a bit since 2019. The RecordBatchFileReader has an asynchronous API now. It's currently exposed as a whole-file-reading AsyncGenerator (an iterator function that returns a promise each time you call it) via RecordBatchFileReader::OpenAsync and RecordBatchFileReader::GetRecordBatchGenerator. Although, under the hood, there are ReadFooterAsync and ReadRecordBatchAsync methods that could be exposed should more direct control be desired.

Adapting this pattern to the streaming reader should be pretty straightforward. These methods all return arrow::Future. As far as I know, no one has done the necessary work to plumb arrow::Future into a Python async API (e.g. asyncio).

Asynchronous methods in Arrow typically work by offloading the blocking I/O calls to a global I/O thread pool (which can have more threads than there are cores and should generally be sized appropriately for the I/O device). This keeps the CPU threads free and non-blocking. To hook this into asyncio you would probably want to call arrow::Future::AddCallback and then, in that callback, schedule a task on some kind of Python executor. In that Python executor task you would mark some kind of asyncio future complete, which would presumably run any needed callbacks.
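
A minimal sketch of that bridging pattern, assuming a hypothetical Python handle around arrow::Future that exposes an add_callback(fn) method (no such binding exists in pyarrow today):

```python
import asyncio

def bridge_to_asyncio(native_future, loop: asyncio.AbstractEventLoop):
    """Complete an asyncio future from a callback that may fire on an
    Arrow I/O thread. `native_future.add_callback` is hypothetical."""
    py_future = loop.create_future()

    def on_complete(result, exception=None):
        # Hop back onto the event loop thread before touching the
        # asyncio future; asyncio objects are not thread-safe.
        if exception is not None:
            loop.call_soon_threadsafe(py_future.set_exception, exception)
        else:
            loop.call_soon_threadsafe(py_future.set_result, result)

    native_future.add_callback(on_complete)
    return py_future
```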

asfimport commented 1 year ago

Weston Pace / @westonpace: Also, a note on RecordBatchStreamWriter. In most cases you can probably get away with not making this async. If you are writing to disk then the write is, typically, implicitly async. The write function merely does a memcpy from user space to kernel space (into the page cache), marks the page dirty, and then immediately returns (without waiting for the data to be persisted to disk). The only time this is blocking is if you are out of memory and swapping, in which case you might have to wait for some physical memory to become available. We do run into this on large datasets, though, and are currently investigating a direct I/O alternative which would, unfortunately, require async. So that would be the exception to "in most cases".

Cloud filesystems behave similarly (well, I'm quite certain Arrow's S3 writer is implicitly async, and others should be able to be): we create an S3 request, submit it to an I/O thread under the hood, and simply expose a non-blocking write method.

gaby commented 1 year ago

Any progress on this?