Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

[ServiceBus] receive duplicate messages + hangs when link is force detached continuously #24901

Open swathipil opened 2 years ago

swathipil commented 2 years ago

Bug discovered while adding stress tests (#22852).

When a link is repeatedly force-detached over a long period of time, we want to make sure that the receiver keeps receiving messages and the link reattaches/recovers. We also don't want to receive duplicate messages. However, we are receiving duplicates. I believe the message is not being dequeued from the internal message buffer/queue after the detach. To reproduce, run the following code:

import threading
import os
import time
import logging
import sys
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.servicebus.management import ServiceBusAdministrationClient

# Adjust the uamqp logging level below depending on how much transport output
# you want; DEBUG is verbose but shows the link detach/reattach events.
uamqp_logger = logging.getLogger('uamqp')
uamqp_logger.setLevel(logging.DEBUG)

# Configure a console output
#handler = logging.StreamHandler(stream=sys.stdout)
#uamqp_logger.addHandler(handler)

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

mgmt_client = ServiceBusAdministrationClient.from_connection_string(CONNECTION_STR, logging_enable=True)
queue_runtime_properties = mgmt_client.get_queue_runtime_properties(QUEUE_NAME)
print("Message Count:", queue_runtime_properties.total_message_count)
print("Active Message Count:", queue_runtime_properties.active_message_count)

NUM_MESSAGES = 2000
UPDATE_INTERVAL = 10
EXCEPTION = False
sb_client = ServiceBusClient.from_connection_string(
    CONNECTION_STR, logging_enable=True)
sender = sb_client.get_queue_sender(QUEUE_NAME)
batch_message = sender.create_message_batch()
for i in range(NUM_MESSAGES):
    try:
        batch_message.add_message(ServiceBusMessage(b"a" * 1024))
    except ValueError:
        # ServiceBusMessageBatch object reaches max_size.
        # New ServiceBusMessageBatch object can be created here to send more data.
        sender.send_messages(batch_message)
        batch_message = sender.create_message_batch()
sender.send_messages(batch_message)
time.sleep(10)
queue_runtime_properties = mgmt_client.get_queue_runtime_properties(QUEUE_NAME)
print("Message Count:", queue_runtime_properties.total_message_count)
print("Active Message Count:", queue_runtime_properties.active_message_count)
def receive():
    global EXCEPTION  # without this, the assignments below create a local instead of setting the flag
    schedule_update_queue()
    try:
        receiver = sb_client.get_queue_receiver(QUEUE_NAME)
        msgs_received = {}
        with receiver:
            for msg in receiver:
                print(msg.sequence_number)
                num = msg.sequence_number
                if num in msgs_received:
                    print(f'duplicate msg:{num}')
                msgs_received[num] = 1
                receiver.complete_message(msg)

                if len(msgs_received) == NUM_MESSAGES:
                    print('all messages received')
                    EXCEPTION = True
                    #proc_pool.shutdown(wait=False, cancel_futures=True)
                    break
            print('outside for')
        print('outside with')
    except Exception as e:
        print('hit an exception need to kill program')
        print(e)
        EXCEPTION = True
        #sb_client.close()
        #servicebus_mgmt_client.close()
    print('after both')

def schedule_update_queue():
    def _update_queue_properties():
        if not EXCEPTION:
            queue_properties = mgmt_client.get_queue(QUEUE_NAME)
            # Toggle max_delivery_count so the service force-detaches the link.
            if queue_properties.max_delivery_count == 10:
                queue_properties.max_delivery_count = 11
            else:
                queue_properties.max_delivery_count = 10
            mgmt_client.update_queue(queue_properties)
            print("Updating queue.")
            schedule_update_queue()  # reschedule so the force detach repeats continuously
    t = threading.Timer(UPDATE_INTERVAL, _update_queue_properties)
    t.daemon = True  # don't keep the process alive once receive() returns
    t.start()

# Alternative: toggle the queue in a blocking loop instead of a timer
# (unused in this repro; would need its own thread).
def update_queue():
    while True:
        queue_properties = mgmt_client.get_queue(QUEUE_NAME)
        if queue_properties.max_delivery_count == 10:
            queue_properties.max_delivery_count = 11
        else:
            queue_properties.max_delivery_count = 10
        mgmt_client.update_queue(queue_properties)
        print("Updating queue.")
        time.sleep(30)

print('start receiving')
with sb_client:
    receive()
print('end receive')
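The repro above detects duplicates with a dict keyed by `sequence_number`; the same idea can be factored into a small consumer-side wrapper while the underlying bug is open. A minimal sketch, assuming only that messages expose `sequence_number` as in the repro (the wrapper itself is illustrative, not part of the azure-servicebus API):

```python
# Hypothetical dedup wrapper: yields each sequence number at most once.
# Works over any iterable of received messages; duplicates are reported
# and skipped so downstream processing stays idempotent.
def dedup_by_sequence_number(messages):
    seen = set()
    for msg in messages:
        num = msg.sequence_number
        if num in seen:
            print(f'duplicate msg:{num}')  # skip re-delivery of a buffered copy
            continue
        seen.add(num)
        yield msg
```

In the repro, the `for msg in receiver:` loop body could iterate over `dedup_by_sequence_number(receiver)` instead; duplicates would still need `complete_message` called on them so they don't redeliver.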
swathipil commented 2 years ago

breadcrumbs from Richard about running into something similar in Go: "In go-amqp I have a function (called Prefetched()) that exists only to drain the internal message channel in the receiver. Since you have full and wonderful control over your receiver, you could do the same, like a Flush(). One funny thing that happened to me: I gave back a flushed message, but it had been so long since the last failure that they actually re-received it. So they got back a bum message (lock expired) and then, in the same batch, they also got the real message."
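The Prefetched()/Flush() idea above can be sketched in Python. This assumes the receiver's internal buffer behaves like a `queue.Queue`; `internal_buffer` is a stand-in name, not a real azure-servicebus attribute:

```python
import queue

# Hypothetical "Flush()" in the spirit of go-amqp's Prefetched(): drain
# whatever the receiver has already buffered locally, so stale deliveries
# queued before a link detach aren't handed out after the link recovers.
def flush_prefetched(internal_buffer):
    drained = []
    while True:
        try:
            drained.append(internal_buffer.get_nowait())
        except queue.Empty:
            return drained  # buffer is empty; caller decides what to do with drained msgs
```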

swathipil commented 2 years ago

more from Richard: "you want to be careful to flush messages, but do it quickly, or else you might end up re-receiving a message when the older one's lock expires"
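That caveat suggests checking lock validity before processing a flushed message: an expired-lock message is the "bum" delivery, and the live copy will arrive again. `ServiceBusReceivedMessage` does expose `locked_until_utc`; the helper itself is just a sketch:

```python
from datetime import datetime, timezone

# Returns True if the message's lock is still held, so complete_message()
# has a chance of succeeding; an expired lock means the service will
# redeliver the message and this copy should be dropped.
def lock_still_valid(msg, now=None):
    now = now or datetime.now(timezone.utc)
    return msg.locked_until_utc is not None and msg.locked_until_utc > now
```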

richardpark-msft commented 2 years ago

I should have known that was going to happen. I regret all the time I spent helping you.

github-actions[bot] commented 2 weeks ago

Hi @swathipil, we deeply appreciate your input into this project. Regrettably, this issue has remained unresolved for over 2 years and inactive for 30 days, leading us to the decision to close it. We've implemented this policy to maintain the relevance of our issue queue and facilitate easier navigation for new contributors. If you still believe this topic requires attention, please feel free to create a new issue, referencing this one. Thank you for your understanding and ongoing support.