googleapis / google-cloud-go

Google Cloud Client Libraries for Go.
https://cloud.google.com/go/docs/reference
Apache License 2.0

pubsub: Old messages fail to ack because "expired" #1485

Closed jameshartig closed 4 years ago

jameshartig commented 5 years ago

This is related to https://github.com/googleapis/google-cloud-go/issues/1247 but I'm filing a new issue since I have more data now and different questions. We've been running our "high throughput" queues with Synchronous=false but it's not clear what the disadvantage is of always setting it to true.

We don't usually see any failed acks (with true or false) but this time in particular we were catching up on a subscription that was >9 million messages and >2 hours behind. These "expired" errors are making it almost impossible to catch up, since at some points over half of the messages we ack are failing and being retried.

I'm not sure if there's an issue where we're receiving already expired messages or if the client is holding onto messages too long (since we have Synchronous=false). However, I raised the Ack Deadline in the Google Console to 60 seconds and didn't see any change (the change was made at 19:30 on the graphs below), so I'm inclined to think something is wrong here. I'm also not sure if we should just be using Synchronous=true instead.

Client

PubSub (aef6eeb)

Describe Your Environment

CentOS 7 on GCE (specifically in us-east1), 2 workers in each region with:

MaxOutstandingMessages = 2000
Timeout = 15*time.Second
NumConsumers = 2000
NumGoroutines = 20 (10 x CPUs)
Synchronous = false

Expected Behavior

Acks succeed and we're not losing half of our acks to "expired" errors.

Actual Behavior

We're seeing thousands of messages failing ack with the error "expired": image

In https://github.com/googleapis/google-cloud-go/issues/1247#issuecomment-453314286 you mentioned that if Synchronous=false then the client fetches more than the MaxOutstandingMessages. During that same time, looking at the pubsub_pull_count OpenCensus metric compared to our internal count of acks shows that we're acking all of the pulled messages: image

Does the pubsub_pull_count not include the count of "extra" messages that are pulled? If not, how can we determine that and graph it to aid with debugging?

This subscription in particular uses more CPU if a job is duplicated so more duplications cause the CPU to spike and for jobs to take longer to ack. The 95th percentile of time it takes to ack is < 5 seconds so I would imagine that even if the client fetched 2x MaxOutstandingMessages we could still ack all of them before the deadline (even with no ModAcks). image

jameshartig commented 5 years ago

Here's a graph from the last hour of the success (yellow) vs failed (green): image

It was going pretty well there for a while until 20:30 (UTC), when "expired" errors started happening a lot. We haven't made any changes on our side during this time period and the time to ack has stayed fairly constant: image

And to clarify, for the last hour we were only running 1 instance of the application for this subscription.

jameshartig commented 5 years ago

I decided to set Synchronous=true just to see how it would perform since we had nothing to lose. I started up the second worker and set it to true on both workers, hoping it could work through the now almost 3 hour backlog. The change went live just after 21:30 and it looks like that has eliminated all of the "expired" messages but we took a 20% hit on the rate of messages. The net "success" number is ultimately way higher with no failed acks. image

What I'm still confused about is how I can debug what was happening with Synchronous=false and how messages with a 60 second ack deadline were considered expired. Since the 95th percentile is <5 seconds to ack, we should be able to still handle the client fetching 6 x MaxOutstandingMessages, so either the client was fetching more than that or something else was going on.
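The back-of-the-envelope reasoning above can be written down as a small calculation. This is a minimal sketch under a simplified model that is an assumption, not library behavior: a fixed number of handlers run in parallel, each message takes a fixed time, and no modacks extend the deadline. The function name `maxHeldBeforeDeadline` is illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// maxHeldBeforeDeadline estimates how many messages a client could hold at
// once and still finish within the ack deadline. Simplified model (an
// assumption, not library behavior): `handlers` messages are processed in
// parallel, each takes `perMessage`, and no modacks extend the deadline.
func maxHeldBeforeDeadline(handlers int, perMessage, deadline time.Duration) int {
	if handlers <= 0 || perMessage <= 0 {
		return 0
	}
	rounds := int(deadline / perMessage) // whole processing rounds that fit in the deadline
	return handlers * rounds
}

func main() {
	// Numbers from this issue: MaxOutstandingMessages = 2000, p95 < 5s, 60s ack deadline.
	held := maxHeldBeforeDeadline(2000, 5*time.Second, 60*time.Second)
	fmt.Printf("%d messages, %dx MaxOutstandingMessages\n", held, held/2000)
}
```

Under this model the ceiling is 12x MaxOutstandingMessages, so the 6x figure above already leaves a 2x margin. If acks still expire, either the client holds far more than that or something other than processing time is consuming the deadline.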

jameshartig commented 5 years ago

Any update?

hongalex commented 5 years ago

Apologies, this slipped under my radar for a while. Thanks a lot for the detailed info between this and the older issue you referenced. I'll try to reproduce this and determine if pubsub_pull_count is inaccurate.

Just to be sure, have you seen this issue occur with versions of the client library between when you opened #1247 and now? I'm wondering if any of this might be related to changes we've made since then.

jameshartig commented 5 years ago

Just to be sure, have you seen this issue occur with versions of the client library between when you opened #1247 and now?

Yeah, I was reproducing using aef6eeb4a44b3d7557fe3c902e58cc2d51652a4e. As I mentioned, I'm not sure yet if it's a client or server issue. It seems to only happen when the backlog is filled with tons of events.

I managed to reproduce it with https://gist.github.com/fastest963/95c1148db456190fc61034accf3e85d1

I originally ran it with NumGoroutines = 1 and 100% of the acks succeeded, but once I raised it to NumGoroutines = 10 the expired count instantly rose. However, the messages had been sitting there for ~30 minutes by that point, so I'm not sure if it's because of NumGoroutines or the length of time they'd sat there unacknowledged. image (The above is pubsub.googleapis.com/subscription/pull_ack_message_operation_count grouped by response_code in stackdriver where green is success and blue is expired)

Here was the output when I started it with NumGoroutines = 10:

~ INFO -- starting receive
~ INFO -- last minute -- acked="36703" published="0" pullCount="38114"
~ INFO -- last minute -- acked="37558" published="0" pullCount="37666"
~ INFO -- last minute -- acked="37514" published="0" pullCount="37388"
~ INFO -- last minute -- acked="37664" published="0" pullCount="37806"
~ INFO -- last minute -- acked="37426" published="0" pullCount="37465"
~ INFO -- last minute -- acked="37485" published="0" pullCount="37455"
~ INFO -- last minute -- acked="37307" published="0" pullCount="37302"
~ INFO -- last minute -- acked="37541" published="0" pullCount="37462"
~ INFO -- last minute -- acked="37361" published="0" pullCount="36813"
~ INFO -- last minute -- acked="37461" published="0" pullCount="37994"
~ INFO -- last minute -- acked="37483" published="0" pullCount="37625"

Since the pullCount is using opencensus and I'm keeping track of acked separately, I'm not sure how to relate the two. There were some minutes where pullCount was less than acked, so I'm still under the impression that it's not accurate, since I would expect it to always be higher. I might be able to modify the script to make acked use opencensus as well but I'm not that familiar with opencensus.

I'll keep playing around with the script above and see if I can narrow down a better idea of what exactly is causing it.

Update: After running it a few more times, I'm leaning more towards it being caused by the age of the messages on the server. I ran the test with NumGoroutines = 10 and none of the acks were erroring, so I waited ~10 minutes and restarted the test script with ./test --num-publish 0 so it would skip the publishing step, and then I started noticing a bunch of acks failing with expired.

jameshartig commented 5 years ago

Any update on this? It seems like the issue is caused by jobs that have been sitting around on the server for a while (>10 minutes) so I'm not sure how else I can help, but let me know.

hongalex commented 5 years ago

Hi, thanks very much for the detailed report.

I believe that PullCount from the opencensus metrics does not currently account for errors, as indicated here. I'm currently working on improving the opencensus metrics recording to better handle error metrics which might help with debugging this issue. In short, we aren't recording opencensus metrics beyond simple count, where the plan is to tag individual metrics with success/error values. I'm starting this change with the topic publish metrics, but will be updating the subscription metrics to reflect this as well.

This doesn't address your issue right away, but hopefully the changes I make will help shed light on what's going on here. More information to follow soon.

jameshartig commented 5 years ago

Thanks for the update @hongalex. I'll keep an eye out and re-run my script posted earlier once the changes are out. If there are specific metrics you want me to focus on or record let me know.

marrrcin commented 5 years ago

This issue is similar to https://github.com/googleapis/google-cloud-go/issues/1584, the common part being: 1. huge backlog of unprocessed messages, 2. more expired ACKs than valid ACKs. Configuration seems similar too.

jameshartig commented 5 years ago

@hongalex any update on this? I updated the title to reflect what the underlying issue is, although I'm not sure how else to help without better metrics to understand if it's a server problem or a client problem.

If I run the gist I posted earlier on the latest master:

$ ./pubsubtest
~ INFO -- starting publishing
~ INFO -- last minute -- acked="0" published="544766" pullCount="0"
~ INFO -- starting receive
~ INFO -- last minute -- acked="6914" published="455234" pullCount="8375"
~ INFO -- last minute -- acked="37496" published="0" pullCount="37570"

and let that run for 2-3 minutes then wait >10 minutes before running (I waited 30 but that's just because I had other stuff I was doing in the meantime):

$ ./pubsubtest --num-publish 0
~ INFO -- starting receive
~ INFO -- last minute -- acked="36631" published="0" pullCount="38025"
~ INFO -- last minute -- acked="37568" published="0" pullCount="37590"
~ INFO -- last minute -- acked="37509" published="0" pullCount="37594"
~ INFO -- last minute -- acked="37668" published="0" pullCount="37623"
~ INFO -- last minute -- acked="37423" published="0" pullCount="37450"
~ INFO -- last minute -- acked="37488" published="0" pullCount="37343"
~ INFO -- last minute -- acked="37318" published="0" pullCount="36808"
~ INFO -- last minute -- acked="37527" published="0" pullCount="38068"
~ INFO -- last minute -- acked="37367" published="0" pullCount="36918"
~ INFO -- last minute -- acked="37454" published="0" pullCount="38109"
~ INFO -- last minute -- acked="37500" published="0" pullCount="37294"
~ INFO -- last minute -- acked="37537" published="0" pullCount="36917"
~ INFO -- last minute -- acked="37368" published="0" pullCount="37520"
~ INFO -- last minute -- acked="37353" published="0" pullCount="37873"
~ INFO -- last minute -- acked="37589" published="0" pullCount="37769"
~ INFO -- last minute -- acked="37394" published="0" pullCount="37317"
~ INFO -- last minute -- acked="37467" published="0" pullCount="37499"
~ INFO -- last minute -- acked="37470" published="0" pullCount="37433"
~ INFO -- last minute -- acked="37773" published="0" pullCount="37664"
~ INFO -- last minute -- acked="37543" published="0" pullCount="37676"

You can see from the stackdriver graphs that many of the acks failed because expired. (Keep in mind that stackdriver is a few minutes behind):

image

I still see the problem where a majority of the messages fail to be acked because expired. You can actually see where the expired (green) end up being ~80% of the acks. This makes it extremely difficult to catch up if only 20% of the acks are actually successful. It definitely seems to be an issue with the length of time that the messages have been in the backlog. If I don't pause the receiving for a few minutes the problem is not as pronounced.

hongalex commented 5 years ago

Apologies for the delay! Seems like this issue (with expired acks) has been brought up a number of times, but it mostly comes down to our documentation needing work. As mentioned in this comment, with Synchronous=false, MaxOutstandingMessages is not guaranteed to actually be the max that the library pulls in.

For more context, Synchronous=true and Synchronous=false select the kind of pull method we use: Pull (synchronous pull) and StreamingPull (asynchronous pull) respectively. In the former, we can specify the max number of messages to pull with every single RPC. With the latter, we can only stop receiving messages after MaxOutstandingMessages or MaxOutstandingBytes has been exceeded. A large batch of published messages will cause a large backlog of messages, and inevitably some messages will expire.

So essentially, if you expect to need to process a large number of messages at once (that exceeds your normal worker count), setting Synchronous = true will prevent acks from expiring.

Hopefully that helps. I'm also still in the process of updating some of our OpenCensus metrics for subscribes, but that's unrelated to your original issue now.

Long story short: large message backlogs are better processed with Synchronous=true, unless you scale up (either with number of goroutines or machines entirely) to process a majority of the backlog in < 10 minutes.
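The property attributed to synchronous Pull above, requesting no more messages than there is room for, can be illustrated without the Pub/Sub client at all. The following is a toy sketch using only the standard library; the `backlog` type and `drain` function are hypothetical stand-ins and do not reflect how the client library is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// backlog is a stand-in for a subscription (hypothetical, not the Pub/Sub API).
type backlog struct {
	mu        sync.Mutex
	remaining int
}

// pull hands out at most max messages, like a Pull RPC with a message limit.
func (b *backlog) pull(max int) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	n := max
	if b.remaining < n {
		n = b.remaining
	}
	b.remaining -= n
	return n
}

// drain processes the backlog while never holding more than maxOutstanding
// messages. It returns the number processed and the peak number in flight.
func drain(b *backlog, maxOutstanding int, handle func()) (processed, peak int) {
	if maxOutstanding <= 0 {
		return 0, 0
	}
	slots := make(chan struct{}, maxOutstanding) // one token per held message
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		inFlight int
	)
	for {
		slots <- struct{}{} // block until at least one slot is free
		free := 1
	grab:
		for free < maxOutstanding { // take any other slots that are free right now
			select {
			case slots <- struct{}{}:
				free++
			default:
				break grab
			}
		}
		n := b.pull(free) // request no more than we have room for
		for i := n; i < free; i++ {
			<-slots // hand back tokens for messages we did not get
		}
		if n == 0 {
			break
		}
		mu.Lock()
		inFlight += n
		if inFlight > peak {
			peak = inFlight
		}
		mu.Unlock()
		processed += n
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				handle()
				mu.Lock()
				inFlight--
				mu.Unlock()
				<-slots // release the slot only after the message is done
			}()
		}
	}
	wg.Wait()
	return processed, peak
}

func main() {
	processed, peak := drain(&backlog{remaining: 10000}, 2000, func() {})
	fmt.Println(processed, peak <= 2000) // 10000 true
}
```

Because every message holds a slot from the moment it is requested until its handler finishes, nothing waits in a client-side buffer while its deadline runs down. With a stream, by contrast, the server decides how much to send, which is the behavior described above.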

jameshartig commented 5 years ago

@hongalex thanks for the refresher. What's the best solution to handle the case where you unexpectedly have a huge backlog and then can't catch up because only 20% of the acks are successful? Is that a sign that MaxOutstandingMessages is too high? Or instead should we always use Synchronous=true mode? Does that mode scale as high (messages per second) as the streaming mode?

If we wanted to debug this further to ensure that it's not a bug is there a metric (or can one be added) that indicates the current outstanding bytes?

Also, does the client not do modack's for messages it has received via StreamingPull and is that why they're expiring? Is there a way for the client to know that it's already expired before even calling the callback?

Thanks again for helping out. I feel like there are some things that the client could do to help out this case, maybe MaxOutstandingBytes could be used in addition to something monitoring if the Consumers are all blocked to stop the stream. Or maybe we should just be using the Synchronous=true mode all of the time.

hongalex commented 5 years ago

What's the best solution to handle the case where you unexpectedly have a huge backlog and then can't catch up because only 20% of the acks are successful? Is that a sign that MaxOutstandingMessages is too high?

The best solution could be to temporarily switch to synchronous mode until you can get through the backlog of messages. MaxOutstandingMessages likely is not causing you problems. The large first batch of messages published will likely exceed MaxOutstandingMessages, which will still result in messages expiring. If you are consistently seeing messages expire 99% of the time, that means you are processing messages too slowly and in that case MaxOutstandingMessages should be lowered.

Or instead should we always use Synchronous=true mode? Does that mode scale as high (messages per second) as the streaming mode?

This could work, but you will need to understand the tradeoffs compared with Synchronous = false. In StreamingPull mode, we open a bidirectional gRPC stream, and we receive messages and send modacks along the stream. In synchronous mode, we rely on the Pull RPC (mentioned above). This creates more overhead on the system (since it needs to establish these connections per RPC). I believe this is somewhat mitigated by multiplexing in gRPC/HTTP2, but it is still not as efficient as keeping long-running streams open. In addition, the latency is higher with synchronous mode, for the same reason. In short, if your use case isn't dependent on super-low latency and isn't network-bound, synchronous mode will be a great fit.

If we wanted to debug this further to ensure that it's not a bug is there a metric (or can one be added) that indicates the current outstanding bytes?

Unfortunately, we don't currently track outstanding bytes right now (or bytes in general). I'll look into this to see if I can add this to our metrics once I revamp that a bit. In the meantime, there's ResponseCount which you can use to estimate how many messages have been pulled in StreamingPull mode. https://github.com/googleapis/google-cloud-go/blob/4a6c543f3234977f3e2f5bc8350e11c9ed75c59b/pubsub/trace.go#L93-L95

Also, does the client not do modack's for messages it has received via StreamingPull and is that why they're expiring?

The client does indeed send modacks for messages in StreamingPull, but we can only extend the deadline for a period of time (10 minutes). After that, messages will expire and be re-sent by Pub/Sub. If messages continue to come in on the topic, this causes sort of a snowball effect where it's hard for subscribers to catch up without additional scaling.
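The extension limit described here can be sketched as a small function. This is a simplified model, not the client's actual lease-management code: `nextExtension` and its parameters are hypothetical names, the 10 minute cap comes from the comment above, and the 60 second ack deadline is the value mentioned earlier in this issue.

```go
package main

import (
	"fmt"
	"time"
)

// nextExtension returns how long the next modack should extend a message's
// deadline, or false once the message has been held for maxExtension.
// age is the time since the client received the message.
func nextExtension(age, ackDeadline, maxExtension time.Duration) (time.Duration, bool) {
	remaining := maxExtension - age
	if remaining <= 0 {
		return 0, false // extension window used up; the message will be redelivered
	}
	if remaining < ackDeadline {
		return remaining, true // final, shorter extension
	}
	return ackDeadline, true
}

func main() {
	const ackDeadline, maxExtension = 60 * time.Second, 10 * time.Minute
	for _, age := range []time.Duration{0, 9*time.Minute + 30*time.Second, 11 * time.Minute} {
		d, ok := nextExtension(age, ackDeadline, maxExtension)
		fmt.Println(age, d, ok)
	}
}
```

In this model a message that has waited in the client for longer than the cap gets no further extension, so acking it afterwards would be expected to fail as expired, regardless of how quickly the callback itself runs.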

Is there a way for the client to know that it's already expired before even calling the callback?

The client is currently unaware of the incoming timestamp of when the message was pulled, since we only track when the message was originally published. This is theoretically possible, and I'll be sure to run this by the team to see if we can easily implement this.

I feel like there are some things that the client could do to help out this case, maybe MaxOutstandingBytes could be used in addition to something monitoring if the Consumers are all blocked to stop the stream.

Definitely. I'm investigating ways we can make this more intuitive for users, first by updating the docs. Keep in mind, expired messages are not exactly an error per se, since they will eventually be redelivered. We are looking into a solution for these large backlog cases, where it could actually be better for messages to be expired earlier (so that they don't take up space in memory), and so they can be processed at a later time.

Lastly, depending on how important your backlog is, if your only goal is to avoid a backlog, you can discard messages in bulk without acking them using Seek. This might not be what you need, but just wanted to put that option out there.

Sorry for the long-winded response, and hopefully this helps! Feel free to comment more if anything I mentioned needs more clarification.

jameshartig commented 5 years ago

@hongalex Thanks! No worries about the long response. I appreciate you responding with the detail you have.

The best solution could be to temporarily switch to synchronous mode until you can get through the backlog of messages.

This makes sense but I'm not sure how we'd do this programmatically (detect when the backlog is large enough and we're failing to ack enough) and without human-intervention and manually deploying different code.

In the meantime, there's ResponseCount which you can use to estimate how many messages have been pulled in StreamingPull mode.

I did a quick test:

~ INFO -- last minute -- acked="36797" published="0" pullCount="38273" responseCount="494"
~ INFO -- last minute -- acked="37549" published="0" pullCount="37709" responseCount="449"
~ INFO -- last minute -- acked="37536" published="0" pullCount="37356" responseCount="443"
~ INFO -- last minute -- acked="37644" published="0" pullCount="37730" responseCount="458"
~ INFO -- last minute -- acked="37429" published="0" pullCount="37357" responseCount="447"

Is the number of messages in each streaming-pull response somewhat consistent? Actually PullCount seems like it should be what I'm looking for, or is it missing some amount of messages? https://github.com/googleapis/google-cloud-go/blob/4a6c543f3234977f3e2f5bc8350e11c9ed75c59b/pubsub/trace.go#L63

If that's the case, then it seems like I'm acking messages as fast as I'm receiving them over the stream?

Keep in mind, expired messages are not exactly an error per se, since they will eventually be redelivered.

Yeah it's just hard to catch up with the backlog and it ends up making our application do way more work than necessary if we're only actually acking a small percent of messages and the rest of them we did the work but it was for nothing since we have to do it again in a few seconds.

hongalex commented 4 years ago

I'm not sure how we'd do this programmatically (detect when the backlog is large enough and we're failing to ack enough) and without human-intervention and manually deploying different code.

This is fairly tricky. The best way would probably be to use the metrics we have in Stackdriver such as subscription/num_undelivered_messages or subscription/oldest_unacked_message_age. After determining there's a large backlog, simply cancelling the original Receive and restarting it should work (although some messages will expire in that case). I think it might be simpler to just always use Synchronous=true, and offload the work to a pool of workers so that each Receive callback is very short. Keep in mind that in synchronous mode, NumGoroutines is ignored, i.e. set to 1.
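The decision step suggested here could look roughly like the following. It is a sketch only: the `backlogStats` and `thresholds` types, both function names, and the threshold values are hypothetical, and fetching the two Stackdriver metrics is left out entirely.

```go
package main

import (
	"fmt"
	"time"
)

// backlogStats mirrors the two Stackdriver metrics named above. How the
// values are fetched is out of scope here.
type backlogStats struct {
	NumUndelivered   int64         // subscription/num_undelivered_messages
	OldestUnackedAge time.Duration // subscription/oldest_unacked_message_age
}

// thresholds are hypothetical; tune them for your own workload.
type thresholds struct {
	MaxUndelivered int64
	MaxAge         time.Duration
}

// wantSynchronous reports whether the backlog is large or old enough that
// Receive should be (re)started with Synchronous=true.
func wantSynchronous(s backlogStats, t thresholds) bool {
	return s.NumUndelivered > t.MaxUndelivered || s.OldestUnackedAge > t.MaxAge
}

// needsRestart reports whether the running Receive must be cancelled and
// restarted because the desired mode differs from the current one.
func needsRestart(currentSync bool, s backlogStats, t thresholds) bool {
	return wantSynchronous(s, t) != currentSync
}

func main() {
	limits := thresholds{MaxUndelivered: 100000, MaxAge: 10 * time.Minute}
	// The situation from the original report: >9 million messages, >2 hours behind.
	stats := backlogStats{NumUndelivered: 9000000, OldestUnackedAge: 2 * time.Hour}
	fmt.Println(wantSynchronous(stats, limits), needsRestart(false, stats, limits)) // true true
}
```

A supervisor goroutine could poll the metrics periodically and, when `needsRestart` returns true, cancel the context passed to Receive and call Receive again with the other setting. Using a single pair of thresholds means the mode can flap near the boundary, so separate enter and exit thresholds may be worth considering.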

Is the number of messages in each streaming-pull response somewhat consistent?

Yes, though I think it depends on the size of the messages you are pulling in. So if that's consistent, I would expect PullCount to be fairly consistent as well.

Actually PullCount seems like it should be what I'm looking for, or is it missing some amount of messages?

Yeah that works. PullCount should be the total amount of messages you're pulling in.

If that's the case, then it seems like I'm acking messages as fast as I'm receiving them over the stream?

It seems like it, but from the data that you gave, it seems like you have about 1470 messages you haven't acked (sum pullCount - sum acked). It looks like it's only for 5 minutes though, so maybe the sample size is too small.
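The 1470 figure can be reproduced from the five log lines posted in the quick test. The sketch below parses the `key="N"` counters and sums them; the helper name `sumFields` is illustrative, and the log lines are copied from the output above.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// fieldRe matches counters of the form key="123".
var fieldRe = regexp.MustCompile(`(\w+)="(\d+)"`)

// sampleLines are the five per-minute lines from the quick test above.
var sampleLines = []string{
	`~ INFO -- last minute -- acked="36797" published="0" pullCount="38273" responseCount="494"`,
	`~ INFO -- last minute -- acked="37549" published="0" pullCount="37709" responseCount="449"`,
	`~ INFO -- last minute -- acked="37536" published="0" pullCount="37356" responseCount="443"`,
	`~ INFO -- last minute -- acked="37644" published="0" pullCount="37730" responseCount="458"`,
	`~ INFO -- last minute -- acked="37429" published="0" pullCount="37357" responseCount="447"`,
}

// sumFields adds up every key="N" counter found in the given log lines.
func sumFields(lines []string) map[string]int {
	totals := make(map[string]int)
	for _, line := range lines {
		for _, m := range fieldRe.FindAllStringSubmatch(line, -1) {
			n, err := strconv.Atoi(m[2])
			if err != nil {
				continue // only possible if the number overflows int
			}
			totals[m[1]] += n
		}
	}
	return totals
}

func main() {
	totals := sumFields(sampleLines)
	fmt.Println(totals["pullCount"] - totals["acked"]) // 1470
}
```

Over these five minutes that is 188425 pulled against 186955 acked, a gap of under 1%.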

jameshartig commented 4 years ago

I think it might be simpler to just always use Synchronous=true

Yeah I think that's what we'll end up going with and trying.

It seems like it, but from the data that you gave, it seems like you have about 1470 messages you haven't acked (sum pullCount - sum acked). It looks like it's only for 5 minutes though, so maybe the sample size is too small.

Yeah, they might not match up exactly, but when 80% of the acks are returning an error because they expired, I'd expect to be pulling significantly (almost 10x) more messages than I'm acking. Since I'm pulling as fast as I'm acking, it's not clear to me how it's getting so far behind and starting to ack old messages.

hongalex commented 4 years ago

So I forgot to mention that we have a section on Streaming Pull in our docs about your original issue: https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages. Seeing as a lot of other people have raised similar issues, there might be a discoverability issue. Were you aware that this existed, or had you seen this page before? If not, we'll look into improving this to make it clearer and more discoverable.

We're also aware that the current implementation of Streaming Pull is a bit lacking in this regard. We are looking at a proper solution for this, but cannot share a specific timeline.