Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

[servicebus] AttributeError: 'NoneType' object has no attribute 'client_ready_async' when reusing async ServiceBusSender objects #35618

Open dougli opened 1 month ago

dougli commented 1 month ago

Describe the bug Connections and sessions to Service Bus are extremely expensive to set up, taking 0.5–1.5s to initialize and tear down. Reusing the ServiceBusSender object mitigates this, but a race condition in the SDK's connection flow causes exceptions:

Traceback (most recent call last):
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_servicebus_sender_async.py", line 238, in _send
    await self._amqp_transport.send_messages_async(
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_transport/_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "/opt/venv/lib/python3.11/site-packages/azure/servicebus/aio/_servicebus_sender_async.py", line 221, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'

To Reproduce Steps to reproduce the behavior:

import asyncio

from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient

client = ServiceBusClient(...)
topic_sender = client.get_topic_sender(topic_name="my_topic")

async def send_message(id: int):
    for i in range(100):
        await topic_sender.send_messages(ServiceBusMessage(f"Hello, World from {id}"))

# Normally you would send in bulk if you knew you had many messages to send in advance.
# But in a busy webserver, this is usually what happens since you have a variable high load of users
# and your request handler only sends 1 message per request
tasks = []
for i in range(10):
    tasks.append(send_message(i))

await asyncio.gather(*tasks)

Expected behavior The async SDK should be safe to use from concurrent coroutines and should not raise exceptions.

Additional context I've found the smoking gun for this bug: it's a race condition near line 222 in _servicebus_sender_async.py. Here's the relevant code:

if self._running:
    return
if self._handler:
    await self._handler.close_async()
auth = None if self._connection else (await create_authentication(self))
self._create_handler(auth)
try:
    await self._handler.open_async(connection=self._connection)
    while not await self._handler.client_ready_async():
        await asyncio.sleep(0.05)
    self._running = True

It seems impossible for self._handler to be None on the client_ready_async call, since the previous line just used it successfully. But because these are all async functions, other coroutines get a chance to run between awaits and can unset self._handler elsewhere.

The culprit is the if-check right at the top of that code block:

if self._handler:
    await self._handler.close_async()

close_async() calls up into a superclass that unsets self._handler. While a connection is being established, the sender is in an indeterminate state where self._running is False but self._handler is set. If another parallel call enters this code during that window, it disconnects the handler and nulls it out while the first call is still waiting in the while-loop.
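To make the interleaving concrete, here's a minimal, self-contained sketch (not SDK code; FakeHandler and FakeSender are made-up stand-ins that only mimic the _open() pattern quoted above) showing how two concurrent callers hit the same AttributeError:

import asyncio

class FakeHandler:
    async def open_async(self):
        await asyncio.sleep(0)  # yield, like the real network open

    async def client_ready_async(self):
        await asyncio.sleep(0.1)  # pretend the link takes a moment to become ready
        return True

class FakeSender:
    def __init__(self):
        self._running = False
        self._handler = None

    async def _close_handler(self):
        # Like the real close_async(), unset the handler and then await the
        # (slow) teardown, yielding control to other tasks in the meantime.
        self._handler = None
        await asyncio.sleep(0.2)

    async def _open(self):
        if self._running:
            return
        if self._handler:
            await self._close_handler()
        self._handler = FakeHandler()
        await self._handler.open_async()
        # Task A suspends at the await above; task B then enters _open(), sees a
        # non-None handler, and nulls it out, so when task A resumes here
        # self._handler is None and the next line raises AttributeError.
        while not await self._handler.client_ready_async():
            await asyncio.sleep(0.05)
        self._running = True

async def main():
    sender = FakeSender()
    # Two concurrent opens on the same sender ->
    # AttributeError: 'NoneType' object has no attribute 'client_ready_async'
    await asyncio.gather(sender._open(), sender._open())

asyncio.run(main())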

github-actions[bot] commented 1 month ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.

kashifkhan commented 1 month ago

Thank you for the feedback @dougli. As you know, the library is not currently coroutine safe, so our recommendation to users is to take a lock when accessing the sender, as in your repro.

salustiana commented 3 weeks ago

Thanks @dougli for opening this issue and pinpointing the exact line where the bug occurs; I'm currently dealing with the same thing on a production service we're hosting. @kashifkhan any suggestions on how to implement that lock? Thanks in advance.

nickpetzold commented 3 weeks ago

@salustiana this is my implementation; it seems to have resolved our issues.

import asyncio
import uuid

from azure.identity.aio import DefaultAzureCredential
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage

credential = DefaultAzureCredential()

client = ServiceBusClient(
    fully_qualified_namespace="my-servicebus.servicebus.windows.net",
    credential=credential,
    retry_total=10,
    retry_mode="exponential",
)
topic_sender = client.get_topic_sender(topic_name="my-topic")

class MyTopicSender:
    def __init__(self, topic_sender):
        self.lock = asyncio.Lock()
        self.topic_sender = topic_sender

    async def send_messages(self, id):
        for _ in range(100):
            async with self.lock:

                service_bus_message = ServiceBusMessage(
                    f"Hell-World-{id}",
                    subject=f"received-v1/{str(uuid.uuid4())}",
                    correlation_id=str(uuid.uuid4()),
                    message_id=str(uuid.uuid4()),
                )

                await self.topic_sender.send_messages(service_bus_message)

my_topic_sender = MyTopicSender(topic_sender)
tasks = []
for i in range(10):
    tasks.append(my_topic_sender.send_messages(i))

await asyncio.gather(*tasks)

@kashifkhan would appreciate any input if you think it could be improved.

salustiana commented 1 week ago

@nickpetzold thanks a lot man, appreciate it.