Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License

Azure Service Bus message retrieval not retrieving correct lock times #34270

Open jamespavett opened 7 months ago

jamespavett commented 7 months ago

Describe the bug

When receiving messages from Service Bus with the receiver's receive_messages method, the locked_until_utc field behaves inconsistently.

receive_messages is called inside a while loop that retrieves messages in batches. Within each iteration I would expect the locked_until_utc values to be roughly the same, and each subsequent batch to have values slightly further in the future, because locked_until_utc should be set when the messages are received in that iteration of the loop.

Instead, locked_until_utc barely increases at all while the script runs, regardless of the number of messages processed or how long the script has been running. The locked_until_utc times almost seem to be pinned to the time of the first message retrieval, or to the call to get_queue_receiver.

To Reproduce

A basic code example that shows the issue. I have also tried doing the same synchronously, with the same result.

import asyncio
from azure.core.exceptions import AzureError
from azure.servicebus.aio import ServiceBusClient
from termcolor import cprint
from datetime import datetime, timezone

async def process_messages(
        queue_name: str,
        limit: int,
        dry_run: bool,
        delete: bool):
    connection_string = "connection_string"

    matching_messages = 0

    servicebus_client = ServiceBusClient.from_connection_string(connection_string)

    sender = servicebus_client.get_queue_sender(queue_name)

    receiver = servicebus_client.get_queue_receiver(
        queue_name
    )

    async with servicebus_client, receiver, sender:
        while matching_messages < limit:

            print("**************************************************")
            print("New Batch")
            received_messages = await receiver.receive_messages(max_message_count=10)

            if not received_messages:
                break

            for msg in received_messages:
                print("Current Time: " + str(datetime.now(timezone.utc)))
                print("Locked Until: " + str(msg.locked_until_utc))
                cprint(f"Processing SequenceNumber: {msg.sequence_number}, MessageId: {msg.message_id}", "light_blue")

                matching_messages += 1
                if dry_run is False:
                    try:
                        if delete is False:
                            cprint("Moving message back to active queue", "light_green")
                            # Settlement and re-send are commented out for the repro.
                            # await receiver.complete_message(msg)
                            # await sender.send_messages(msg)
                        else:
                            cprint("Deleting message", "light_green")
                            # await receiver.complete_message(msg)
                    except AzureError:
                        cprint(f"Failed to renew lock or complete message SequenceNumber: {msg.sequence_number}", "red")
                else:
                    cprint("Message would be moved back to active queue", "light_yellow")

            cprint(
                f"{str(matching_messages)} messages processed ", 
                "light_green")

if __name__ == "__main__":
    args_queue_name = 'queue-name'
    args_limit = 1000
    args_dry_run = False
    args_delete = False

    asyncio.run(process_messages(
        args_queue_name,
        args_limit,
        args_dry_run,
        args_delete))

Expected behavior

locked_until_utc should be set when the message is received from the queue, and I would expect it to equal the current time plus the lock duration of the queue. This does not currently seem to be the case.

Screenshots

At the start of the script, there is about a minute's gap between the Current Time and the Locked Until time. However, the same Locked Until time is seen across different batches, which I would not expect, as it should be moving further into the future. Sometimes the Locked Until time does move forward by a few ms, but not at the same rate as the retrieval time.

image

As the script progresses this gap gets smaller and smaller, even though each message is being returned in a batch of no greater than 10 messages.

image

Eventually, I started getting errors due to being unable to complete messages, as the messages I just retrieved were already past their locked_until_utc times.

image
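The failure described above (completing a message whose lock has already lapsed) can be detected before settlement by comparing locked_until_utc with the current UTC time. The following is a minimal sketch using only the standard library; the helper names and the 5-second safety margin are assumptions for illustration and are not part of the SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def remaining_lock_seconds(locked_until_utc: datetime,
                           now: Optional[datetime] = None) -> float:
    """Seconds left before the lock expires (negative if it already has)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (locked_until_utc - now).total_seconds()


def lock_is_usable(locked_until_utc: datetime,
                   now: Optional[datetime] = None,
                   safety_margin_seconds: float = 5.0) -> bool:
    """True if enough lock time remains to attempt settlement.

    safety_margin_seconds is a hypothetical buffer for network latency.
    """
    return remaining_lock_seconds(locked_until_utc, now) > safety_margin_seconds


# Fixed timestamps so the example is deterministic.
now = datetime(2024, 2, 10, 0, 57, 14, tzinfo=timezone.utc)
print(lock_is_usable(now + timedelta(seconds=53), now))  # plenty of time left
print(lock_is_usable(now - timedelta(seconds=2), now))   # lock already expired
```

In the repro loop, a check like this on msg.locked_until_utc would show which messages arrive with an expired or nearly expired lock, without waiting for the settlement call to fail.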

Additional context

I did try and replicate this with the .NET SDK for the Azure Service Bus, and while I could replicate it in part, I could also get around the issue, something I was unable to do with the Python SDK.

Edit:

Add Logger Output:

**************************************************
New Batch
-> FlowFrame(next_incoming_id=348, incoming_window=65189, next_outgoing_id=1, outgoing_window=65535, handle=3, delivery_count=346, link_credit=10, available=None, drain=None, echo=None, properties=None)
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=False, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=36, delivery_tag=b'\x1d\xa0\xd5\xf7\xdf\xb7yE\xaa\xda\xbb\xba\x8b\xcd\xfb\xb5', message_format=0, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=True, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
Current Time: 2024-02-10 00:57:14.193835+00:00
Locked Until: 2024-02-10 00:58:07.559000+00:00
Processing SequenceNumber: 5247, MessageId: 14f855eace054350af814757a8f074ee
Moving message back to active queue
-> DispositionFrame(role=True, first=35, last=None, settled=True, state=Accepted(), batchable=None)
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=32, delivery_tag=b'\x00\x00\x00 ', message_format=0, settled=False, more=False, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
<- DispositionFrame(role=True, first=32, last=None, settled=True, state={'accepted': []}, batchable=None)
32 messages processed
**************************************************
New Batch
-> FlowFrame(next_incoming_id=359, incoming_window=65178, next_outgoing_id=1, outgoing_window=65535, handle=3, delivery_count=357, link_credit=10, available=None, drain=None, echo=None, properties=None)
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=False, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=37, delivery_tag=b's\xbfW\xa7\x08\xb1\xb9G\x81f~K\xb8M\xcf\x12', message_format=0, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=True, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
<- TransferFrame(handle=2, delivery_id=None, delivery_tag=None, message_format=None, settled=None, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
Current Time: 2024-02-10 00:57:14.562591+00:00
Locked Until: 2024-02-10 00:58:07.559000+00:00
Processing SequenceNumber: 5248, MessageId: 237ccf4474ca41339a66b36b77cac356
Moving message back to active queue
-> DispositionFrame(role=True, first=36, last=None, settled=True, state=Accepted(), batchable=None)
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
CBS status check: state == <CbsAuthState.OK: 0>, expired == False, refresh required == False
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=True, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
-> TransferFrame(handle=3, delivery_id=33, delivery_tag=b'\x00\x00\x00!', message_format=0, settled=False, more=False, rcv_settle_mode=None, state=None, resume=None, aborted=None, batchable=None, payload=b'***')
<- DispositionFrame(role=True, first=33, last=None, settled=True, state={'accepted': []}, batchable=None)
33 messages processed

I also tried the uamqp transport, and the issue still occurs with it.
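As a worked check of the logger output above, the two batches report the same Locked Until value while Current Time advances, so the remaining lock time shrinks between batches. This sketch only redoes that arithmetic with the timestamps copied from the log; the function name is illustrative.

```python
from datetime import datetime

# Timestamps copied verbatim from the logger output for two consecutive batches.
LOCKED_UNTIL = datetime.fromisoformat("2024-02-10 00:58:07.559000+00:00")
OBSERVATIONS = [
    (5247, datetime.fromisoformat("2024-02-10 00:57:14.193835+00:00")),
    (5248, datetime.fromisoformat("2024-02-10 00:57:14.562591+00:00")),
]


def remaining(current: datetime, locked_until: datetime) -> float:
    """Lock time left, in seconds, at the moment the message was printed."""
    return (locked_until - current).total_seconds()


for sequence_number, current in OBSERVATIONS:
    left = remaining(current, LOCKED_UNTIL)
    print(f"SequenceNumber {sequence_number}: {left:.6f} s of lock left")
# SequenceNumber 5247: 53.365165 s of lock left
# SequenceNumber 5248: 52.996409 s of lock left
```

The second message was received in a later batch, yet it has about 0.37 s less lock time than the first, which matches the report that the lock is not being taken at delivery time.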

mccoyp commented 7 months ago

Hi @jamespavett, thank you for opening an issue! I'll tag some folks who can help; we'll get back to you as soon as possible.

kashifkhan commented 7 months ago

Thank you for the detailed bug report, @jamespavett. I had a few follow-up questions for you:

The clients don't set the value for the lock time; it's received from the service. Once we are able to repro on our side, we will have next steps.

github-actions[bot] commented 7 months ago

Hi @jamespavett. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

jamespavett commented 7 months ago

@kashifkhan So I'll put a selection below of how the queue is configured, if you need anything else just let me know.

image

image

image

In .NET, I was originally setting a prefetchCount of 20. After removing the value and going back to the default, the locked-until times came back as I would expect. I did try experimenting with the prefetch value in the Python SDK, but it never seemed to make a difference. I can share the .NET code if you like, but it uses a pretty much identical implementation.

I will give the autolock renewer a go to see if that mitigates anything, but I was getting the same issue before when manually trying to renew the locks.

jamespavett commented 7 months ago

@kashifkhan I just tried with the autolock renewer, and it didn't change the behavior at all.

kashifkhan commented 7 months ago

Thanks for the update, @jamespavett. We will try to repro on our end and go from there.

Quick question: the code to complete messages is commented out. I assume that's just for the sake of the repro?

jamespavett commented 7 months ago

@kashifkhan Yes, that was just commented out for the repro. The same behaviour can be observed on my end either way.

kashifkhan commented 7 months ago

@jamespavett Are you receiving large messages from your queue? If possible, can you send us a sample message?

jamespavett commented 7 months ago

@kashifkhan Unfortunately I can't send a sample, but they mostly range from around 300 to 650 KB. They are Symfony Envelopes for messages.

jamespavett commented 7 months ago

@kashifkhan I've done some more work on this today. Message size seems to be a big factor: when processing messages of only a few KB, everything works as expected, but I get this problem when processing larger messages.

kashifkhan commented 7 months ago

@jamespavett We figured that was the case from looking at your logs, but a repro has been evading us. Are you able to check whether the issue still happens when you pass max_message_count=1?

jamespavett commented 7 months ago

The issue still persists after setting max_message_count to 1. To be fair, I have already tried a lot of configurations of this value and of prefetch_count.

yvgopal commented 6 months ago

LockedUntilUtc is set exactly when a message is locked in the queue. If 10 messages have the same LockedUntilUtc, they were all locked at the same instant. In this case, the receiver is prefetching messages when you first call receive, or even before. Messages are locked when they are prefetched. The moment those messages are handed over to your application determines how much of the lock duration is left. If a message is prefetched at instant x but your application gets it via a receive() call at x+10 seconds, you will see 10 seconds less of the lock duration remaining.

That's the reason you don't have this problem when you disable prefetch in the .NET SDK. That's also the reason we suggest that SDKs default to a prefetch count of 0.

I don't know how, but the SDK is prefetching messages in this case.
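The prefetch effect described above can be sketched as a small simulation: every prefetched message is locked at the same instant, but later batches are handed to the application later, so they arrive with less lock time left. This is a hedged illustration only. The 60-second lock duration, the batch size, and the 20 seconds of processing per batch are assumed values, and the function does not call the SDK.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Tuple

# Assumption: the queue's lock duration is 60 seconds.
LOCK_DURATION = timedelta(seconds=60)


def simulate_prefetch(prefetched: int,
                      batch_size: int,
                      seconds_per_batch: float,
                      start: datetime) -> List[Tuple[int, datetime, float]]:
    """Return (batch_index, delivered_at, remaining_lock_seconds) per batch.

    All messages are locked once, at `start`, when they are prefetched.
    Each batch reaches the application `seconds_per_batch` after the last.
    """
    locked_until = start + LOCK_DURATION
    rows = []
    for batch_index, _ in enumerate(range(0, prefetched, batch_size)):
        delivered_at = start + timedelta(seconds=batch_index * seconds_per_batch)
        remaining = (locked_until - delivered_at).total_seconds()
        rows.append((batch_index, delivered_at, remaining))
    return rows


start = datetime(2024, 2, 10, 0, 57, 7, tzinfo=timezone.utc)
for index, delivered_at, remaining in simulate_prefetch(30, 10, 20.0, start):
    print(f"batch {index}: delivered {delivered_at.time()}, {remaining:.0f} s of lock left")
# batch 0: delivered 00:57:07, 60 s of lock left
# batch 1: delivered 00:57:27, 40 s of lock left
# batch 2: delivered 00:57:47, 20 s of lock left
```

Without prefetch, each batch would be locked at delivery and every row would show the full lock duration, which is the behavior the original report expected.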

jarekhr commented 3 days ago

Hi,

I am experiencing exactly the same problem. I am sending quite large messages (ca. 1 MB) onto a queue, and in another process, when listening with the Azure SDK for Python, I see exactly the symptoms described above: the remaining lock time decreases quickly, and eventually my receiver fails with MessageLockLostError.

    await receiver.complete_message(msg)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 852, in complete_message
    await self._settle_message_with_retry(message, MESSAGE_COMPLETE)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 494, in _settle_message_with_retry
    raise MessageLockLostError(
azure.servicebus.exceptions.MessageLockLostError: The lock on the message lock has expired.

My setup:

I tried setting prefetch to 0, but this does not help; it is the default value anyway. @yvgopal your reasoning makes sense. The question is why the SDK is prefetching multiple messages in this case when all the parameters specify not to do so. In our use case we planned to exchange messages of up to 50 MB, but the pilot implementation fails at 1 MB, which makes Azure Service Bus unsuitable for the job.

@jamespavett were you able to somehow solve this problem?

l0lawrence commented 3 days ago

Hi @jarekhr, @jamespavett, we have a PR out that we believe will help address this issue. It will be released in the next version of Service Bus.

jarekhr commented 1 day ago

@l0lawrence, is the fix on the server side or in the client library? When do you expect the fix to be released to the West Europe region? Thanks!

kashifkhan commented 5 hours ago

@jarekhr The fix is in the client library, so it will be available to everyone as soon as it lands on PyPI. We will update this thread once that happens :)