snowplow-incubator / common-streams

Other
1 stars 0 forks source link

Pubsub source should not await futures when acking #14

Closed istreeter closed 12 months ago

istreeter commented 1 year ago

There was an occasion where “ack’ing” a message failed because of a network issue, and this caused the loader to get stuck waiting for a Future to complete.

When we call "ack" on a pubsub message, the pubsub sdk returns to us a Future. The Lake Loader awaits the result of that future, so we can check that the message got ack'd properly.

But... if there is an error acking and if the subscription does not have exactly-once-delivery enabled... then the sdk never completes the Future. It simply ignores the error. The Loader is stuck waiting for a Future that won't complete. Here is the problematic line in the google sdk source code.

To fix the problem we need to either change the loader to not wait on the Future, or change the subscription to enable the exactly-once feature. It is dangerous to rely on the latter, because we (app developers) don't have control over the subscription.

This PR works by changing "ack" so it is fire-and-forget, from point of view of the app. We trust that the underlying pubsub Subscriber will do the ack eventually. This is relatively safe because the underlying Subscriber has retry logic built-in, and anyway it's not a critical error if we fail to ack something.

By the way, the permutive pubsub source does something equivalent to this. So this is consistent with existing snowplow apps.

istreeter commented 12 months ago

Do I understand correctly that with this change, if we had the same case as before where the app got stuck, we would just re process the event and get one duplicate (which is fine) ?

Yes, exactly right. Pubsub would eventually re-send the event to a pod (possibly same pod, possibly a different pod) and we would get a duplicate.

Out of curiosity, why are defining bufferMaxBytes instead of using the usual maximum outstanding request bytes in the subscriber?

It's because of apps that require windowing:

Those apps deliberately wait several minutes before acking events. If we set maximum outstanding request bytes on the subscriber, then after a few seconds the underlying pubsub client would stop pulling events from pubsub. Because it puts a limit on how many un-acked messages you can hold at once. Whereas we need the underlying client to keep on pulling events even if we have not yet acked previous events.

But then there is a danger that the client pulls too many events, which would cause the app to run out of memory. So this is why I used the buffer + a semaphore, so it throttles the pubsub client.

benjben commented 12 months ago

Oh yeah I was too focused on Enrich and missed the windowing. The mechanism with Semaphore looks great :+1: It might be a little less performant than maxRequestBytes but at least it is generic for all our apps.