dashbitco / broadway_cloud_pub_sub

A Broadway producer for Google Cloud Pub/Sub
Apache License 2.0

Unblock producer while polling PubSub #86

Closed joladev closed 2 years ago

joladev commented 2 years ago

An attempt at fixing #85. I realised there were more issues with the existing implementation and I've mentioned some below.

This moves the blocking long-polling call to PubSub into a worker Task, which allows the producer process to continue responding to messages while waiting for messages from PubSub. It also ensures that any ongoing long-polling request to PubSub is terminated when the prepare_for_draining callback is invoked.
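To illustrate the idea (this is not the code in the PR), here is a minimal sketch of offloading a blocking call to a Task. It uses a plain GenServer rather than a GenStage producer so it runs without dependencies. The module name AsyncPollSketch, the poll_fun argument, and the status/drain functions are all hypothetical; drain stands in for what prepare_for_draining would trigger.

```elixir
defmodule AsyncPollSketch do
  # Hypothetical stand-in for the producer. A plain GenServer keeps the
  # sketch free of GenStage/Broadway dependencies.
  use GenServer

  def start_link(poll_fun), do: GenServer.start_link(__MODULE__, poll_fun)

  # Answerable even while a poll is in flight.
  def status(pid), do: GenServer.call(pid, :status)

  # Assumed analogue of prepare_for_draining: stop any in-flight poll.
  def drain(pid), do: GenServer.call(pid, :drain)

  @impl true
  def init(poll_fun) do
    {:ok, %{poll_fun: poll_fun, task: nil, received: []}, {:continue, :poll}}
  end

  @impl true
  def handle_continue(:poll, state) do
    # The blocking call runs in a separate process, not in the server loop.
    {:noreply, %{state | task: Task.async(state.poll_fun)}}
  end

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, {state.task != nil, state.received}, state}
  end

  def handle_call(:drain, _from, %{task: nil} = state), do: {:reply, :ok, state}

  def handle_call(:drain, _from, %{task: task} = state) do
    # Task.shutdown/2 unlinks and stops the worker. A reply that raced the
    # shutdown is returned by Task.shutdown/2 and ignored in this sketch.
    Task.shutdown(task, :brutal_kill)
    {:reply, :ok, %{state | task: nil}}
  end

  @impl true
  def handle_info({ref, messages}, %{task: %Task{ref: ref}} = state) do
    # The task replied: drop its monitor and flush the pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | task: nil, received: state.received ++ messages}}
  end

  def handle_info(_other, state), do: {:noreply, state}
end
```

The sketch polls only once; a real producer would presumably start the next poll after handling a reply, as long as it is not draining.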

Some examples of behaviors the existing producer implementation exhibits:

  1. With a receive_interval of 0, the producer would, prior to this PR, block on every handle_demand. Say you have 100 processors each registering a demand of 1, so there are 100 demand messages in the producer's mailbox. The producer would pick the first one (with demand 1), kick off a long-poll request to PubSub (with max_messages 1), wait for it to finish and return that message, then read the next message (with demand 1), and so on. This means it only ever requests 1 message at a time from PubSub.
  2. When consuming a subscription with no messages and the default timeout of infinity, the producer would block indefinitely (or until GCP PubSub closes the connection). When Broadway attempts to shut down, it sends a message to the producer, which would be handled by the prepare_for_draining callback, but because the producer is waiting for PubSub it can't handle the message. Eventually the graceful termination deadline is hit and the producer is terminated.
  3. When Broadway starts shutting down, the producer will still produce one last batch of messages, as it does not acknowledge the shutdown until it has finished waiting for PubSub. This leads to messages being lost during shutdown.

These behaviors were not covered by tests because the test implementation of the message server did not mimic long polling. It would instantly return 0 messages instead of waiting up to receive_timeout for messages to appear, so the blocking issue never showed up in the tests.
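As a rough illustration of what "mimicking long polling" means for a test double, the sketch below blocks in pull until a message is published or the timeout elapses, instead of returning an empty list right away. FakeLongPollClient and its functions are hypothetical names, not the test helpers used in this repository.

```elixir
defmodule FakeLongPollClient do
  # Hypothetical test double; an Agent holds the list of pending messages.
  def start_link, do: Agent.start_link(fn -> [] end)

  def publish(agent, message), do: Agent.update(agent, &(&1 ++ [message]))

  # Blocks until at least one message is available or `timeout` ms elapse,
  # checking the Agent every `step` ms. Returns [] on timeout.
  def pull(agent, timeout, step \\ 10) do
    deadline = System.monotonic_time(:millisecond) + timeout
    wait(agent, deadline, step)
  end

  defp wait(agent, deadline, step) do
    # Atomically take whatever is queued and leave the queue empty.
    case Agent.get_and_update(agent, fn msgs -> {msgs, []} end) do
      [] ->
        if System.monotonic_time(:millisecond) >= deadline do
          []
        else
          Process.sleep(step)
          wait(agent, deadline, step)
        end

      msgs ->
        msgs
    end
  end
end
```

With a double like this, a producer that calls pull inside its own process stalls for the full timeout on an empty subscription, which is the blocking behavior the old tests could not observe.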

I changed the test implementation to mimic long polling, which broke some tests.

How this PR fixes the examples:

  1. Because the producer is no longer blocking on long polling requests to PubSub it can freely register all the demand while asynchronously waiting for messages from the long polling request.
  2. Because the producer is no longer blocking on long polling requests to PubSub it can handle the prepare_for_draining callback promptly and shut down cleanly.
  3. Because the producer is no longer blocking on long polling requests to PubSub it can terminate the ongoing long polling request, reducing the risk of the producer pulling one last batch of messages during shutdown.
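The effect on the first example can be modeled with a small pure-function sketch of demand bookkeeping. It assumes the producer keeps a running demand counter and a flag for whether a poll is in flight; DemandSketch and its functions are hypothetical and only approximate what the PR does.

```elixir
defmodule DemandSketch do
  # Hypothetical pure model of the producer's bookkeeping, not the library's code.
  def new, do: %{demand: 0, in_flight: false}

  # Register incoming demand; start a poll only if none is already running.
  # Returns {state, poll}, where poll is nil or {:poll, max_messages}.
  def handle_demand(state, incoming) do
    maybe_poll(%{state | demand: state.demand + incoming})
  end

  # A poll returned `count` messages: subtract them from the outstanding
  # demand and poll again if any demand remains.
  def handle_poll_result(state, count) do
    maybe_poll(%{state | demand: max(state.demand - count, 0), in_flight: false})
  end

  defp maybe_poll(%{in_flight: false, demand: demand} = state) when demand > 0 do
    {%{state | in_flight: true}, {:poll, demand}}
  end

  defp maybe_poll(state), do: {state, nil}
end
```

In this model, the first of 100 demands of 1 starts a poll with max_messages 1, the other 99 are simply recorded while that poll is in flight, and the next poll asks for all 99 at once rather than one at a time.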

Let me know what you think of this approach and if you have any concerns!

josevalim commented 2 years ago

:green_heart: :blue_heart: :purple_heart: :yellow_heart: :heart: