googleapis / google-cloud-go

Google Cloud Client Libraries for Go.
https://cloud.google.com/go/docs/reference
Apache License 2.0
3.73k stars 1.28k forks source link

pubsub: Receive sometimes hangs, without returning error, in synchronous mode #1302

Closed cristiangraz closed 5 years ago

cristiangraz commented 5 years ago

Client

pubsub

Describe Your Environment

alpine:3.8 running binary built with golang:1.11.5-alpine3.8 running in a GKE Kubernetes pod.

google-cloud-go version = v0.35.1 grpc version = v1.18.0

Expected Behavior

In synchronous mode, messages continue pulling when there are messages still on the queue to pull. If messages are not able to be pulled, Receive() should exit with an error.

Actual Behavior

Receive() hangs on occasion but does not always exit or return an error.

This causes our worker to appear to be running while no messages are processed. Once we've noticed, restarting the worker (which initiates a fresh Pull() will immediately process all of the messages that were sitting on the queue).

Also I want to be clear that sometimes Receive() does return an error and we see it logged. But not always, leading to a worker that is still technically running but not actually processing any messages.

Code sample

On ReceiveSettings, we set MaxOutstandingMessages=30 and Synchronous=true. In other subscriptions this has happened with MaxOutstandingMessages=10, MaxExtension=4 minutes, and Synchronous=true

ch := make(chan pubsub.Message)
go func(s *Subscriber, ch chan pubsub.Message) {
    defer close(ch)

    s.Logger.Info("starting subscriber...")
    if err := s.sub.Receive(s.ctx, func(_ context.Context, m *gpubsub.Message) {
        ch <- &message{
            id:   m.ID,
            msg:  m.Data,
            ack:  m.Ack,
            nack: m.Nack,
        }
    }); err != nil {
        s.Logger.WithError(err).Error("error receiving messages")
    }
}(s, ch)
return ch, nil

The caller runs in an infinite for loop with a select on either ctx.Done() or a message being received on ch. If the channel is closed we exit. Anytime we shut down (either due to receiving a SIGTERM/SIGKILL, context cancelation, messages failing to pull as logged above) we log the error, exit, and restart the pod.

We do observe the error receiving messages line occasionally logged, and the shutdown behavior of our code/pod works as expected. But there are times where no error occurs but no new messages are being pulled (when there are messages on the queue waiting to be processed).

We also tend to process fewer messages than others on these queues. In some cases a few hundred messages per day, and some days there are zero messages processed. I've been able to go back and see the hanging originate in the middle of messages being processed (vs hanging after a long period of "quiet" time).

Options explored

In synchronous mode, only MaxOutstandingMessages are pulled at a time. We use SQL locks to protect against race conditions/concurrent message processing as well as to avoid duplicate work on a message. We use exponential backoff and a jitter and only hold the messages for 30s to avoid head-of-line blocking.

I suspected that if the worker was holding MaxOutstandingMessages then Pull() would not pull any others until one or more slots opened up. However, in looking at the number of outstanding messages and SQL locks open there were only 8 of the 30 possible messages. We also cycle those out every 30sec so messages should still get through, even if there is a bit of a lag. What we're seeing is no messages getting through.

In one case (on another subscriber) over a 16 day stall period where hundreds of messages were continuously being added to the queue but not being pulled for the 16 days until we caught it and restarted.

jeanbza commented 5 years ago

When / where are you acking messages? Also, could you provide your ReceiveSettings?

cristiangraz commented 5 years ago

The only ReceiveSettings we set are the ones above -- MaxOutstandingMessages (the exact number varies by worker), Synchronous=true, and in some cases MaxExtension.

Every worker we've seen this issue occur had Synchronous=true and MaxOutstandingMessages set. Some of the workers had set MaxExtension and others hadn't. So that one must've fallen back to DefaultReceiveSettings.

We provide each worker with a custom Message interface that has a Done(ack bool) error method signature. Every worker has a defer m.Done(false) which acts as a no-op if m.Done has been called. Otherwise we m.Done(true) if completed successfully. So the acks/nacks are called from the worker doing the work. In the code above you can see where we set the ack and nack functions needed on message from gpubsub, which in turn are called by m.Done(ack bool) depending on the value of the ack bool.

We alias the import: gpubsub "cloud.google.com/go/pubsub"

limbalrings commented 5 years ago

We're seeing similar occasional Subscription issues where messages start backing up in the PubSub and stop calling our callback function, without Subscription.Receive throwing an error.

