Closed wojtekmach closed 5 years ago
That makes sense. I think the distinction I had really wanted to make was between sync and async pull, which isn't REST or gRPC specific.
That being said, I'm not even sure async pull makes sense within a Broadway implementation, as you have no control over the inbound message rate.
I didn't even know there was an async pull. I did notice there's a push API, which would require us to expose an endpoint. In both cases, exactly as you pointed out, providing back-pressure would be a challenge, and afair that was by far the biggest challenge in our RabbitMQ connector. So yeah, I think sticking to sync pull for now is the way to go!
@wojtekmach I've been looking more closely at the other two connectors, primarily RabbitMQ, and I'm wondering if something like this might be a better fit:
defmodule BroadwayCloudPubSub.Client do
  @moduledoc """
  A generic behaviour to implement Pub/Sub clients for `BroadwayCloudPubSub.Producer`.
  """

  alias Broadway.Message

  @type messages :: [Message.t()]
  @type ack_ids :: [String.t()]
  @type ack_deadline :: 0..600

  @callback init(opts :: any) :: {:ok, normalized_opts :: any} | {:error, any}
  @callback ack(ack_ids, opts :: any) :: :ok | {:error, any}
  @callback pull(demand :: pos_integer, opts :: any) :: messages
  @callback modify_ack_deadline(ack_ids, ack_deadline, opts :: any) :: :ok | {:error, any}
end
The Synchronous Pull example from the Google Docs shows the sort of thing I'd like to eventually be able to do behind the scenes: automatically modify the ack deadline while a message is still being processed.
To do so, I think ultimately the Producer module would become responsible for implementing Acknowledger, so the client can really just serve as a facade to the underlying request implementation.
The functions above would apply equally to a gRPC implementation as they do to the current REST impl. gRPC may require some additional callbacks, too.
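To make the shape of that contract a bit more concrete, here is a rough sketch of what a client could look like. Everything in it is an assumption for illustration: `InMemoryClientSketch`, the `:subscription` and `:backlog` options, and the plain maps returned from `pull/2` are made up so the example runs without Broadway or a Pub/Sub connection. A real client would declare `@behaviour BroadwayCloudPubSub.Client`, return `Broadway.Message` structs, and issue actual requests.

```elixir
defmodule InMemoryClientSketch do
  # Hypothetical stand-in for a module implementing the proposed
  # BroadwayCloudPubSub.Client behaviour. It keeps messages in the options
  # map instead of calling Pub/Sub, and returns plain maps rather than
  # Broadway.Message structs so it runs without any dependencies.

  # Mirrors init/1: validate and normalize the options once, up front.
  def init(opts) when is_list(opts) do
    case Keyword.fetch(opts, :subscription) do
      {:ok, sub} when is_binary(sub) ->
        {:ok, %{subscription: sub, backlog: Keyword.get(opts, :backlog, [])}}

      _ ->
        {:error, "expected :subscription to be a string"}
    end
  end

  # Mirrors pull/2: never return more messages than the requested demand.
  def pull(demand, %{backlog: backlog}) when is_integer(demand) and demand > 0 do
    backlog
    |> Enum.take(demand)
    |> Enum.map(fn {ack_id, data} -> %{ack_id: ack_id, data: data} end)
  end

  # Mirrors ack/2: a real client would send an acknowledge request here.
  def ack(ack_ids, _opts) when is_list(ack_ids), do: :ok

  # Mirrors modify_ack_deadline/3: the guard matches the 0..600 ack_deadline type.
  def modify_ack_deadline(ack_ids, deadline, _opts)
      when is_list(ack_ids) and deadline in 0..600,
      do: :ok
end
```

The point is only that the producer never needs to know whether REST or gRPC sits behind these four functions.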
Interesting @mcrumm. I haven't heard of the Sync Pull before. Do you have an example of where it would be beneficial? The docs didn't tell me much. :(
@josevalim Apologies, I may have muddied the context. Synchronous pull is the mechanism we are currently using to retrieve messages from a Cloud Pub/Sub topic. From the guide, synchronous pull might be used when, "[f]or example, the application logic might rely on a polling pattern to retrieve messages or require a precise cap on a number of messages retrieved by the client at any given time" (emphasis mine).
Synchronous pulls are made using a Pull request (as opposed to a StreamingPull request, which is not currently available in :google_api_pub_sub), which allows defining a maxMessages value so that the response will never contain more messages than the requested demand. The producer is implemented to queue pending demand, and will start another pull request immediately if there is still demand to fulfill. Otherwise, it will poll, waiting :receive_interval between each request.
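The demand bookkeeping described above can be modelled with a few pure functions. This is a sketch, not the actual producer code: `PendingDemandSketch` and its function names are hypothetical, and I'm assuming that an empty pull response is what triggers the `:receive_interval` wait.

```elixir
defmodule PendingDemandSketch do
  # Hypothetical model of the producer's demand bookkeeping; not the actual
  # BroadwayCloudPubSub.Producer implementation.

  def new(receive_interval), do: %{pending: 0, receive_interval: receive_interval}

  # New demand from consumers is added to whatever is still unfulfilled.
  def add_demand(state, incoming) when is_integer(incoming) and incoming > 0 do
    %{state | pending: state.pending + incoming}
  end

  # The value to send as maxMessages on the next Pull request.
  def max_messages(%{pending: pending}), do: pending

  # After a pull returns `received` messages, decide what to do next.
  def after_pull(state, received) when is_integer(received) and received >= 0 do
    state = %{state | pending: max(state.pending - received, 0)}

    cond do
      # All demand fulfilled: wait for consumers to ask for more.
      state.pending == 0 -> {:idle, state}
      # Got messages and still have demand: pull again right away.
      received > 0 -> {:pull_now, state}
      # Assumption: an empty response means we back off before polling again.
      true -> {{:pull_after, state.receive_interval}, state}
    end
  end
end
```

Because `maxMessages` is always the outstanding demand, the producer can never receive more than the pipeline asked for, which is the back-pressure property we want.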
This differs from an Asynchronous Pull, which is only available via the gRPC service. Clients that currently support async pull allow the user to define a callback to process messages as they are received. The client implementations are where things get interesting: most of the official clients (Python, Java, Ruby, etc.) have some type of "FlowControl" mechanism that gives the appearance of being able to limit the number of messages received via StreamingPull, but in actuality they're just queueing messages internally based on the flow control settings.
The Python StreamingPullManager, for instance: https://github.com/googleapis/google-cloud-python/blob/7e16c08581a059a6c8e40b0403f4af1ebfe0fea6/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L128
So Synchronous Pull can apply to both HTTP and gRPC, and provides a built-in back-pressure mechanism which makes it a really great fit for the Broadway connector.
What I was trying to point out in the example code is that Google also has a mechanism to keep a message from being put back on the queue if it's still being processed. You do this by modifying the ack deadline on the message. Ideally, we could do this automatically because we know what messages have been received for processing, and we know what messages have yet to be acknowledged (either as a success or a failure).
For instance, the Python Leaser class provides the mechanism to extend the message "lease" if it has not yet been acknowledged:
https://github.com/googleapis/google-cloud-python/blob/7e16c08581a059a6c8e40b0403f4af1ebfe0fea6/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
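On our side, the bookkeeping needed to know which messages still need their lease extended could be fairly small. A minimal sketch, assuming we track an expiry time per ack id; `OutstandingAcksSketch`, its function names, and the `margin` parameter are all hypothetical and not taken from the Python client:

```elixir
defmodule OutstandingAcksSketch do
  # Hypothetical bookkeeping for messages that were pulled but not yet
  # acknowledged. Times are in seconds on any monotonic clock.

  def new, do: %{}

  # Call when messages are pulled, and again after their deadline is
  # extended: each ack_id now expires `ack_deadline` seconds from `now`.
  def track(outstanding, ack_ids, now, ack_deadline) do
    Enum.reduce(ack_ids, outstanding, &Map.put(&2, &1, now + ack_deadline))
  end

  # Stop tracking once a message is acknowledged, as a success or a failure.
  def acknowledged(outstanding, ack_ids), do: Map.drop(outstanding, ack_ids)

  # ack_ids that expire within `margin` seconds, i.e. the ones to pass to
  # modify_ack_deadline/3 on the next tick.
  def due_for_extension(outstanding, now, margin) do
    outstanding
    |> Enum.filter(fn {_ack_id, expires_at} -> expires_at - now <= margin end)
    |> Enum.map(fn {ack_id, _expires_at} -> ack_id end)
    |> Enum.sort()
  end
end
```

A periodic timer in the producer could call `due_for_extension/3`, hand the result to the client's `modify_ack_deadline/3`, and then `track/4` those ids again with the new deadline.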
The Leaser is aided by a Histogram that, by default, tracks the 99th percentile of ack requests to determine how long to extend the lease for a given message:
https://github.com/googleapis/google-cloud-python/blob/7e16c08581a059a6c8e40b0403f4af1ebfe0fea6/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/histogram.py
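The percentile idea is easy to sketch on its own. This is not a port of the Python Histogram class, just a nearest-rank percentile over a list of observed ack latencies. `AckLatencySketch` is a hypothetical name, the 600 second ceiling comes from the `ack_deadline` type above, and the 10 second floor is my assumption.

```elixir
defmodule AckLatencySketch do
  # Hypothetical helper; not a port of the Python Histogram class.
  # Assumed floor for an extended deadline, in seconds.
  @min_deadline 10
  # Ceiling taken from the 0..600 ack_deadline type.
  @max_deadline 600

  # With no observations yet, fall back to the minimum.
  def deadline([], _percent), do: @min_deadline

  # Nearest-rank percentile of ack latencies (whole seconds), clamped to
  # the range above.
  def deadline(latencies, percent)
      when is_list(latencies) and is_integer(percent) and percent > 0 and percent <= 100 do
    sorted = Enum.sort(latencies)
    # Integer ceiling of percent / 100 * n, avoiding float rounding.
    rank = div(percent * length(sorted) + 99, 100)

    sorted
    |> Enum.at(rank - 1)
    |> max(@min_deadline)
    |> min(@max_deadline)
  end
end
```

Calling `AckLatencySketch.deadline(latencies, 99)` would then give the value to pass as the ack deadline when extending a lease.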
Does that make more sense?
Landed in 736abd8
Given Pub/Sub has both REST and gRPC interfaces, perhaps we should rename this to something more generic like *.Client or *.PubSubClient? Our client contract is very simple and it should work for gRPC too. Thoughts?