apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[Python] Pickling a sliced array serializes all the buffers #26685

Open asfimport opened 4 years ago

asfimport commented 4 years ago

If a large array is sliced and pickled, it seems the full buffer is serialized. This leads to excessive memory usage and data transfer when using multiprocessing or Dask.


>>> import pyarrow as pa
>>> ar = pa.array(['foo'] * 100_000)
>>> ar.nbytes
700004
>>> import pickle
>>> len(pickle.dumps(ar.slice(10, 1)))
700165

NumPy, for instance, only serializes the sliced data:
>>> import numpy as np
>>> ar_np = np.array(ar)
>>> ar_np
array(['foo', 'foo', 'foo', ..., 'foo', 'foo', 'foo'], dtype=object)
>>> import pickle
>>> len(pickle.dumps(ar_np[10:11]))
165

I think this makes sense if you know Arrow, but it is kind of unexpected as a user.

Is there a workaround for this? For instance, copying the Arrow array to get rid of the offset and trim the buffers?

Reporter: Maarten Breddels / @maartenbreddels Assignee: Clark Zinzow

Note: This issue was originally created as ARROW-10739. Please see the migration documentation for further details.

asfimport commented 4 years ago

Wes McKinney / @wesm: We truncate the buffers on sliced arrays when writing record batches to the IPC protocol, so the buffers should be similarly truncated in the case of pickling. 
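As a quick check (an added illustration, not part of the original comment), writing the slice as a one-column record batch to the IPC stream format already yields a payload close to the slice's size, in contrast to pickling:

import pickle
import pyarrow as pa

ar = pa.array(['foo'] * 100_000)
sliced = ar.slice(10, 1)

sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, pa.schema([('x', ar.type)])) as writer:
    writer.write_batch(pa.record_batch([sliced], names=['x']))

print(sink.getvalue().size)        # small: schema plus the truncated buffers
print(len(pickle.dumps(sliced)))   # ~700 kB: all of the parent buffers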

asfimport commented 4 years ago

Maarten Breddels / @maartenbreddels: Ok, good to know.

Two workarounds I came up with:

%%timeit
s = pa.serialize(ar.slice(10, 1))
ar2 = pa.deserialize(s.to_buffer())
790 µs ± 578 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)


import vaex.arrow.convert

----

def trim_buffers(ar):
    '''
    >>> ar = pa.array([1, 2, 3, 4], pa.int8())
    >>> ar.nbytes
    4
    >>> ar.slice(2, 2) #doctest: +ELLIPSIS
    <pyarrow.lib.Int8Array object at 0x...>
    [
      3,
      4
    ]
    >>> ar.slice(2, 2).nbytes
    4
    >>> trim_buffers(ar.slice(2, 2)).nbytes
    2
    >>> trim_buffers(ar.slice(2, 2))#doctest: +ELLIPSIS
    <pyarrow.lib.Int8Array object at 0x...>
    [
      3,
      4
    ]
    '''
    schema = pa.schema({'x': ar.type})
    with pa.BufferOutputStream() as sink:
        with pa.ipc.new_stream(sink, schema) as writer:
            writer.write_table(pa.table({'x': ar}))
    with pa.BufferReader(sink.getvalue()) as source:
        with pa.ipc.open_stream(source) as reader:
            table = reader.read_all()
            assert table.num_columns == 1
            assert table.num_rows == len(ar)
            trimmed_ar = table.column(0)
    if isinstance(trimmed_ar, pa.ChunkedArray):
        assert len(trimmed_ar.chunks) == 1
        trimmed_ar = trimmed_ar.chunks[0]

    return trimmed_ar
----

%%timeit
vaex.arrow.convert.trim_buffers(ar.slice(10, 1))
202 µs ± 2.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


asfimport commented 4 years ago

Joris Van den Bossche / @jorisvandenbossche: Note that pyarrow.serialize is deprecated, so best not to use that as a workaround.

asfimport commented 3 years ago

Maarten Breddels / @maartenbreddels: Thanks Joris!

I cannot reproduce the previous timings (I guess I had a debug install without optimization), but this one seems fastest:


%%timeit
pa.concat_arrays([ar.slice(10, 1)])
2.16 µs ± 9.22 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

(vs 8 µs and 125 µs using IPC and (de)serialize, respectively)
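Tying this back to the original report (an added check, not from the comment itself): after the concat_arrays copy, pickling only serializes the trimmed buffers:

import pickle
import pyarrow as pa

ar = pa.array(['foo'] * 100_000)

# Pickling the raw slice drags along all parent buffers (~700 kB) ...
print(len(pickle.dumps(ar.slice(10, 1))))

# ... while concatenating the single slice copies it into right-sized buffers,
# so the pickle stays small.
print(len(pickle.dumps(pa.concat_arrays([ar.slice(10, 1)]))))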

asfimport commented 3 years ago

Wes McKinney / @wesm: Seems like this would be good to fix?

asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche: That would indeed be good.

For a moment, I naively thought this would just be adding a SliceBuffer call when wrapping the buffer in _reduce_array_data (https://github.com/apache/arrow/blob/c43fab3d621bedef15470a1be43570be2026af20/python/pyarrow/array.pxi#L597). But of course, the offset and length to slice the buffer with depend on the array type and bit width, or whether it's a bitmap, etc. In the IPC code, the truncation is handled in the Visit methods of RecordBatchSerializer (e.g. for primitive arrays: https://github.com/apache/arrow/blob/c43fab3d621bedef15470a1be43570be2026af20/cpp/src/arrow/ipc/writer.cc#L331), and this is quite a lot of code to get right for all the different data types. Something we shouldn't start replicating in Cython, I think.

Are there other utilities in C++ that can be reused to do this truncation? Or could we "just" use the IPC serialization under the hood for pickling in Python?
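To make the type-dependence concrete, here is a rough sketch (an illustration, not the proposed fix) of the byte math for the easy case of a fixed-width primitive array:

import pyarrow as pa

ar = pa.array(range(1000), type=pa.int64())
sliced = ar.slice(10, 2)

# For a fixed-width type without nulls, trimming is plain byte arithmetic:
byte_width = ar.type.bit_width // 8
data = sliced.buffers()[1]   # buffers()[0] would be the validity bitmap (None here)
trimmed = data.slice(sliced.offset * byte_width, len(sliced) * byte_width)
copy = pa.Array.from_buffers(ar.type, len(sliced), [None, trimmed], null_count=0)

print(trimmed.size, data.size)   # 16 vs 8000: only the two sliced values remain
print(copy)                      # the values 10 and 11
# A validity bitmap, however, is addressed in bits (offset // 8 plus a bit
# shift), and a string array would also need its offsets buffer rebased, so a
# single SliceBuffer call in _reduce_array_data cannot cover all types.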

asfimport commented 2 years ago

Jim Crist-Harif: We're running into this in Dask right now when attempting to integrate Pandas string[pyarrow], since pickling pyarrow string arrays ends up pickling all the data even if the result only includes a small slice. I'm willing to hack on this if no one else has the bandwidth, but upon initial inspection it looks a bit more complicated than I'd like to bite off as a new-ish Arrow contributor. With some guidance on the best path forward, though, I could possibly get something working. @jorisvandenbossche any further thoughts on a solution here?

asfimport commented 2 years ago

Krisztian Szucs / @kszucs: Postponing to 10.0 since there is no PR available at the moment.

asfimport commented 2 years ago

Philipp Moritz / @pcmoritz: In Ray we are also planning to work around this https://github.com/ray-project/ray/pull/22891 – it would be wonderful to see this fixed in Arrow :)

asfimport commented 2 years ago

Clark Zinzow: Hey folks, I'm the author of the Ray PR that @pcmoritz linked to, which essentially ports Arrow's buffer truncation in the IPC serialization path to Python as a custom pickle serializer. I'd be happy to help push on getting this fixed upstream for Arrow 10.0.0.

First, is there any in-progress work by [~jcrist] or others?

If not, I could take this on in the next month or so; the two implementation routes that I've thought of when looking at the IPC serialization code (these are basically the same routes that @jorisvandenbossche pointed out a year ago) are:

  1. Refactor the IPC writer's per-type buffer truncation logic into utilities that can be shared by the IPC serialization path and the pickle serialization path.
  2. Directly use the Arrow IPC format in its pickle serialization, where the pickle reducer is a light wrapper around the IPC serialization and deserialization hooks.

Does either of these routes sound appealing? (2) has the added benefits of consolidating the serialization schemes on the IPC format and pushing all expensive serialization code into C++ land, but it is a larger change and would involve otherwise-unnecessary wrapping of plain Arrow (chunked) arrays in record batches in order to match the IPC format, so maybe (1) is the better option.
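To picture option (2), here is a rough Python-level sketch (an illustration only, not Clark's branch or the eventual implementation) of a pickle reducer that leans on the IPC stream format, so the existing IPC truncation handles sliced buffers:

import pyarrow as pa

def _ipc_dumps(arr):
    # Hypothetical helper: wrap the array in a one-column record batch and
    # write it to an IPC stream, which truncates the buffers of sliced arrays.
    batch = pa.record_batch([arr], names=['x'])
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue().to_pybytes()

def _ipc_loads(data):
    # Hypothetical counterpart: read the single batch back and unwrap the column.
    with pa.ipc.open_stream(pa.BufferReader(data)) as reader:
        return reader.read_next_batch().column(0)

def _reduce_array(arr):
    # What an IPC-backed __reduce__ could return.
    return _ipc_loads, (_ipc_dumps(arr),)

ar = pa.array(['foo'] * 100_000)
loads, args = _reduce_array(ar.slice(10, 1))
print(len(args[0]))    # small: the IPC writer truncated the buffers
print(loads(*args))    # a one-element string array containing 'foo'

The drawbacks raised later in the thread apply to exactly this shape: a fixed RecordBatch/schema cost per payload, and no pickle protocol 5 out-of-band buffers.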

asfimport commented 2 years ago

Clark Zinzow: Ping on this, @amol- [~jcrist] are either of y'all actively working on this for Arrow 10.0.0? And if not, does option (1) that I gave in my previous comment sound like a good path forward?

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: [~clarkzinzow] sorry for the slow reply, several of us were on holidays. As far as I know, nobody is actively working on this, so a PR is certainly welcome! I think option (1) is a good path forward.

asfimport commented 2 years ago

Clark Zinzow: @jorisvandenbossche No worries! And sounds good, I should be able to start working on this in a few weeks, I will update this thread once I've started working on it and again once I have a PR out.

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: That sounds good!

asfimport commented 2 years ago

Joris Van den Bossche / @jorisvandenbossche: [~clarkzinzow] were you able to make some progress on this?

(I am removing the "In progress" label, just to make it a bit easier to keep track of this as open issue in JIRA, but can change that again once there is an open PR)

asfimport commented 2 years ago

Clark Zinzow: @jorisvandenbossche I did a quick implementation of (2), where the Arrow IPC format is used under the hood for pickle serialization, and confirmed that the buffer truncation works as expected. Although this is a far simpler solution than (1), the RecordBatch wrapper adds ~230 extra bytes to the pickled payload (per Array chunk) compared to current Arrow master, which can be pretty bad for the many-chunk and/or many-column case (order-of-magnitude larger serialized payloads). We could sidestep this issue by having Table, RecordBatch, and ChunkedArray port their __reduce__ to the Arrow IPC serialization as well, which should avoid this many-column and many-chunk blow-up, but there would still be the baseline ~230-byte bloat for ChunkedArray and Array that we might find untenable.

I can try to get a PR up for (2) either today or tomorrow while I start working on (1) in the background. (1) is going to have a much larger Arrow code impact + we'll continue having two serialization paths to maintain, but it shouldn't result in any serialized payload bloat.
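One way to eyeball that fixed cost (a rough sketch; the exact number varies by pyarrow version): the size of an IPC stream holding a single empty chunk is roughly the per-chunk framing overhead of the RecordBatch wrapper:

import pyarrow as pa

empty = pa.array([], type=pa.int64())
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, pa.schema([('x', empty.type)])) as writer:
    writer.write_batch(pa.record_batch([empty], names=['x']))

# Schema message + record batch metadata, with no actual data: roughly the
# fixed overhead paid per pickled chunk under option (2).
print(sink.getvalue().size)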

asfimport commented 1 year ago

Apache Arrow JIRA Bot: This issue was last updated over 90 days ago, which may be an indication it is no longer being actively worked. To better reflect the current state, the issue is being unassigned per project policy. Please feel free to re-take assignment of the issue if it is being actively worked, or if you plan to start that work soon.

asfimport commented 1 year ago

Clark Zinzow: Apologies for the delay on this one! I have an implementation of (1) working e2e locally now, where the buffer traversal + truncation is shared by both the IPC serialization and pickle paths, but I haven't had time to clean up the handling of dictionary arrays, which is currently a bit ugly. I'm hoping to get back to it this week and push up a PR.

@jorisvandenbossche Do you know what the 11.0.0 release timeline is?

h-vetinari commented 1 year ago

@raulcd @kou @pitrou @jorisvandenbossche Do you have any insights into the situation here? This bug caused ray to pin arrow <7 on windows, which is putting us into a difficult spot in conda-forge, as we only support the last 4 major versions (which is already highly unusual in conda-forge, but owed to how quickly arrow progresses, to give projects a chance to catch up).

That means arrow 6 does not get migrated against new library releases anymore (e.g. abseil, grpc, protobuf, re2, etc.), and so we'd be making ray essentially impossible to install with a current set of dependencies. By extension, this would become very painful for ray users on windows, as their environments become hard/impossible to resolve. For these reasons, I'm opposed to adding that cap in conda-forge, but of course we don't want to ship broken packages either.

CC @mattip @krfricke @clarkzinzow

pitrou commented 1 year ago

@h-vetinari This is definitely an issue that deserves fixing, it just needs someone to work on it.

jorisvandenbossche commented 1 year ago

I am not sure this issue prevents Ray from updating. First, this issue does not cause a "broken" package; AFAIK it's only a performance issue (that can be worked around with a copy). Further, looking at the Ray PRs linked above, it seems they actually implemented a workaround on their side by using IPC to support Arrow 7+ (https://github.com/ray-project/ray/pull/29055). And the comment in Ray's setup.py mentions "Serialization workaround for pyarrow 7.0.0+ doesn't work for Windows.", so it singles out Windows, while this issue is not Windows-specific.

Of course we should still fix this issue, because it is a seriously annoying and unexpected behaviour of pickling Arrow data, but I am not fully sure this is the blocker for updating Ray's pyarrow dependency on conda-forge's side.

jorisvandenbossche commented 1 year ago

Looking more closely at the Ray PR I linked, it indeed mentions that after being merged it was failing Windows CI; it was then reverted and added back later in steps, with this restriction on the pyarrow version just for Windows. But if the workaround to use IPC serialization instead of pickle doesn't work on Windows, it would be interesting to know why that is or which bug exactly causes it (it might be easier to fix).

jorisvandenbossche commented 1 year ago

Apologies for the delay on this one! I have an implementation of (1) working e2e locally now, where the buffer traversal + truncation is shared by both the IPC serialization and pickle paths, but I haven't had time to clean up the handling of dictionary arrays, which is currently a bit ugly. I'm hoping to get back to it this week and push up a PR.

@clarkzinzow apologies for the very slow reply on your earlier message about having a working implementation. If you would still have the time to create a PR for this, that's certainly welcome! (or push the branch somewhere, in case someone else might be able to finish it) In the meantime, @anjakefala is planning to make a PR for the simpler option (2), which shouldn't require too much custom code, but at least already allows to explore a fix.

clarkzinzow commented 1 year ago

Hi @jorisvandenbossche, apologies for the delays on my end! The application-level workaround for Ray ended up sufficing, so I never got back to submitting a PR, and I must have missed these notifications.

I have an out-of-date branch that works e2e for option (1), where the buffer traversal + truncation is shared by both the IPC serialization and pickle paths: a BufferAggregator interface is introduced with an IPC implementation and an implementation that accumulates buffers into a payload (similar to the C data interface) that can be serialized with language-specific schemes by Arrow front-ends (e.g. pickle via Python).

This has a few key pros over option (2) that I believe make it a compelling choice:

  1. This does not break pickle protocol 5's out-of-band data support for Arrow data, which I actually think is table stakes for users sending Arrow data over the wire using pickle. Option (1) allows the array buffers to be treated as out-of-band data, eliding 2 extra data copies in the e2e pickling flow compared to option (2). (See the protocol 5 sketch after this list.)
  2. Option (1) does not require wrapping all Arrow data in a RecordBatch, which elides ~230 redundant bytes per payload compared to option (2). For systems sending small chunks of Arrow data over the wire, this could be non-negligible.
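For reference, a minimal illustration (not Arrow-specific) of what pickle protocol 5 out-of-band data means, using NumPy, which already opts in:

import pickle
import numpy as np

arr = np.zeros(10_000_000, dtype=np.uint8)

buffers = []
# With protocol 5 and a buffer_callback, the array's data buffer is handed to
# the callback instead of being copied into the pickle byte stream.
payload = pickle.dumps(arr, protocol=5, buffer_callback=buffers.append)
print(len(payload))   # tiny: just metadata
print(len(buffers))   # 1: the 10 MB of data travels out of band

restored = pickle.loads(payload, buffers=buffers)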

But this still has a few TODOs:

  1. Rebase onto latest master and resolve any pop-up conflicts.
  2. Move BufferAggregator, ArrayBufferPayload, and ArraySerializerBufferAggregator definitions/implementations out of the IPC code; these could be top-level, under arrow/util, under a new directory, etc.
  3. Misc. cleanup (e.g. removing debug logging).

I don't think that I'll have the bandwidth to get this across the line in the near-term, but I think that most of the difficult work is done if someone else has the time! cc @anjakefala

https://github.com/apache/arrow/compare/main...clarkzinzow:arrow:arrow-10739-pickle-buffer-truncation-fix

anjakefala commented 1 year ago

I had an already started branch for Option (2) (directly using IPC serialization for pickling), and confirmed that it does not support Pickle Protocol 5. That made that approach untenable.

@pitrou Would you be able to take a look at @clarkzinzow's branch? It aims to be an implementation of Option (1):

Refactor the IPC writer's per-type buffer truncation logic into utilities that can be shared by the IPC serialization path and the pickle serialization path.

Is it a decent starting point? I would be happy to adopt it, and break it up into smaller PRs if it looks like a promising approach.

pitrou commented 1 year ago

Hi @anjakefala, while I agree that Option (1) is a good approach, I'm a bit skeptical about the proposed implementation. Two general comments:

  1. This should be a general utility function, not something in ipc
  2. I'm not sure what the point is of introducing an ArrayBufferPayload class that looks a lot like ArrayData. I would instead expect a function that takes an ArrayData (or Array) and returns a new ArrayData (or Array, respectively, depending on which one is more convenient) with its buffers adjusted.

I might be missing something, though. Are there some concrete constraints mandating this particular implementation?

jorisvandenbossche commented 1 year ago
  1. This should be a general utility function, not something in ipc

Just to be fair, @clarkzinzow noted that as one of the remaining TODOs of his branch.

2. I'm not sure what the point is of introducing an ArrayBufferPayload class that looks a lot like ArrayData

That is also not really clear to me. Above the following brief explanation was given (https://github.com/apache/arrow/issues/26685#issuecomment-1703199976):

a BufferAggregator interface is introduced with an IPC implementation and an implementation that accumulates buffers into a payload (similar to the C data interface) that can be serialized with language-specific schemes by Arrow front-ends (e.g. pickle via Python).

@clarkzinzow can you explain a bit your rationale of going with this payload class?

clarkzinzow commented 1 year ago

@clarkzinzow can you explain a bit your rationale of going with this payload class?

@jorisvandenbossche @pitrou IIRC I started with ArrayData as the payload aggregation class, but ended up wanting a parent pointer to make the visitor-based aggregation clean, so we could hop back to the parent payload at Seal() time. I thought that the payload class would end up deviating in other ways, but it ended up being very close to ArrayData, so I do think that we should find a way to make ArrayData work.

One option would be maintaining an explicit shared-pointer stack of in-progress ArrayDatas in the aggregator, which would obviate the need for the parent pointer:

With that change, I believe that ArrayData would suffice as the buffer aggregation payload.

pitrou commented 1 year ago

One option would be maintaining an explicit shared-pointer stack of in-progress ArrayDatas in the aggregator, which would obviate the need for the parent pointer:

I have no idea why you would need that. I would expect this API to be a single function call and the underlying implementation is free to keep temporary state while moving recursively along the tree.

clarkzinzow commented 1 year ago

@pitrou The API would be a single function call. I'm referring to internal implementation details of the aggregator that's passed to the array visitor.

The branch I linked refactors the IPC serialization code into a generic array visitor that takes a BufferAggregator, of which there are two implementations: one for IPC and one for serializing to some payload that can be used by language frontends for their own serialization schemes. I'm saying that the latter can explicitly maintain a stack internally, which would allow us to use ArrayData as the payload.

The API exposed to language frontends would just be a single function call returning that payload (ArrayData).

pitrou commented 1 year ago

I don't understand why you want a visitor API? Typically, slicing the buffers should be a cheap operation, so you can just do it all at once.

clarkzinzow commented 1 year ago

The visitor API isn't exposed to array serialization users, it's an (existing) implementation detail of the serialization. All that this PR does is factor out the IPC buffer aggregation so we can inject a different aggregator for frontend language serialization (pickle).

pitrou commented 1 year ago

I am not talking about RecordBatchSerializer, but about a new API to adjust the buffers inside an ArrayData.

clarkzinzow commented 1 year ago

Oh, that makes more sense! So you're saying that an API should be exposed on Array/ArrayData to recursively truncate buffers, rather than piggyback off of IPC's array visitor + "serialize to a payload" logic? That would probably be cleaner from a frontend language's perspective.

There's still array-type-specific buffer truncation logic that would require an array-type-based visitor, and it's less clear to me how to marry that array visitor with the IPC serialization code in order to share that buffer truncation logic without making the IPC serialization code less efficient. Right now the IPC serialization truncates buffers and aggregates them into a flat list in a single recursive pass over the array tree, while factoring that buffer truncation logic out into an ArrayData.truncate_buffers() API would seem to necessitate two passes over the array tree: one to truncate the buffers and one to aggregate the flat buffer list.

pitrou commented 1 year ago

I think we can definitely accept two passes on the array tree. There are existing benchmarks that can validate this, so we can refine later.
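As a loose Python-level analogy of the two-pass idea (not the actual C++ design): first produce a compacted copy whose buffers only cover the slice, then walk it to collect the flat buffer list:

import pyarrow as pa

ar = pa.array(['foo'] * 100_000).slice(10, 1)

compacted = pa.concat_arrays([ar])   # pass 1: copy with truncated buffers
flat = compacted.buffers()           # pass 2: flatten the buffers for transport
print(sum(b.size for b in flat if b is not None))   # small: only the slice's buffers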

anjakefala commented 1 year ago

Thank you @clarkzinzow and @pitrou! I will put forth a design proposal. =)

anmyachev commented 6 months ago

Hello @anjakefala! May I know the status of the proposal for this issue?