We have Synchronous = true and MaxOutstandingMessages < 100

Our Dockerfile is From golang:1.11 running in a GKE Kubernetes pod.

google-cloud-go version = v0.34.0 grpc version = v1.17.0

I will try updating google-cloud-go version and grpc version, but it looks like that won't be the fix.

cristiangraz commented 5 years ago

@jadekler Wanted to follow up on reclassifying this from type: question to investigating or bug. It seems others are also having this issue as well.

This has significant effect because it's an intermittent, quiet failure of either Pull failing or the callback not being called without an error. Since streaming pull was released, there are many issues here with comments encouraging users to use synchronous pull for their use case, and many may not realize this is happening in production since there are no errors/logs -- messages may just be quietly expiring after the 7 day period of not being acked.

And while there is some Stackdriver monitoring, as far as I know there is no way to periodically check via the API to see the number of unacknowledged messages on a particular subscription in order to manually intervene or even programmatically restart a worker that may not be completing pending work.

jeanbza commented 5 years ago

Sorry, this fell off my radar. Back to investigating it.

jeanbza commented 5 years ago

Thanks again for your experience reports. I've done more spelunking, and unfortunately failed to find a reason this might happen, nor am I able to reproduce this. Next steps:

The minimal repro is a very-nice-to-have, but we can make do with the first two. :)


We provide each worker with a custom Message interface that has a Done(ack bool) error method signature. Every worker has a defer m.Done(false) which acts as a no-op if m.Done has been called.

Quick note @cristiangraz : each Ack/Nack after the first is already a no-op on our side, so the ack bool may be superfluous.

jeanbza commented 5 years ago

Also, I don't suppose you have an opencensus exporter configured? If you have a stackdriver exporter set up, for example, we can use some OC metrics to help debug.

cristiangraz commented 5 years ago

@jadekler Thanks for the detailed reply and investigation.

We ack and nack every message that is processed -- I've double and triple checked. The only scenario where that wouldn't happen is if the worker's code didn't actually get called.

A year or so ago this library was sending significant amounts of duplicate (and concurrent) messages. We wrapped all of our workers in SQL locks by subscription/msg id to prevent concurrent or duplicate work. One possibility for failing to ack/nack is if we fill up our max prefetch waiting for locks before we were able to dispatch the worker's function where the ack or nack happens. In that scenario, we'd basically fill up our queue with workers waiting to start.

I'm not exactly sure how that could happen given some other protections we have in place, but to be on the safe side, after opening this issue I:

In terms of this library, would it make sense to have a safeguard to return an error if the flow control is exhausted beyond a configurable period of time? So for example, if the flow control is exhausted with no new messages coming in for var maxFlowControlExhausted time.Duration, a pubsub.ErrFlowControlExhausted error is returned from receive()?

If I know that my workers take no more than 5s to run, and I set the config at 10s, an error can be returned anytime the flow control is exhausted for more than 10s with no change.

I'm thinking of this more as a safeguard that could either prevent this issue now, or if it's not happening, serve as a protection in the future from that ever occurring.

jeanbza commented 5 years ago

Thanks, I look forward to hearing back from your investigation.

In terms of this library, would it make sense to have a safeguard to return an error if the flow control is exhausted beyond a configurable period of time? So for example, if the flow control is exhausted with no new messages coming in for var maxFlowControlExhausted time.Duration, a pubsub.ErrFlowControlExhausted error is returned from receive()?

cc @kolea2 @kir-titievsky for feature request.

cristiangraz commented 5 years ago

@jadekler After implementing these changes, I have not seen this issue appear again. Since we have not yet heard from @limbalrings, I'm going to close this issue.

Thanks for all your help and pointing me towards the "flow control exhausted" possibility. The timeouts and deferred nacks (as a failsafe) now act as an important safeguard for us in synchronous mode. Hopefully this is helpful to others who may run into this issue in the future.

jeanbza commented 5 years ago

Thanks for reporting back, @cristiangraz :+1:

alicebob commented 5 years ago

Hi,

we ran in the same issue. We didn't nack messages on purpose, as a way to wait before we retry on error. It's not really clear (to me) from the documentation that you need to call (n)ack for every message; I interpreted the documentation that calling nack is optional, and it only leads to quicker retries. Maybe the documentation can be improved?

Is there be a way to add a pause before a message is retried?

jeanbza commented 5 years ago

@alicebob There is not. All messages must be acked/nacked. Please let us know if there is a place we can make that more clear. (by filing a new issue)