googleapis / google-cloud-go

Google Cloud Client Libraries for Go.
https://cloud.google.com/go/docs/reference
Apache License 2.0
3.72k stars 1.27k forks source link

pubsub: Large number of duplicate messages suddenly #778

Closed cristiangraz closed 5 years ago

cristiangraz commented 6 years ago

I've been noticing an influx of duplicate messages. Previously I don't think I had ever came across one, suddenly started seeing large volumes of dupes. For example out of 50 messages that were all ACKd just recently, I saw 48 duplicates. It sounds similar to this issue: https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2465

When I look in Google Cloud Console at the API requests, I'm seeing large numbers of 499 error codes. In the last 4 days I have 1,312 200 error codes, but 7,580 499 error codes.

MaxOutstandingMessages = 10 MaxExtension = 10 minutes

Versions:

jba commented 6 years ago

@marklr, @anorth2, @jfbus and anyone else experiencing problems with Receive: we've been working on understanding the problem better.

One important fact I noticed when looking carefully at the Stackdriver Monitoring UI: the "StreamingPull message operations" and "StreamingPull Acknowledge message operations" are delta metrics, and the graphs are created with a "mean" aligner by default, so each data point represents the average number of messages since the last data point. But the graph is a line graph, which (to me, at least) suggests a rate. For example, I just published about 6000 messages over the course of a minute, or 60 msgs/sec. They were received by a simple program that used the default MaxOutstandingMessages of 1000 and acked each message after 20 seconds, for a maximum ack rate of 50 msgs/sec. These are the graphs I got:

streamingpull message operations 2 streamingpull acknowledge message operations It's not clear where the numbers on the y-axis come from, and it looks like there's a gap of about 1,000 messages between pulls and acks, but that is misleading: the acks happen over a longer period of time. You have to integrate under the graph to compare. I find looking at rates to be much clearer. When I switched the aligner to "rate" (three-dot menu, Edit, Show more options, Aligner), I got these graphs:

streamingpull message operations 3 streamingpull acknowledge message operations 1 These clearly show that I was publishing at 60 msgs/sec and acking at a peak 50 msgs/sec, exactly as expected.

Apologies if this is obvious to you all, but it wasn't to me.

None of that implies that there isn't a problem with PubSub on GKE. To try to collect more data, I added OpenCensus instrumentation to the client, with several measures related to streaming pull. The measures include stream opens and RPC retries as well as pull, ack, nack and modack counts. The code is in the latest commit of this repo. The program I linked to above exports the measures to StackDriver, and also logs them. When adding a graph to the UI, search for metrics containing "cloud.google.com/go/pubsub". Avoid the ones that end in "/cumulative"; they are obsolete.

I ran that program as a standalone pod on GKE, using kubectl logs -f to watch the logs directly. I didn't see anything that looked unusual. Every message eventually was acked. The number of modacks was five times the number of messages, but that is expected: we modack immediately when we get a message and then every 5s (when the ack deadline is 10s), so 5 modacks for 20s of processing is about right. (The number would be much lower if the ack deadline were longer). I did see Unavailable errors cause the stream to reopen every 90s or so, but crucially, that was only during idle periods: while messages were actively being pulled, the stream never failed.

So in short, I'm unable to reproduce the problem, and in fact I'm still not really clear on what the symptoms are. I hope the new metrics will shed some light.

jba commented 6 years ago

We haven't heard any updates from those affected by this, and we currently have no way to reproduce. Closing until we have actionable information.

tecbot commented 6 years ago

@jba We updated pubsub vendor to the latest version of one service which was affected by this bug in the past to test it again. The issue still exists. The unacked msg counter increases linear and we get always the same message over and over again regardless we ack them immediately on the client side. How we can help you to debug this problem?

jba commented 6 years ago

@tecbot:

pongad commented 6 years ago

I'll unassign myself from this, but I'll keep an eye on it.

tecbot commented 6 years ago

@jba:

Are you also running on GKE? If so, does the same problem occur if you run your docker container directly on GCE?

