Azure / azure-service-bus

☁️ Azure Service Bus service issue tracking and samples
https://azure.microsoft.com/services/service-bus
MIT License
580 stars 775 forks source link

Unable to Renew lock #692

Closed Divyasri1510 closed 5 months ago

Divyasri1510 commented 5 months ago

Description

I am using Azure Service Bus queue and currently the maximum lock duration set by us is 5minutes which I understand is the maximum lock duration. But our process might take longer than 15 or 30minutes so we need to keep lock on the message active. After receiving the message, I tried to simulate the same situation by making the process sleep for 299 seconds (as 300seconds that is 5minutes is max lock duration time) and I am trying to renew lock after that. Unfortunately the lock is not getting renewed and locked_until_utc is same for the message before and after renewing. I am attaching code snippet and the output. Please help in this regard.

from azure.servicebus import ServiceBusClient, AutoLockRenewer
from azure.servicebus import ServiceBusReceivedMessage,ServiceBusSession
import time
try:
    renewer = AutoLockRenewer()
    with servicebus_receiver:
        for message in servicebus_receiver:
            print("Received message:", message)
            print("Lock Details Before Processing:")
            print(f"Locked Until: {message.locked_until_utc}")
            time.sleep(299)
            renewer.register(servicebus_receiver, message, max_lock_renewal_duration=60)
            print("Lock Details After Processing:")
            print(f"Locked Until: {message.locked_until_utc}")
            servicebus_receiver.complete_message(message)
except Exception as e:
    print(e)

The output of the above code is:

Received message: test_message
Lock Details Before Processing:
Locked Until: 2024-01-09 07:28:38.710000+00:00
Lock Details After Processing:
Locked Until: 2024-01-09 07:28:38.710000+00:00

As seen, locked_until_utc is same for the message before and after renewing.

Actual Behavior

  1. The lock is not getting renewed.

Expected Behavior

  1. The lock should get renewed.
EldertGrootenboer commented 5 months ago

@Divyasri1510 can you try renewing the lock earlier? Currently the renew of the lock might come in / be processed after the lock has already expired.

Divyasri1510 commented 5 months ago

@EldertGrootenboer , I have tried renewing the lock at various timestamps earlier but the lock is not getting renewed. Do you want me to try renewing after any specific time interval?

l0lawrence commented 5 months ago

Hi @Divyasri1510 ,

If you are trying to renew your lock you need to register the renewal period for the lock before you start processing the message (in this case time.sleep()). You want to set a max_lock_renewal_duration that is longer than your processing time (it also should be longer than your lock duration of 5 minutes). max_lock_renewal_duration sets how long the AutoLockRenewer will keep the message lock renewed, in your case if you want to hold onto a lock for 20 minutes set max_lock_renewal_duration=1200. The AutoLockRenewer will then automatically renew your message for the next 20 minutes, for every 5 minutes when your lock starts to expire.

An example of this:

try:
    renewer = AutoLockRenewer()
    CONNECTION_STR = os.environ['SERVICEBUS_CONNECTION_STR']
    QUEUE_NAME = "q"

    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

    with servicebus_client:
        servicebus_receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, auto_lock_renewer=renewer)
        for message in servicebus_receiver:
            print("Received message:", message)
            print("Lock Details Before Processing:")
            print(f"Locked Until: {message.locked_until_utc}")
            # set this to be longer than the processing time
            renewer.register(servicebus_receiver, message, max_lock_renewal_duration=1200)
            # your lock will renew once because we have passed 5 minutes and you will see an updated locked_until_utc value
            time.sleep(310)
            print("Lock Details After Processing:")
            print(f"Locked Until: {message.locked_until_utc}")
            servicebus_receiver.complete_message(message)
except Exception as e:
    print(e)
Divyasri1510 commented 5 months ago

@l0lawrence , Thanks for clear explanation. I am able to renew lock successfully now.

EldertGrootenboer commented 5 months ago

Glad to see this now works @Divyasri1510, and thank you @l0lawrence for your support on this!