akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka/current/
Other
1.26k stars 647 forks source link

Provide ways to configure PubSub gRPC backpressure #2141

Closed jrudolph closed 4 years ago

jrudolph commented 4 years ago

As was observed in https://discuss.lightbend.com/t/akka-stream-initial-backpressure-throttle-on-source-has-no-effect/5865, the PubSub connector might pull down a full buffer of messages before it starts backpressuring. This might instantly create a sizable backlog if the message size is small and the processing time long.

This is explicitly discussed in the Google PubSub documentation. It would be good if the user could be given more fine-grained control over the backlog. (Also that problem and solutions should be documented). It's not quite clear to me if it can achieved using the streamingPull mechanism or if there should be an alternative implementation that uses (potentially multiple concurrent) pull requests to fetch in elements with tighter control from the downstream consumer.

See also https://github.com/akka/akka-grpc/issues/501

jrudolph commented 4 years ago

Maybe to give an additional point about the trade-offs:

mdibaiee commented 4 years ago

@jrudolph Thanks for filing the issue. I am willing to implement this if you can help me with an example of a similar implementation in other alpakka connectors perhaps. I would think polling is a common pattern so alpakka might already have abstractions for that?

As you pointed out, I think one solution is to use synchronous pull requests instead of streamingPull where max_messages has to respect backpressure.

https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pullrequest

jrudolph commented 4 years ago

I am willing to implement this if you can help me with an example of a similar implementation in other alpakka connectors perhaps. I would think polling is a common pattern so alpakka might already have abstractions for that?

Thanks, @mdibaiee. Not to familiar with what can be found in all of alpakka, maybe @ennru would know about an example about how to do efficient polling in alpakka?

As you pointed out, I think one solution is to use synchronous pull requests instead of streamingPull where max_messages has to respect backpressure.

Yep, that would be the idea with max_messages being set to a configurable value to batch pulling. One could think of requesting the next batch even when the previous batch hasn't been consumed completely to try to optimize throughput even in that case.

seglo commented 4 years ago

I like the idea of changing the implementation to using batch polling PullRequests for more granular control over the response size. The current implementation uses a poll of sorts (there's even a poll-interval configuration). It's a little confusing as to why this is necessary because we're sending a stream of StreamingPullRequests and receiving a stream of StreamingPullResponses, I guess as a keep alive?

I think a simple Source.tick based poll based on the current poll-interval is a good starting point.

I'm not sure what a default of max_messages should be. We could experiment and see what's returned if we omit the parameter if it's not required.

jrudolph commented 4 years ago

I'm not sure what a default of max_messages should be. We could experiment and see what's returned if we omit the parameter if it's not required.

I don't think there's a good default for all use cases. Basically, how to set this value is all this ticket is about: sizing the buffer. If you only pull one element at a time, you have a "guaranteed latency" between two subsequent elements which is the latency of the grpc connection sending the request and pulling down an element. You want to hide that latency by introducing a prefetch which needs to be sized in a way that the grpc latency amortized per element is in about the same order of magnitude as the downstream processing time per element. That would be the optimal value (but maybe hard to find out and tune). So, one can either choose a value slightly bigger, in which case you have more elements buffered which are not immediately processed (and cannot be processed by another consumer). Or, you choose a smaller value than the optimum in which case downstream consumers have to wait for the grpc connection for more elements to process but it is guaranteed that every element is immediately processed and not stuck in the queue.

(There's also a point where the existing streamingPullRequest becomes the more efficient solution: that's when elements are so big that it's not the pull batch size that limits throughput but the grpc buffers, but that's probably something that doesn't need to be optimized for?)

seglo commented 4 years ago

You want to hide that latency by introducing a prefetch which needs to be sized in a way that the grpc latency amortized per element is in about the same order of magnitude as the downstream processing time per element. That would be the optimal value (but maybe hard to find out and tune).

That sounds ideal, but realistically that might be a hard solution to justify for this connector in particular.

So, one can either choose a value slightly bigger, in which case you have more elements buffered which are not immediately processed (and cannot be processed by another consumer). Or, you choose a smaller value than the optimum in which case downstream consumers have to wait for the grpc connection for more elements to process but it is guaranteed that every element is immediately processed and not stuck in the queue.

I guess our best option is something smaller. It's unfortunate we can not also set batch size based on bytes, that may be easier to reason about from our perspective, where as element sizes themselves could be large (though maybe PubSub has some limits, like Kafka, where the default max size per message is 1MB).

(There's also a point where the existing streamingPullRequest becomes the more efficient solution: that's when elements are so big that it's not the pull batch size that limits throughput but the grpc buffers, but that's probably something that doesn't need to be optimized for?)

We could support both and let the user choose with a feature flag, or just add more factory methods.

mdibaiee commented 4 years ago

@seglo @jrudolph so is there a plan of attack for this issue yet? I don't understand from the discussion how backpressure will be applied, as you seem to be discussing over what max_messages to set. My understanding was that this value could be dynamic based on backpressure / request from downstream?

Or perhaps you are thinking we can have a fixed max_messages and then control the number of requests we send per unit of time?

seglo commented 4 years ago

I don't understand from the discussion how backpressure will be applied, as you seem to be discussing over what max_messages to set. My understanding was that this value could be dynamic based on backpressure / request from downstream?

The max_messages request parameter determines the max messages returned in a response. From an Akka Streams perspective we're only aware that an outlet downstream is ready for an element, not how many elements it can accommodate upfron, so most Alpakka sources will simply request messages from the 3rd party and then buffer them internally and send them downstream as there is demand (by checking availability of the outlet port).

It's certainly possible to tune max_messages by tracking how frequently we can push elements downstream. That would be smarter than most (if not all) of the Alpakka source integrations we currently have, but it could be done. I think it might be a little more smarts than we need, but it would be really cool to see an implementation.

What would be easier would be to set a default max_messages that could be buffered in most systems. This default could be overridden depending on the user's app runtime characteristics.

mdibaiee commented 4 years ago

Thank you all for your help, the pull-request is now merged :tada: so I think this issue can be closed unless there are plans for a more sophisticated implementation.

ennru commented 4 years ago

PullRequest support was introduced with #2166