Yep, running on GKE but we have not tested it on GCE yet, but we had this service in the past in our own datacenter with the same error, so it shouldn't depend on it.

Please share as much of your Receive code as you're comfortable with. Especially useful are the ReceiveSettings you're using.

Our Receive looks simplified like that (removed code is protobuf parsing and uses a different fn to execute), maybe one important point is that we delay the execution for 20s, also the real fn can take 2min to complete (depends on the data).

const delay = 20 * time.Second

func Do(ctx context.Context, sub *pubsub.Subscription, fn func(byte[]) error) error {
    err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        // delay the execution
        select {
        case <-ctx.Done():
            m.Nack()
            return
        case <-time.NewTimer(delay).C:
        }
        if err := fn(m.Data); err != nil {
            m.Nack()
            return
        }
        m.Ack()
    })
    if err != context.Canceled {
        return err
    }
    return nil
}

Creating the subscription, we set MaxOutstandingMessages to 30 to limit throughput:

        ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
    defer cancel()

    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        ...
    }
    sub := client.Subscription(subID)
    ok, err := sub.Exists(ctx)
    if err != nil {
        ...
    }
    if !ok {
        sub, err = client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
            Topic:       client.Topic(topicID),
            AckDeadline: 60 * time.Second,
        })
        if err != nil {
            ...
        }
    }
    sub.ReceiveSettings.MaxOutstandingMessages = 30
    return sub

Could you enable the client instrumentation we recently added? Details are in my comment above.

We added the instrumentation, here you have multiple graphs. What we can see so far is that there are intervals without any Acks, then afterwards a burst happens of Acks. Doesn't look right to me.

Graph for 12 hours:

pubsub_12hours

Graph for 3 hours:

pubsub_3hours

Graph for a time frame with out peaks:

pubsub_extract

Here are graph from stackdriver:

bildschirmfoto 2018-03-08 um 10 40 26
tecbot commented 6 years ago

@jba we tested a different service without any delay and there is the same issue. There is always only a short period of time where message comes in, and then there is ~20 min break without any messages. This repeats every time. Maybe to mentioned both topics have a low throughput on the publisher side, ~1 new msg/s. Do you bundle/buffer msgs on the server side before publish them to stream subscriptions?

tecbot commented 6 years ago

@jba any updates? We tested more scenarios and for us it's clear that it depends on the publish rate of the topic. We have always problems with subscriptions if the publish rate is slow. We can see that subscribers don't receive messages at all or with a really big delay ~10-20 minutes but at the same time stackdriver unacked messages increases. Subscriptions with a publish rate ~1000/s have no problems.

Edit: We added now a "ping" msg to publish every 100ms a message in the topic, which "resolves" the stuck subscribers. If we stop the pinger, the subscribers stuck instantly.

jba commented 6 years ago

Thanks for all the info. We don't have a theory yet about what you're seeing. The server doesn't buffer, and it doesn't rebundle—if the publisher sends 3 messages in one Publish RPC, then the subscriber will receive those three messages in one chunk.

The fact that your "ping" messages unsticks the subscribers is very interesting—we haven't observed that before.

I'm trying to look at the black graphs but am having trouble interpreting the y-axis. The raw counters always increment, so I don't think you could be using those directly since your graphs go down. Are you showing a delta? For example, in the third graph (between peaks), the average value for Ack is about 88. Is that 88 acks over the last 10s sampling interval? (OpenCensus sends sample data every 10s, unless you call SetReportingPeriod.) Is it 88 acks per second? Neither makes sense with the numbers you provided: if you're handling 30 messages every 2 minutes or so, your ack rate should be .25/s.

jeanbza commented 6 years ago

@tecbot Something is funky, or I'm misunderstanding.

For the top graphs: I think your graphs are saying that acks are happening steadily at around 90 per minute (or per second? I think it's saying per minute... but not sure), which seems good.

For the bottom graphs: Could you provide a also a graph showing num_undelivered_messages, as well as instrumentation around the time it takes to ack messages? The two reasons for the spikiness I could think of: there actually aren't any undelivered messages, or the processing time is ~20-40m. I think it is really important to have instrumentation around process time for us to understand what's going on with your app.

Furthermore, I've taken your code and replicated it (see program here). I'm not sure how you're sending, so I used the gcloud console to send a message each second (actually, initially 2s, then 1s - you'll see a slight uptick early on). Here are the charts:

screen shot 2018-03-14 at 10 27 02 am

So unfortunately I'm unable to repro. Could you describe the way you're publishing? I'm interested in the following:

edit: If you need any help instrumenting your app to show time to process the message, please let me know - happy to provide code snippet!

sfriquet commented 6 years ago

FWIW we also encounter a similar experience using PubSub on GKE.

We reproduce this behavior when we suddenly stop consuming a subscription and then starting to acknowledge messages again. When we notice such behavior the number of undelivered messages is in the order of the dozen of thousands at least.

When we restart consuming the subscription again by acking the messages, we see a surge in duplicated messages until the number of unack'ed messages drop to 0. We also notice that the rate of duplicate message decreases as the number of undelivered messages drop. Once the messages no longer accumulate in the queue we see almost no duplicate messages.

jba commented 6 years ago

@sfriquet:

stop consuming a subscription and then starting to acknowledge messages again

So you exit from the call to Receive, then wait a while, then call Receive again?

When we restart consuming the subscription again by acking the messages, we see a surge in duplicated messages...

The timer for a message's ack deadline begins when the message arrives on the client, or after 10 minutes, whichever comes first. Messages arrive on the client in bulk, and may be buffered (by proxies or the client's network layer) before they arrive. If you ack slowly, messages may time out, and then you will see duplicates.

For example, say there are 10,000 undelivered messages when you call Receive, and you process them one at a time (one subscriber process, NumGoroutines=1, MaxOutstandingMessages=1). You ack each message after 1 second. After 10 minutes you have seen 600 messages, but there may be many more that have left the server and are in a buffer somewhere. Ten seconds later (assuming the default ack deadline of 10s), these messages will time out. You will see and ack them as you continue processing, but the server will have already marked them as expired and will redeliver them.

The bottom line is that it can be hard to distinguish this behavior—which is "normal," though undesirable—from a bug in the PubSub client or service.

It would help if you could give us more data.

sfriquet commented 6 years ago

@jba

So you exit from the call to Receive, then wait a while, then call Receive again?

Yes and no. We notice duplicates in 2 circumstances.

pubsub_dup_rate

Also, slightly unrelated I guess, but it looks like Nack'ed messages are put back at the front of the queue, so that they are immediately fetched again by the clients. We saw a few times our pipeline completely stalling because of relatively few number of "invalid" messages that kept being retried.

For example [...]

Thanks for the example, if I understand this right:

So that'd mean that if a queue has accumulated too many undelivered messages, such that they can't be processed within (10 minutes + subscription deadline), then duplicates are to be expected. Is that right?

That'd match what we see then. What could we possibly do to mitigate this?

It would help if you could give us more data.

What would help?

In terms of settings, we use:

Thanks a lot for the clarification In the meantime what we did is we added a cache layer to filter out messages based on their ID.

jba commented 6 years ago

When the client pulls messages, an undefined? amount of messages are actually pulled from the server

The number of messages pulled will usually be the same as the number published together. For very high publish rates, the server may additionally batch separate messages that arrive close in time.

We saw a few times our pipeline completely stalling because of relatively few number of "invalid" messages that kept being retried.

Nacking a message will indeed cause it to be redelivered, perhaps promptly. One solution to this is to have a separate topic for invalid messages. If the messages are permanently invalid, then that topic should probably be subscribed to by something that alerts humans. If they are temporarily invalid, then the subscriber should ack the message, sleep on it a while, then republish it to the original topic. (There is no feature that publishes a message after a delay, or at a particular time in the future).

So that'd mean that if a queue has accumulated too many undelivered messages, such that they can't be processed within (10 minutes + subscription deadline), then duplicates are to be expected. Is that right?

