
[C++][FlightRPC] Memory tracking for arrow flight over grpc #37900

Open · jkammerer opened 10 months ago

jkammerer commented 10 months ago

Describe the enhancement requested

We use Arrow Flight over gRPC in C++ to request large volumes of data from another service. The data is streamed to us in potentially large individual messages, and each request uses its own grpc::Channel. To guarantee the stability of our service, we need to track the memory allocated for a request. When reading a message, we see the process memory intermittently increase by the message’s size. Since we have messages of multiple megabytes, we need to be able to track these internal allocations. It is important that this tracking happens before the allocation, so that we can allow or forbid the allocation based on our memory budget.
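
To make the requirement concrete, here is a minimal sketch of the accounting we have in mind. It is plain C++ with a hypothetical MemoryBudget class, not an existing Arrow or gRPC API; the missing piece is a hook that would let the transport call TryReserve() before allocating an incoming message:

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical pre-allocation gate: reserve against a fixed budget *before*
// the bytes are allocated, release once the message has been consumed.
// Today there is no hook in the gRPC receive path that would call this.
class MemoryBudget {
 public:
  explicit MemoryBudget(int64_t limit_bytes) : limit_(limit_bytes) {}

  // Atomically reserves `bytes`; returns false (reserving nothing) if the
  // reservation would exceed the budget, letting the caller reject the read.
  bool TryReserve(int64_t bytes) {
    int64_t used = used_.load(std::memory_order_relaxed);
    while (used + bytes <= limit_) {
      if (used_.compare_exchange_weak(used, used + bytes,
                                      std::memory_order_acq_rel)) {
        return true;
      }
      // compare_exchange_weak reloaded `used`; the loop re-checks the limit.
    }
    return false;
  }

  void Release(int64_t bytes) {
    used_.fetch_sub(bytes, std::memory_order_acq_rel);
  }

 private:
  const int64_t limit_;
  std::atomic<int64_t> used_{0};
};
```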

Our problems could be solved by some kind of custom allocator support, or by a way to get the size of the next incoming message before it is completely read into memory.

Using gRPC’s NextMessageSize() does not fit our needs, as it always returns the configured MaxReceiveMessageSize of the underlying gRPC channel. We would need the actual size of the upcoming message, because our maximum message size is much larger than what we expect to receive in the typical case. Always budgeting for the maximum message size would leave large amounts of unused memory held in reserve by our system.
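
For illustration, roughly what budgeting on NextMessageSize() looks like against the raw gRPC reader that Flight wraps (a sketch using the stubs generated from Arrow's Flight.proto and the hypothetical MemoryBudget above):

```cpp
#include <cstdint>

#include <grpcpp/grpcpp.h>
#include "Flight.grpc.pb.h"  // generated from Arrow's Flight.proto

// NextMessageSize() only reports an upper bound. In practice it returns the
// channel's max receive message size, not the size of the message that is
// actually about to arrive, so budgeting with it over-reserves massively.
bool ReserveForNextMessage(
    grpc::ClientReader<arrow::flight::protocol::FlightData>* reader,
    MemoryBudget* budget) {
  uint32_t upper_bound = 0;
  if (!reader->NextMessageSize(&upper_bound)) return false;
  // `upper_bound` == MaxReceiveMessageSize (our worst case, many MB),
  // even when the next message is only a few KB.
  return budget->TryReserve(static_cast<int64_t>(upper_bound));
}
```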

We also investigated whether this problem can be solved at the gRPC level but did not find a satisfying solution. We have opened an issue there as well.

Component(s)

C++, FlightRPC

lidavidm commented 10 months ago

I don't think there is anything we can do on the Arrow side about gRPC. By the time the message reaches Arrow code, gRPC has already allocated the memory for it. (And even if we redesigned the protocol to send message headers separately from the data - which would be a massive breaking change - there's no guarantee that gRPC won't read ahead in the stream anyway while the application is processing the first message. That might be tunable, of course. Relevant SO answer from a gRPC team member.)

gRPC used to support a custom allocator API, but they removed it a long time ago: https://github.com/grpc/proposal/blob/master/L60-core-remove-custom-allocator.md

From the justifications there, it sounds unlikely they'd care to bring it back (though perhaps you could persuade them to use one only for message data).

One workaround: this would add a ton of round trips, but you could do something with DoExchange at the application level, where the server tells the client the size of each message and waits for the client to send a message actively confirming that it is ready for data.
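
A rough sketch of the client side, assuming the Result-returning DoExchange API of recent Arrow C++ releases, an application-defined convention where the server announces each batch's encoded size via app_metadata and waits for a "ready" ack, and the hypothetical MemoryBudget gate from the comment above:

```cpp
#include <string>

#include <arrow/buffer.h>
#include <arrow/flight/client.h>
#include <arrow/result.h>
#include <arrow/status.h>

arrow::Status ReadWithHandshake(
    arrow::flight::FlightClient* client,
    const arrow::flight::FlightDescriptor& descriptor,
    MemoryBudget* budget) {
  ARROW_ASSIGN_OR_RAISE(
      auto exchange,
      client->DoExchange(arrow::flight::FlightCallOptions{}, descriptor));
  while (true) {
    // 1. The server announces the next batch's encoded size in a
    //    metadata-only message (application-defined convention).
    ARROW_ASSIGN_OR_RAISE(auto announce, exchange.reader->Next());
    if (announce.app_metadata == nullptr) break;  // stream finished
    const int64_t next_size = std::stoll(announce.app_metadata->ToString());

    // 2. Gate on the local budget before telling the server to proceed.
    if (!budget->TryReserve(next_size)) {
      return arrow::Status::OutOfMemory("next message exceeds memory budget");
    }
    ARROW_RETURN_NOT_OK(
        exchange.writer->WriteMetadata(arrow::Buffer::FromString("ready")));

    // 3. Only now does the server send the batch; it fits the reservation.
    ARROW_ASSIGN_OR_RAISE(auto chunk, exchange.reader->Next());
    // ... process chunk.data ...
    budget->Release(next_size);
  }
  return arrow::Status::OK();
}
```

Each batch costs an extra round trip for the announce/ack pair, which is the "ton of round trips" caveat above.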

ankoh commented 10 months ago

@lidavidm let me maybe follow-up here with a more general question:

Isn't this a general problem for all Arrow Flight (SQL) users, one that challenges the “one batch per message” approach that FlightData is currently locked into?

We agree that this memory-tracking question raises more issues for gRPC than for Arrow Flight, but the limitation is still a thorny problem for any database system that considers adopting Arrow Flight (SQL) for general data exchange. gRPC defaults to very small message sizes, and this limit is sometimes hard to control (custom clients, L7 proxies like Envoy). In a more complex environment, it is easy for some component on the request path to be unaware of, or unprepared for, larger-than-default gRPC messages.
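
For reference, this is the knob in question on a raw C++ channel (the target address is a placeholder); every hop has to raise it in lockstep, and any proxy in between enforces its own separate limit:

```cpp
#include <memory>

#include <grpcpp/grpcpp.h>

// gRPC's default max receive message size is 4 MB; larger messages are
// rejected at the transport level unless every hop raises the limit.
std::shared_ptr<grpc::Channel> MakeLargeMessageChannel() {
  grpc::ChannelArguments args;
  args.SetMaxReceiveMessageSize(64 * 1024 * 1024);  // raise this hop to 64 MB
  // Note: an L7 proxy (e.g. Envoy) between client and server applies its
  // own, independently configured limit, so raising it here is not enough.
  return grpc::CreateCustomChannel("flight-server.example:31337",
                                   grpc::InsecureChannelCredentials(), args);
}
```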

The question I ask myself for the Arrow Flight (SQL) case is: isn't this reason enough to make the “one batch per message” design at least optional for FlightData? Today, Arrow Flight (SQL) clients are not prepared to buffer multiple FlightData messages to reassemble a record batch. I think all these issues around message sizes may be reason enough to reconsider this decision, at least as an opt-in. We care a lot about performance, but I get the impression that saving the single additional copy needed to assemble the IPC bytes from multiple FlightData messages does not justify all the trouble around message size limits.

One detail that complicates this discussion significantly, though, is that the FlightData message has split data_header and data_body fields. I wonder why this was done in the first place, as the encapsulated Arrow IPC stream format could simply have been written into a single data field. It wouldn't make a difference for reading FlightData zero-copy out of the gRPC ByteBuffer, and what you would gain is the ability to pass the bytes to an Arrow IPC stream reader as-is, which would also allow for optional binary chunking.
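
To sketch what that would enable on the receiving side: arrow::ipc::StreamDecoder already accepts arbitrarily sized byte slices, so a client could push each size-limited gRPC message into it as-is (the `chunks` parameter stands in for whatever the transport delivers; this is a sketch, not an existing Flight code path):

```cpp
#include <memory>
#include <vector>

#include <arrow/buffer.h>
#include <arrow/ipc/reader.h>
#include <arrow/status.h>

// Push arbitrarily chunked IPC-stream bytes into Arrow's streaming decoder;
// the decoder does not care where the gRPC message boundaries fall.
arrow::Status DecodeChunkedIpcStream(
    const std::vector<std::shared_ptr<arrow::Buffer>>& chunks) {
  // CollectListener accumulates each record batch as soon as enough bytes
  // have arrived to decode it.
  auto listener = std::make_shared<arrow::ipc::CollectListener>();
  arrow::ipc::StreamDecoder decoder(listener);
  for (const auto& chunk : chunks) {
    ARROW_RETURN_NOT_OK(decoder.Consume(chunk));
  }
  // The listener now holds the fully decoded record batches.
  return arrow::Status::OK();
}
```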

ankoh commented 10 months ago

We're currently considering using Arrow over gRPC widely, and a standard like Arrow Flight (SQL) seems like a very promising candidate for that. But looking at FlightData, I'm very unsure whether Arrow Flight is prepared to deal with the size limits that will become a serious problem in any larger service mesh.

Rolling our own Arrow-over-gRPC protocol would cost us compatibility with Arrow Flight (SQL) clients, but it would let us stream chunks into a buffering Arrow IPC stream reader. That stays compatible with gRPC's default message size limit, with the option to use larger message sizes and omit the extra copy if the mesh allows for it.

Also cc @jacques-n @wesm.

lidavidm commented 10 months ago

@ankoh, could we continue that discussion in https://github.com/apache/arrow/issues/34485?