apache / arrow-julia

Official Julia implementation of Apache Arrow
https://arrow.apache.org/julia/

Add an indexable variant of Arrow.Stream #353

Open bkamins opened 1 year ago

bkamins commented 1 year ago

In a distributed computing context it would be nice to have a vector variant of the Arrow.Stream iterator. The idea is to be able to split the processing of a single large Arrow file with multiple record batches across multiple worker processes. Looking at the source code, it should be possible to do this relatively efficiently.

@quinnj - what do you think?

baumgold commented 1 year ago

I don't think this is possible. The Arrow file format is a series of FlatBuffer messages that are not indexed and therefore have to be iterated over. More concretely, the BatchIterator doesn't support random access.

bkamins commented 1 year ago

My idea was that the constructor of such an indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.

quinnj commented 1 year ago

Yeah, we could probably add support for this. Maybe with a lazy::Bool=true keyword argument; lazy=false would eagerly iterate messages and store the positions so they could be randomly accessed while lazy=true gives the current behavior where each iteration only consumes one message.
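
To illustrate the eager (lazy=false) idea of scanning once and storing positions, here is a minimal sketch on a toy format. It assumes each message is a little-endian UInt32 payload length followed by the payload bytes, which is not the real Arrow IPC framing (there the body length is stored inside the FlatBuffers message metadata). `MessageIndex` is a hypothetical name and is not part of Arrow.jl.

```julia
# Toy framing (an assumption, NOT the Arrow IPC layout):
# [UInt32 little-endian payload length][payload bytes] repeated.
struct MessageIndex
    bytes::Vector{UInt8}
    ranges::Vector{UnitRange{Int}}  # byte range of each payload
end

# Eager scan: read only the length prefixes and record where each payload lives.
function MessageIndex(bytes::Vector{UInt8})
    ranges = UnitRange{Int}[]
    pos = 1
    while pos <= length(bytes)
        pos + 3 <= length(bytes) || error("truncated length prefix at byte $pos")
        len = Int(ltoh(reinterpret(UInt32, bytes[pos:pos+3])[1]))
        start = pos + 4
        stop = start + len - 1
        stop <= length(bytes) || error("truncated payload at byte $start")
        push!(ranges, start:stop)
        pos = stop + 1  # skip the payload without interpreting it
    end
    return MessageIndex(bytes, ranges)
end

Base.length(idx::MessageIndex) = length(idx.ranges)

# Random access: return a view of the i-th payload, no copying or re-scanning.
Base.getindex(idx::MessageIndex, i::Integer) = view(idx.bytes, idx.ranges[i])

# Build three toy messages and index them.
io = IOBuffer()
for payload in (UInt8[1, 2, 3], UInt8[], UInt8[9, 8])
    write(io, htol(UInt32(length(payload))))
    write(io, payload)
end
idx = MessageIndex(take!(io))
third = idx[3]  # jumps straight to the third payload
```

The point of the sketch is that the constructor touches only the length prefixes, so the expensive per-message work can be deferred until a specific index is requested, possibly on a different worker.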

Curious though, because a fairly simple workflow you can already use is:

for record_batch in Arrow.Stream(...)
    Distributed.@spawn begin
        # do stuff with record_batch
    end
end

What are the alternative workflows where that doesn't work for you?

bkamins commented 1 year ago

What you propose works, but I thought this approach would not actually achieve parallelism (i.e. that Arrow.Stream would parse the values before moving on to the next record batch). If it just skips ahead, then the issue can be closed.

quinnj commented 1 year ago

Ah, you're correct; we do all the message processing in the Arrow.Stream iterate method. Ok, yeah, we should provide an alternative here.

baumgold commented 1 year ago

> Ah, you're correct; we do all the message processing in the Arrow.Stream iterate method. Ok, yeah, we should provide an alternative here.

This would be a great improvement, as it would also allow predicate pushdown at the RecordBatch level based on Message-level metadata, making it possible to operate on a single RecordBatch without decompressing every RecordBatch in a file. This is an important feature for me, so I'll try to spend some time building it without breaking too much.

> My idea was that the constructor of such an indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.

If the data uses the "IPC File Format", then the footer should contain all the information we need to construct this index. Reading the footer should be faster than scanning the whole file, but it is only an optimization, since scanning should also be supported for data without a footer.
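
As a rough sketch of the footer route: an IPC file starts and ends with the magic bytes `ARROW1`, and the 4 bytes just before the trailing magic hold the footer length as a little-endian Int32. The helper below only locates the footer bytes; `footer_range` and `ARROW_MAGIC` are hypothetical names, not Arrow.jl API, and decoding the FlatBuffers footer (whose record batch blocks carry an offset, a metadata length, and a body length) is left out.

```julia
const ARROW_MAGIC = Vector{UInt8}("ARROW1")

# Return the byte range of the footer in an in-memory Arrow IPC *file*.
# Layout assumed: ARROW1 | padding | stream | footer | Int32 footer size | ARROW1
function footer_range(bytes::Vector{UInt8})
    n = length(bytes)
    m = length(ARROW_MAGIC)
    if n < 2m + 4 || bytes[1:m] != ARROW_MAGIC || bytes[n-m+1:n] != ARROW_MAGIC
        throw(ArgumentError("not an Arrow IPC file (missing ARROW1 magic)"))
    end
    # The footer size sits immediately before the trailing magic.
    size_stop = n - m
    size_start = size_stop - 3
    footer_len = Int(ltoh(reinterpret(Int32, bytes[size_start:size_stop])[1]))
    footer_stop = size_start - 1
    footer_start = footer_stop - footer_len + 1
    if footer_len <= 0 || footer_start <= m
        throw(ArgumentError("invalid footer length $footer_len"))
    end
    return footer_start:footer_stop
end

# Synthetic stand-in for a file: the "stream" and "footer" contents are fake.
io = IOBuffer()
write(io, ARROW_MAGIC)
write(io, zeros(UInt8, 2))   # padding to an 8-byte boundary
write(io, zeros(UInt8, 8))   # placeholder for the streaming-format messages
write(io, fill(0xAA, 5))     # placeholder footer bytes
write(io, htol(Int32(5)))    # footer size
write(io, ARROW_MAGIC)
file = take!(io)

r = footer_range(file)
```

Data written in the streaming format has no trailing magic, so this check fails there and a scan over the messages would be the fallback.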

JoaoAparicio commented 1 year ago

> My idea was that the constructor of such an indexable object could do the indexing you mention. I assume that the whole file would have to be scanned, but maybe it could be done in a cheap way, i.e. without having to read/interpret all the data stored in the file.

I implemented this minus the indexing. Thoughts?