Almost. They don't have to be processed by your code in that time, but they do have to make it onto the client. That may be a minor distinction or a large one, depending on a number of factors.

You should be able to mitigate the problem by

Discarding the dups, as you're doing, is also fine. When you say you added a "cache layer," do you mean a separate process/server? Because you may be able to get away with having each client process get rid of just the duplicates it sees. That won't be perfect, but it may be enough, since your system has to tolerate duplicates anyway.

sfriquet commented 6 years ago

Thanks for the advice.

When you say you added a "cache layer," do you mean a separate process/server? Because you may be able to get away with having each client process get rid of just the duplicates it sees.

Reading your explanations again it indeed seems that the duplicates would be on a per client basis. In such case 'caching' at the client level should work too and be much simpler to implement/maintain then.

jba commented 6 years ago

duplicates would be on a per client basis

I don't think that's right. The service will load balance messages (including re-sent ones) across all streams in a subscription. However, if you only have a handful of streams, a significant fraction will end up on the same one. For instance, if you have two streams (processes), then each will see half the messages, and get half the dups. So a per-client solution will weed out a quarter of the dups. I guess that's not a great ratio, now that I do the math. In any case, my point was that a per-client solution is much simpler architecturally, and maybe it gives you enough de-duping to make your system perform adequately.

rayrutjes commented 6 years ago

It seems like this issue is haunting us as we have it in multiple projects now. After having followed this whole discussion and also spent time studying the internals of this library, we came to the conclusion that the current library is designed for pipelines having a setup with a somewhat stable incoming messages, and a high throughput in terms of processing.

In our case, we receive spikes of messages every 5 minutes, and the processing time of 1 message can vary and take up to a couple of seconds sometimes and there is not expected correlation between the number of incoming messages and the speed at which we want to process them.

If our understanding is correct, the streaming pull strategy used in this library can eventually fetch more messages than the MaxOutstandingMessages, which from a developer experience point of view is a bit hard to understand. I do understand now that this allows for a very high throughput in some scenarios. However it also introduces all issues discussed in this thread.

On our side, we tried leveraging the non streaming pull approach and so far it seems to address the problems. However our solution required us to re-implement parts of this pubsub client in order to re-create some of the needed features.

Is there any chance you could introduce a parameter letting the user choose whether to use the experimental streaming subscription pulling, or using the API endpoint? It seems like the latter respects the MaxOutstandingMessages and would work very fine in our use case.

Otherwise, if you plan to somehow deprecate the Pull endpoint in favour of the StreamingPull, is there any chances we could implement an option forcing the client to respect the max outstanding messages? Even a hack in the beginning, for example if the client was to Nack directly all messages after MaxOutstandingMessages amount has been received could help us solve our issue.

I hope this all makes sense. We feel like our current implementation re-invents the wheel, and given that you mentioned earlier that you were working on this case, I wanted to share our experimentations and expectations. I hope this is somewhat useful.

jba commented 6 years ago

@rayrutjes, we'll pass your comments along to the PubSub team.

sfriquet commented 6 years ago

@cristiangraz I'm curious why is this issue closed?

johnreutersward commented 6 years ago

@jba Can we reopen this since people are still experiencing problems? I don't want this issue to lose visibility.

cristiangraz commented 6 years ago

@sfriquet @johnreutersward The initial issue I was having regarding excessive duplicate messages has been fixed. There were lots of other unrelated comments on this issue that went quiet, but looks like there are some additional cases (like this one https://github.com/GoogleCloudPlatform/google-cloud-go/issues/778#issuecomment-380049492) related to duplicate messages that I missed. Apologies, reopening.

@jba Will leave this up to you or the team to close this whenever it's ready to be closed.

jba commented 5 years ago

As of afb80096eae340697e1153d7af9a5a418ba75067, we support synchronous mode for Receive. If you want more control over throughput, please try it. (It ensures that you will never pull more than MaxOutstandingMessages from the Pub/Sub service at a time.)

I'm going to close this now. Reopen if you are still experiencing many duplicates and synchronous mode isn't helping.