Closed jtressle closed 1 year ago
Thanks for filing an issue! I will see if someone has time to look through your code to see where your issue might be coming from but in the meantime, there's a relatively recently published blog post tackling this subject here that includes some options for identifying where your duplicates are coming from as well as options for resolving. https://cloud.google.com/blog/products/data-analytics/handling-duplicate-data-in-streaming-pipeline-using-pubsub-dataflow
@meredithslota thanks for the link! I went ahead and implemented a similar deduplication strategy. I used Firestore to monitor duplicate messages.
I forgot to mention my duplicates all have the same messageIDs so it looks like my workers are not extending the ack deadline. This could be because my subprocesses are CPU-intensive, and pubsub isn't able to send the extension request?
Here are more notes and observations.
My subscription acknowledgement duration is set to 600 seconds.
I just observed a duplicate message being sent out. The original message was acknowledged 659.52 seconds after it was received. Immediately after acknowledgement (24 msec), a duplicate message was resent. Is there a case where a subscriber would send a bad ack signal back to pubsub that would cause pubsub to resend the message?
Thanks,
@meredithslota just following up on this to see if anyone has taken a look. I'm also happy to add logging and/or troubleshoot to see what the issue is. My main concern is that pubsub isn't behaving correctly on a machine under heavy CPU-loads. Specifically, if the lease extension fails to connect to pubsub provider, is there a retry? Or does one failed attempt immediately issue a new message? Thanks
The library will keep modacking the message as long as the `max_lease_duration` time isn't up. However, acks/modacks are best-effort and duplicates may be sent for any reason. CPS isn't optimized for such high-latency acks, but should work regardless - you should be able to keep modacking until the retention duration of the subscription.
If a duplicate was given to your callback immediately after you acked the message, it's likely that that duplicate was already queued in the client library while the previous message was being processed. Your flow control settings allow just one message at a time, so this is possible.
You can try setting `min_duration_per_lease_extension` and `max_duration_per_lease_extension` fairly high, perhaps 10 minutes each, and seeing if that helps. `min_duration_per_lease_extension` was added in version 2.10.0.
But to mitigate duplicates, you need to either use the brand new exactly-once delivery feature, in preview, or to keep a look-aside DB like you're doing with Firestore.
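A look-aside duplicate check of the kind mentioned above might be sketched as follows. This is a minimal in-memory illustration, not the Firestore implementation used in this thread: `SeenStore` and `handle_once` are hypothetical names, and a real deployment would need a shared store with an atomic "create if absent" operation instead of a local dict.

```python
import threading
from typing import Callable, Dict


class SeenStore:
    """Hypothetical in-memory stand-in for a look-aside database."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state: Dict[str, str] = {}

    def claim(self, message_id: str) -> bool:
        """Return True only for the first caller that claims this message ID."""
        with self._lock:
            if message_id in self._state:
                return False
            self._state[message_id] = "in_progress"
            return True

    def mark_done(self, message_id: str) -> None:
        with self._lock:
            self._state[message_id] = "done"


def handle_once(store: SeenStore, message_id: str, work: Callable[[], None]) -> bool:
    """Run `work` unless this message ID was already claimed.

    Returns True if the work ran, False if the message was a duplicate.
    """
    if not store.claim(message_id):
        return False
    work()
    store.mark_done(message_id)
    return True
```

Because duplicates in this thread share the same message ID, keying on `message.message_id` is enough to detect them. Whether a duplicate should be acked while the original is still in progress is a design choice that depends on how failures of the original run should be handled.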
@pradn thanks for the tips. I'll test out both the min_duration_per_lease_extension and the new exactly-once feature. Will report back after running it for a bit.
@jtressle, have you had a chance to test with the new settings?
@pradn I'm about to start testing on our servers. I'll update this in the next couple of days. Thanks and sorry for the delay.
Ok, let us know how it goes!
Hi @pradn,
I ended up changing the subscriber initialization to:
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription)
flow_control = pubsub_v1.types.FlowControl(
    max_messages=1,
    min_duration_per_lease_extension=600,
    max_duration_per_lease_extension=600,
    max_lease_duration=7200,
)
I also changed pubsub to only issue exactly one message. I did receive the following error on one of my runs. It happened after an ack(). I'll continue to test to see if it repeats.
Thanks
Error in queue callback worker: 'NoneType' object has no attribute 'set_exception'
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/helper_threads.py", line 118, in __call__
    self._callback(items)
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py", line 159, in dispatch_callback
    self.ack(ack_requests)
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py", line 190, in ack
    requests_completed, requests_to_retry = self._manager.send_unary_ack(
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 615, in send_unary_ack
    requests_completed, requests_to_retry = _process_requests(
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 187, in _process_requests
    future.set_exception(exc)
AttributeError: 'NoneType' object has no attribute 'set_exception'
AcknowledgeError when lease-modacking a message.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 920, in _send_lease_modacks
    req.future.result()
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/futures.py", line 125, in result
    return super().result(timeout=timeout)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError: None AcknowledgeStatus.INVALID_ACK_ID
jtressle@, this is a bug that has been fixed but hasn't been released yet. We can continue when that happens. I'll let you know.
@pradn, what is the correct way in handling this so it doesn't kill the subscriber and instead just issues a warning? My code is above. I'm using PubSub in Kubernetes workers, and this bug kills my workers, and therefore stops my pipeline.
If we can't ignore the error, is this an issue with the exactly-one-message implementation? Or with the min_duration_per_lease_extension? I can roll-back my changes until the fix is released, or until there's a branch I can test.
Thanks for your help.
We released the 2.12.0 version just now. Please try with that version. This will fix the bug in https://github.com/googleapis/python-pubsub/issues/593#issuecomment-1089692480.
Did you enable exactly-once delivery on your subscription?
@pradn, I just started testing now and will update here after the weekend.
I set `min_duration_per_lease_extension` and `max_duration_per_lease_extension` to 600 and enabled the exactly-once delivery on each of my subscribers. I also disabled my database duplicate check to ensure we can catch the duplicates.
So far, I've had none of the `NoneType` issues with version 2.12.0.
Thanks for your help.
@pradn I wanted to report back. Version 2.12.0 reduced the number of NoneType errors, but I still got three identical errors, all at the same place. The frequency has been reduced.
Error:
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 971, in _send_lease_modacks
    req.future.result()
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/futures.py", line 125, in result
    return super().result(timeout=timeout)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError: None AcknowledgeStatus.INVALID_ACK_ID
My python packages are:
google-api-core 2.7.1
google-api-python-client 2.43.0
google-auth 2.6.3
google-auth-httplib2 0.1.0
google-cloud-appengine-logging 1.1.1
google-cloud-audit-log 0.2.0
google-cloud-core 2.2.3
google-cloud-firestore 2.4.0
google-cloud-logging 3.0.0
google-cloud-pubsub 2.12.0
google-cloud-storage 2.2.1
google-crc32c 1.3.0
google-resumable-media 2.3.2
googleapis-common-protos 1.56.0
grpc-google-iam-v1 0.12.4
I'm also still getting the same number of duplicates as I had before turning on the exactly-once delivery. I had one message run 3 times and another message run 5 times. The min and max lease extensions were set to 600, but I did see one message was sent twice 428 seconds apart. Is there anything else I can change?
Also, is there a configuration change, or an earlier pubsub version I can use to isolate the errors above? I have another pub-sub system that has been running for years without this issue, but those tasks are a lot shorter. The pubsub errors unfortunately error out my long-running tasks and also make the Kubernetes workers non-responsive because pubsub stops subscribing.
Thanks for your help, Jan-Michael
Hi @pradn,
I wanted to update you regarding this issue. After disabling the exactly once delivery, all the "AcknowledgeStatus.INVALID_ACK_ID" issues have gone away. Is this expected?
Is there a way to have my subscriber immediately ACK the message, and then in a closure block perform my work. Once my work is done, I'll turn on the subscriber to wait for the next message. This will alleviate the issue of running pubsub with other CPU intensive tasks, and also running long-running tasks.
Thanks and much appreciated, Jan-Michael
Hi @jtressle
Please take a look at the sample EOD code here: https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-exactly-once In particular, note that with EOD, the acking behavior should be changed to:
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# `timeout` (in seconds) is assumed to be defined by the enclosing code,
# as in the linked sample.

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()
    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )
I believe this should help you with gracefully handling INVALID_ACK_ID errors.
Some more context around why we want you to use `ack_with_response` instead of `ack`. When exactly-once delivery is enabled on a subscription, we return more errors to the user for ack/modack. The reason is you need to know whether your ack/modack went through or not. If you see that it succeeded, you are guaranteed that you won't get your message again. If you see that it failed, you need to decide what to do with that message. You may want to retry the ack/modack or do something else. But, in the case of failure, the message may be redelivered. Since the actual impact of an ack/modack failure depends on the impact of a re-delivery, we require you to make that choice. Only you know what to do in this case, per your usage of "exactly-once delivery".
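The "decide what to do" step described above could be made explicit with a small dispatch function. This is only a sketch under stated assumptions: `AckStatus` is a local stand-in enum (not the library's own status type), `OTHER` is a hypothetical catch-all, and the returned action names are purely illustrative.

```python
from enum import Enum


class AckStatus(Enum):
    """Local stand-in for an ack outcome; not the library's enum."""
    SUCCESS = "SUCCESS"
    INVALID_ACK_ID = "INVALID_ACK_ID"
    OTHER = "OTHER"  # hypothetical catch-all for any other failure


def next_step(status: AckStatus, work_is_idempotent: bool) -> str:
    """Pick a follow-up action for a finished callback (illustrative policy)."""
    if status is AckStatus.SUCCESS:
        # With exactly-once delivery, a successful ack means no redelivery.
        return "done"
    if status is AckStatus.INVALID_ACK_ID:
        # The ack did not go through, so the message may come back.
        if work_is_idempotent:
            return "accept_redelivery"
        return "record_result_and_skip_on_redelivery"
    return "retry_ack"
```

For long-running, expensive work like the tasks in this thread, the non-idempotent branch is the interesting one: recording the finished result somewhere durable lets a redelivered copy be skipped instead of recomputed.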
@pradn @acocuzzo thanks for this. I've updated my servers and will run this over the next couple of days. Initial tests have been good. I'll update here afterwards.
UPDATE:
I'm doing further testing and I've received 11 `INVALID_ACK_ID` errors in the last 20 minutes. It's happening on my largest instances, which have the most intensive CPU load (all threads). The errors are all basically this:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 971, in _send_lease_modacks
    req.future.result()
  File "/usr/local/lib/python3.8/dist-packages/google/cloud/pubsub_v1/subscriber/futures.py", line 125, in result
    return super().result(timeout=timeout)
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
google.cloud.pubsub_v1.subscriber.exceptions.AcknowledgeError: None AcknowledgeStatus.INVALID_ACK_ID
ORIGINAL:
@pradn @acocuzzo I tested the `ack_with_response` and the workers were able to recover from a few `INVALID_ACK_ID` errors I received.
Many of the errors happen while I'm processing data, which returned back errors to our users. The reason for this is processing of the data is self-contained in the callback function. For context, I have another app that has been using Pub-Sub for years without issue in a GKE environment. The Pub-Sub failure rate is basically 0% as I have the exactly once delivery feature disabled. So I haven't had to add communication between Pub-Sub and my other processes.
I have a few questions:
You stated that in the case of failure, the message may be redelivered? What dictates whether a message retries or not? Is it only if that message is dead-lettered or not?
Is there a way to not stop the callback when we receive these errors? If we can't ignore these errors, the best solution might be to have the subscriber immediately return an ACK, then have it go off-line until processing has completed. The subscriber can go back online after processing has finished.
Thanks
@pradn @acocuzzo I wanted to provide more insight based on my testing. When running on a compute instance, I see no `INVALID_ACK_ID` errors. However, when I run these same workers on GKE, I see `INVALID_ACK_ID` issues every minute or so.
I've updated my subscriber to ignore the invalid errors, but I'm getting multiple retries and multiple runs of the same message.
Next steps are to verify GKE resources are configured correctly. If they are, I'll go back to my original configuration and work on starting/stopping Pubsub.
Thanks,
In the update in this comment are you saying that your program crashes when this error occurs? If you look in the code, we call future.result() and then catch the resulting exception (an AcknowledgeError). When caught, the exception just logs the error. So if your program is crashing / seeing an unexpected exception bubble up, it wasn't the intention and should be investigated.
> You stated that in the case of failure, the message may be redelivered? What dictates whether a message retries or not? Is it only if that message is dead-lettered or not?
Cloud Pub/Sub by default always delivers messages until they're acked. An ack is best-effort, so the system might lose the ack. So then you get the message again. Moreover, if a message isn't acked within the ack deadline, an ack expiration and re-delivery occur. Note your ack might race with the ack expiration time. Ack status for a message is stored in memory and periodically synced to disk on the server-side, so our server restarts may lose message ack statuses. So, there are several reasons for re-delivery.
When a DLQ is enabled, we keep failure counts for when ack expirations occur and explicit nacks are sent for a message. If the failure count goes above a threshold, we move the message to the DLQ topic. If a DLQ is not enabled, we keep re-delivering messages until the message expires (based on the subscription settings, defaulting to 7 days).
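The redelivery rules described above can be illustrated with a toy model. This is not the server's implementation: `simulate_delivery` and `failure_threshold` are hypothetical names, message expiry is not modelled, and whether the threshold comparison is inclusive is an assumption made only for this sketch.

```python
from typing import List, Optional


def simulate_delivery(outcomes: List[str], failure_threshold: Optional[int]) -> str:
    """Walk through per-attempt outcomes: 'ack', 'nack' or 'expired'.

    `failure_threshold=None` models a subscription without a DLQ.
    """
    failures = 0
    for outcome in outcomes:
        if outcome == "ack":
            return "acked"
        # Both explicit nacks and ack expirations count as failures.
        failures += 1
        # Assumption: the message moves once the count reaches the threshold.
        if failure_threshold is not None and failures >= failure_threshold:
            return "dead_lettered"
    # Without a DLQ the message keeps coming back until it expires.
    return "still_redelivering"
```

In this model, a task that repeatedly outlives its lease looks the same as a task that keeps nacking: each expiry counts as a failure.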
> Is there a way to not stop the callback when we receive these errors? If we can't ignore these errors, the best solution might be to have the subscriber immediately return an ACK, then have it go off-line until processing has completed. The subscriber can go back online after processing has finished.
It's up to you what you do with the exceptions returned by `ack_with_response`. You may choose to catch them and keep the processing going. The exception will only occur when you call the `result` function.
> I wanted to provide more insight based on my testing. When running on a compute instance, I see no INVALID_ACK_ID errors. However, when I run these same workers on GKE, I see INVALID_ACK_ID issues every minute or so.
It's possible there's environmental differences that account for that - maybe network latency. But you need to be able to deal with them occurring, in any environment.
> I've updated my subscriber to ignore the invalid errors, but I'm getting multiple retries and multiple runs of the same message.
This is expected. The "exactly-once" guarantee is that if you successfully ack the message, it won't come back. These invalid ack errors indicate an unsuccessful ack.
> Next steps are to verify GKE resources are configured correctly. If they are, I'll go back to my original configuration and work on starting/stopping Pubsub.
Ok, sounds good. Keep us posted.
@pradn please see below:
> In the update in https://github.com/googleapis/python-pubsub/issues/593#issuecomment-1102826417 are you saying that your program crashes when this error occurs? If you look in the code, we call future.result() and then catch the resulting exception (an AcknowledgeError). When caught, the exception just logs the error. So if your program is crashing / seeing an unexpected exception bubble up, it wasn't the intention and should be investigated.
What happens when I get the `AcknowledgeError` is that the callback completes, causing my long-running processes to end prematurely. Below is the code I'm using:
def worker(project_id, subscription):
    # create subscriber
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription)
    flow_control = pubsub_v1.types.FlowControl(
        max_messages=1,
        max_duration_per_lease_extension=60,
        max_lease_duration=7200)

    def callback(message):
        print('Received message {} of message ID {}'.format(message, message.message_id))
        cmd = '......'
        run_command('Long Task', cmd)
        # ack message
        ack_future = message.ack_with_response()
        try:
            # Block on result of acknowledge call.
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            ack_future.result()
            print(f"Ack for message {message.message_id} successful.")
        except sub_exceptions.AcknowledgeError as e:
            print(f"Ack for message {message.message_id} failed with error: {e.error_code}")

    future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
    print('Listening for messages on {}..\n'.format(subscription_path))
    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            future.result()
        except:
            print('Exiting subscriber')
            future.cancel()
I updated my code to what was specified just above. Is this the correct way of handling the warning without stopping the callback?
> Note your ack might race with the ack expiration time. Ack status for a message is stored in memory and periodically synced to disk on the server-side, so our server restarts may lose message ack statuses. So, there's several reasons for re-delivery.
My subscriber is set as:
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription)
flow_control = pubsub_v1.types.FlowControl(
    max_messages=1,
    min_duration_per_lease_extension=600,
    max_duration_per_lease_extension=600,
    max_lease_duration=7200,
)
My understanding is I should never get a failure or warning anytime before 600 seconds and should hold the lease until 7200. However, I'm getting duplicates (when DLQ is not enabled) and AcknowledgeError warnings way before 7200 seconds and before the minimum time of 600 seconds. How can I configure PubSub to adhere to these times?
> It's up to you what you do with the exceptions returned by ack_with_response. You may choose to catch them and keep the processing going. The exception will only occur when you call the result function.
Can you please provide example code on how to do this? The code I presented above ends the callback when I get the AcknowledgeError.
> It's possible there's environmental differences that account for that - maybe network latency. But you need to be able to deal with them occurring, in any environment.
How should one go about handling the network latency? If my lease extensions are 600 seconds, shouldn't this be sufficient for PubSub to retry connections?
Thanks
> I updated my code to what was specified just above. Is this the correct way of handling the warning without stopping the callback?
Yes, you should be calling `ack_future.result()` in the try block. Your code looks as expected.
> My understanding is I should never get a failure or warning anytime before 600 seconds and should hold the lease until 7200.
It's possible to have a problem with the background modacking process. In your case, the background leaser would send a modack every 600 seconds up to 7200 seconds. If there's a modack error, we log the error. So when your code finishes working and calls `ack_with_response` for that message, the lease on that message might already have expired. There's not really a way around this since acks/modacks may actually fail when exactly-once is enabled. But at least you do know about this and can re-try the work when the message is re-delivered. I realize this weakens the appearance of the "exactly-once" property, but that's the best we can do given that failures are a fact in distributed systems.
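To make the leasing schedule above concrete, here is a toy calculation of when lease extensions would fall due for a long task. It assumes a simplified model in which the initial lease and every extension each last exactly `extension_s` seconds; `extension_times` is a hypothetical helper, not part of the client library, and the real leaser's timing may differ.

```python
from typing import List


def extension_times(processing_s: int, extension_s: int, max_lease_s: int) -> List[int]:
    """Seconds after receipt at which a lease extension is due.

    Extensions stop once the task is finished or the maximum lease
    duration is reached, whichever comes first.
    """
    end = min(processing_s, max_lease_s)
    return list(range(extension_s, end, extension_s))


# The 80-minute task mentioned in this thread, with 600 s extensions
# and a 7200 s maximum lease.
due = extension_times(processing_s=80 * 60, extension_s=600, max_lease_s=7200)
print(len(due), due)
```

Under this model the 80-minute task depends on seven consecutive extensions going through, which is why a single failed modack on a CPU-starved worker can be enough for the final ack to arrive after expiry.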
> Can you please provide example code on how to do this? The code I presented above ends the callback when I get the AcknowledgeError.
There isn't any code after the `except` block, so the callback should end, yes? Are you saying the program itself stops or the scheduler/thread-pool stops calling the callback for subsequent messages?
> How should one go about handling the network latency? If my lease extensions are 600 seconds, shouldn't this be sufficient for PubSub to retry connections?
I think setting a long lease timeout is the best you can do. I can't think of other mitigations.
Hi,
Are there any updates on this issue? I'm facing the same problem. A large number of messages are redelivered due to the `INVALID_ACK_ID` error. I've also enabled the "exactly-once" feature and am using version `2.12.0`.
This is problematic because we have processes that can take hours. We don't want to receive the same message multiple times and waste time on recomputing things.
Is there any workaround to force acknowledging the message?
Thank you :pray:
@ramirek I never got it to work properly. What I'm currently doing is turning off "exactly-once", living with the duplicates and handling them by logging messages in a database.
I think the best solution would be to have pubsub immediately return an ACK, and then take that worker offline until the process has completed.
Hi all,
I think this may be an issue we're also dealing with. We take messages and run computations on batches of them before acking them. The batch calculation can be cpu-heavy and in the exact example I've been testing with takes about 9 minutes. We're getting tons of redeliveries.
The exact test I'm running is using 127 clients with a subscription that has a backlog of 1M messages. The clients process messages at about 3 messages per second and run for an hour.
With this setup, I'm consistently seeing 50%+ of acks get lost. (The workers record the `message_id` of each message before calling `msg.ack()`, and I compare these message IDs with the messages remaining in the queue.)
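The comparison described above amounts to a set intersection. A minimal sketch, assuming the logged and remaining message IDs have already been collected into plain Python collections (`lost_ack_fraction` is a hypothetical helper name):

```python
from typing import Iterable


def lost_ack_fraction(acked_ids: Iterable[str], remaining_ids: Iterable[str]) -> float:
    """Fraction of locally acked message IDs that are still in the subscription."""
    acked = set(acked_ids)
    if not acked:
        return 0.0
    still_present = acked & set(remaining_ids)
    return len(still_present) / len(acked)
```

A high value only shows that acked messages are still in the subscription; it does not by itself distinguish an ack that was lost from one that arrived after the lease had already expired.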
This is adding a ton of required compute to get through a subscription and even when the clients run until they can't pull any more messages, messages get leftover anyways.
Is this issue well-understood at all? Unfortunately we really can't use PubSub with our application like this. Happy to give more debugging details; I've turned on the PubSub client library's logging along with timestamps, so there's quite a lot of info to sift through. The modacks seem to be going out at the right time and there's no info reported about the AckRequests, so I'm really not sure where the issue might lie.
Hi all,
We also have the same issue, consuming one message at a time with a long running task in GKE as @jtressle. We also observed that this error got raised right after we successfully ack one previous message and before we consume the next one, as the screenshot shows. Shouldn't we stop triggering the callback from the dispatcher if we already know this ack/modack failed at beginning? @pradn
Is it possible that the background lease management actually loads more messages than we request?
@pradn any update on this issue by chance?
@ramirek and @hjtran Invalid ack ID errors are due to acking a message that is past its expiration and has already been redelivered. These errors are passed to the user with exactly-once delivery because, with exactly-once delivery, acking is transactional. Therefore the messages are not being redelivered due to the INVALID_ACK_ID error, but due to expiration.
Depending on what your flow control settings are, and the health of your tasks, you may have more messages being delivered than you can process in the time to ack. The library should automatically lease the messages until they can be processed and acked by the users. However, if CPU intensive work is starving the client, it may not be able to send these modacks, and therefore the messages pass their ack expiration. There is also a max_lease_duration in Flow Control for your messages, defaults to 1 hr. This lease begins when the message is delivered to the client, not when the callback begins, so depending on how large your flow control backlog is, it may expire sooner than you expect. Adjusting your flow control settings to lower the number of messages that are being delivered to you may help reduce duplicates.
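Because the lease clock starts at delivery rather than when the callback begins, time spent waiting in the client's buffer counts against `max_lease_duration`. A rough back-of-the-envelope check, assuming batches are processed strictly one after another (`time_to_ack_s` and `outlives_lease` are hypothetical helpers, and the 3600 s default reflects the 1 hr figure mentioned above):

```python
def time_to_ack_s(batches_ahead: int, batch_duration_s: float) -> float:
    """Seconds between delivery and ack for a message whose batch must wait
    for `batches_ahead` earlier batches to finish first."""
    return (batches_ahead + 1) * batch_duration_s


def outlives_lease(batches_ahead: int, batch_duration_s: float,
                   max_lease_s: float = 3600.0) -> bool:
    """True if the message would be acked only after its maximum lease ends."""
    return time_to_ack_s(batches_ahead, batch_duration_s) > max_lease_s


# Nine-minute batches: the third buffered batch is acked after 27 minutes,
# while a seventh would be acked after 63 minutes.
print(outlives_lease(batches_ahead=2, batch_duration_s=9 * 60))
print(outlives_lease(batches_ahead=6, batch_duration_s=9 * 60))
```

The sketch ignores modack failures entirely, so it gives only a lower bound on the risk: a buffer that fits within the maximum lease on paper can still see expirations if extensions are not delivered in time.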
As for @YungChunLu, with flow control set to 1 message, the lease will hold 1 message, so depending on how long your "long running task" is, there still could be an expiration happening. How long does the processing take per message, and what are your flow control settings?
@hjtran What are your flow control settings? and can you please clarify what you mean by "acks get lost" and "messages get leftover"?
@YungChunLu @hjtran @ramirek Most customers with CPU-intensive or long-running tasks who see duplicates have had more luck with adjusting their flow control settings than with enabling exactly-once delivery.
@acocuzzo My flow control settings are set to allow three times the batch size of messages we process at one time (in this case, we process 1536 messages in a batch, so `max_messages=4608`). The rest of the FlowControl settings are left as default. The amount of time it takes to process a batch of messages is ~8-9 minutes, so three batches should complete well before the 60m max lease time I think.
Our workers log the message ID of messages right before they ack them. By "acks get lost", I mean the messages whose IDs we've seen in our workers' logs (i.e. messages that we've acked) and yet are still in the subscription, suggesting that the ack never arrived.
By "messages getting left over", I mean that even when we scale the number of workers/clients really high to just push through the high redelivery rate, in the end when the workers/clients finish because they're not receiving any more messages, there are actually still messages left in the queue (presumably because some acks for the final messages aren't being received by the PubSub server but are still leased out).
We've tried a couple of strategies to reduce CPU intensity including reserving half the number of CPUs to "idle" and adding a pause to the CPU intensive subprocesses but haven't had much luck with those strategies. FWIW, I turned on the logging of the PubSub python client libraries and I noticed that modacks were actually sent on time even during CPU intensive tasks.
I can try messing with the `FlowControl` settings a little more. Thanks for the help @acocuzzo!
> As for @YungChunLu, with flow control set to 1 message, the lease will hold 1 message, so depending on how long your "long running task" is, there still could be an expiration happening. How long does the processing take per message, and what are your flow control settings?
Hi @acocuzzo, we only modify the following flow control parameters; the others are left at their defaults.
What we are experiencing is that duplicated messages arrive even though we have certainly not reached the lease duration yet. Also, as shown in the screenshot, the extension_request often fails.
We have discovered a possible internal mitigation for this issue. @ramirek, and anyone else that is experiencing this, please open an internal Customer support ticket, or share an email where I can discuss this with you off-thread. If you open a customer ticket, please indicate that this is a python client library related issue so that I see it. Thanks for your patience and we are hoping this mitigation can be helpful.
Hi @acocuzzo can you please email me at jmt@trnio.com? I've mitigated this issue by checking for duplicate messages, but it'll be nice to see if your resolution works.
Thanks
Hi @jtressle, @YungChunLu, @hjtran, @ramirek. Can you please try updating to the latest library version? We had a recent fix which could mitigate your issues: https://github.com/googleapis/python-pubsub/releases/tag/v2.13.4
In particular, do you see redeliveries after approx 35 min?
Hi @acocuzzo, I ran the latest version in our pipeline overnight. We had 12 occurrences of duplicate messages being sent, out of a possible 200 or so. Since we are only consuming 1 message at a time, I don't think the fix v2.13.4 will address our issue? Most of our duplicate messages do occur with our longer processes (> 30 minutes). My lease time is set to 7200.
Thanks,
@jtressle Yes, I agree that v2.13.4 is unlikely to fix your issue. There should be a new release very shortly, v2.13.6, that may fix issues with message expirations and redeliveries at the beginning of the process. I think it would be worth updating to rule it out, but unlikely to fix redeliveries at the 35 min mark. Thanks for the update.
@acocuzzo Our team has upgraded the version to be v2.13.6 this week, but we do not see any ack/modack failure rates decrease. We also have forwarded more detail through the support ticket.
@acocuzzo 2.13.6 has not improved the duplicates issue or the ACK retries. Two questions:
(1) is it possible for your team to reproduce the error by running a long-running CPU intensive task on a Google Instance? Can something like this be added to the release tests?
(2) Is there a recommended way to turn on/off a pub-sub worker? In our case, we don't need all the leasing logic that is causing this issue. I'd rather receive a message, immediately ACK that message, turn OFF Pub-sub on the worker to prevent pulling any more messages, have the worker run the long-running task, and then turn ON Pub-sub for the next message.
Thanks.
Hi @jtressle
Thanks for checking on the new version.
(1) I am currently working on a reproduction with processes of 1-2 hours, it would be helpful to know the specific memory/CPU usage for a proper reproduction. I'm not sure we could add this to the release tests but I can investigate what our options are.
(2) If you don't want the leasing logic, you can try using SynchronousPull instead of StreamingPull aka AsynchronousPull, please see the docs for reference: https://cloud.google.com/pubsub/docs/pull#synchronous_pull
One thing I would caution for SynchronousPull is that it is necessary to "overpull" meaning you should send many more pull requests than you might expect, as they will often return an empty response or fewer messages than requested rather than waiting for all of the messages requested. This is done to lower delivery latency. For example: https://medium.com/javarevisited/gcp-how-to-achieve-high-performance-synchronous-pull-with-pub-sub-and-spring-boot-12cb220c4d65
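The "pull one message, ack it immediately, then run the long task" pattern discussed in this thread could be sketched with synchronous pull roughly as below. This is a sketch under stated assumptions, not a tested recipe: `pull_ack_then_process`, `max_empty_pulls` and `idle_sleep_s` are hypothetical names, the client is passed in and only assumed to offer `pull(request=...)` and `acknowledge(request=...)` in the style of the 2.x `SubscriberClient`, and error handling is omitted.

```python
import time
from typing import Any, Callable


def pull_ack_then_process(
    client: Any,
    subscription_path: str,
    handler: Callable[[bytes], None],
    max_empty_pulls: int = 10,
    idle_sleep_s: float = 1.0,
) -> int:
    """Pull one message at a time, ack it right away, then run the handler.

    Stops after `max_empty_pulls` consecutive empty responses and returns
    the number of messages handled.
    """
    processed = 0
    empty_pulls = 0
    while empty_pulls < max_empty_pulls:
        response = client.pull(
            request={"subscription": subscription_path, "max_messages": 1}
        )
        if not response.received_messages:
            # Empty responses are normal with synchronous pull; keep polling.
            empty_pulls += 1
            time.sleep(idle_sleep_s)
            continue
        empty_pulls = 0
        for received in response.received_messages:
            # Ack before the long task so no lease has to be kept alive.
            client.acknowledge(
                request={
                    "subscription": subscription_path,
                    "ack_ids": [received.ack_id],
                }
            )
            handler(received.message.data)
            processed += 1
    return processed
```

With the real library this would be called with a `pubsub_v1.SubscriberClient()` instance and the subscription path. The trade-off is that acking first gives at-most-once processing: if the worker crashes mid-task, Pub/Sub will not redeliver the message, so any retry has to be handled by the application.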
Hi @acocuzzo,
This is excellent news! I've noticed it on both our servers that run longer processes. The servers in question are c2-standard-4 and c2-standard-30. The larger one runs a longer process, and has more frequent duplicates. For both servers, the cpu usage is basically 100% on all cores. One thing to note is that both servers are running in a GKE docker configuration, and I've set aside about 10% of the CPU for GKE related tasks.
Thanks for the pull documentation. This might be a good alternative. I'll test implementing it and will report back.
Thanks.
@jtressle @YungChunLu @hjtran @ramirek Through my 2hr+ testing I discovered a bug related to flow control buffering, which I've added logic to account for. If you are using exactly once delivery, please update to google-cloud-pubsub v2.13.7, it should reduce any INVALID_ACK_ID error logs to once per redelivered message, and prevent duplicates from being delivered to the user's specified callback. Please let me know if this fix helps!
@acocuzzo That's exciting news. Unfortunately my application does not use eod. We tried some tests with it yesterday but the throughput of messages sent to subscribers was too little for our application and relative to non-eod subscription.
@hjtran Unfortunately without eod the INVALID_ACK_ID errors are not surfaced for us to check. I am looking into the underlying reasons for this issue with flow control (which are either server side or in the grpc layer), and hopefully I'll be able to get some answers there.
Thanks @acocuzzo! I appreciate the help. FWIW, I've also been talking with our GCP account manager about this for a few months in case you want to be linked to that, we've gone back and forth trying to debug this for a while. If that's helpful, I can include you on the email chain.
Hi, I've been facing the same problem for 2 months. Any news? Thanks, Gautier
Hi,
I'm having an issue where I have a long process (up to 80 minutes) running on a Kubernetes docker instance. The instance is running Ubuntu 20.04, and I'm using Python 3.8.10. The docker container runs a python worker script, which runs a subprocess. The subprocess is multi-threaded and can use all threads during some CPU intensive tasks.
I'm getting a lot of duplicates (about 5 to 10 duplicates). This is repeatable and probably due to the intense CPU usage. What is the correct way to handle this? Thanks in advance.
My pip versions are:
My worker code is similar to this:
Thanks in advance,