Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

Concurrent send_messages with async ServiceBusSender fails #36334

Open iangoldby opened 3 months ago

iangoldby commented 3 months ago

Describe the bug

Sending multiple messages to Service Bus concurrently through the same asyncio sender causes the handler to shut down, which ultimately raises ServiceBusError: Handler failed: 'NoneType' object has no attribute 'create_sender_link'.

To Reproduce

import asyncio

from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient

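CONN_STR = "<service bus connection string>"  # placeholder: substitute a real connection string
QUEUE_NAME = "<queue name>"  # placeholder: substitute an existing queue name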

async def main():

    async with ServiceBusClient.from_connection_string(conn_str=CONN_STR) as client:
        async with client.get_queue_sender(QUEUE_NAME) as sender:
            await asyncio.gather(*(sender.send_messages(ServiceBusMessage("hello")) for _ in range(10)))

if __name__ == "__main__":
    asyncio.run(main())

Usually two messages are queued successfully, and then the following output and traceback appear:

Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'create_sender_link'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\_pyamqp\aio\_client_async.py", line 329, in client_ready_async
    if not await self._client_ready_async():
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\_pyamqp\aio\_client_async.py", line 510, in _client_ready_async
    self._link = self._session.create_sender_link(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'create_sender_link'
Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'client_ready_async'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'client_ready_async'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'client_ready_async'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'client_ready_async'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'client_ready_async'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'client_ready_async'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
Unexpected error occurred (AttributeError("'NoneType' object has no attribute 'client_ready_async'")). Handler shutting down.
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'client_ready_async'
Traceback (most recent call last):
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 239, in _send
    await self._amqp_transport.send_messages_async(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_transport\_pyamqp_transport_async.py", line 141, in send_messages_async
    await sender._open()
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 222, in _open
    while not await self._handler.client_ready_async():
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\_pyamqp\aio\_client_async.py", line 329, in client_ready_async
    if not await self._client_ready_async():
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\_pyamqp\aio\_client_async.py", line 510, in _client_ready_async
    self._link = self._session.create_sender_link(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'create_sender_link'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "c:\Users\I0892\source\coode-test\test_servicebus_gather.py", line 17, in <module>
    asyncio.run(main())
  File "<REDACTED>\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "<REDACTED>\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\asyncio\base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "c:\Users\I0892\source\coode-test\test_servicebus_gather.py", line 13, in main
    await asyncio.gather(*(sender.send_messages(ServiceBusMessage("hello")) for _ in range(10)))
  File "<REDACTED>\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 447, in send_messages
    await self._do_retryable_operation(
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 267, in _do_retryable_operation
    last_exception = await self._handle_exception(exception)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<REDACTED>\site-packages\azure\servicebus\aio\_base_handler_async.py", line 208, in _handle_exception
    raise error
azure.servicebus.exceptions.ServiceBusError: Handler failed: 'NoneType' object has no attribute 'create_sender_link'.

Expected behavior

Ten messages sent to the queue and no errors.

Additional context

Note that this is just a minimal example to reproduce the issue. The real application in which I needed to make concurrent requests with asyncio.gather() was an RPC application using session_id.

I have also noticed that if I use an asyncio.Lock() to prevent the sender.send_messages() calls from running concurrently, the problem does not occur.

iangoldby commented 3 months ago

As a side-note, if you are not supposed to be able to use the same ServiceBusSender concurrently in different tasks then it would be helpful if that were made unmissable in the documentation.

kashifkhan commented 3 months ago

Hi @iangoldby, the Service Bus library is not concurrency-safe, so it is not meant to be used like that. The recommended approach is to guard the sender with an asyncio.Lock.

I do agree that we need to make this more prominent in our documentation, with examples, and move it somewhere more visible than where it is today.
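
For readers looking for the shape of the asyncio.Lock workaround mentioned above, here is a minimal sketch. It is not SDK code: SerializedSender and FakeSender are hypothetical names introduced only for illustration, and FakeSender stands in for a real ServiceBusSender so the example runs without an Azure connection. The only assumption about the real sender is that it exposes an awaitable send_messages(), as in the reproduction script.

```python
import asyncio


class SerializedSender:
    """Hypothetical wrapper: lets only one send_messages() call run at a time."""

    def __init__(self, sender):
        # In real use this would be the object returned by client.get_queue_sender(...).
        self._sender = sender
        # One lock shared by every task that sends through this wrapper.
        self._lock = asyncio.Lock()

    async def send_messages(self, message, **kwargs):
        # Tasks wait here, so the wrapped sender never sees overlapping calls.
        async with self._lock:
            return await self._sender.send_messages(message, **kwargs)


class FakeSender:
    """Stand-in for a real sender; records how many calls overlap."""

    def __init__(self):
        self.active = 0
        self.max_active = 0
        self.sent = []

    async def send_messages(self, message, **kwargs):
        self.active += 1
        self.max_active = max(self.max_active, self.active)
        await asyncio.sleep(0)  # yield to the event loop, as network I/O would
        self.sent.append(message)
        self.active -= 1


async def demo():
    fake = FakeSender()
    sender = SerializedSender(fake)
    # Same pattern as the reproduction: ten concurrent sends via gather().
    await asyncio.gather(*(sender.send_messages(f"hello {i}") for i in range(10)))
    return fake


fake = asyncio.run(demo())
print(len(fake.sent), fake.max_active)  # prints: 10 1
```

With the lock in place the sends run one after another, so this trades throughput for safety. Whether that cost matters depends on the application.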

iangoldby commented 3 months ago

@kashifkhan Thanks for your response. I'm glad you agree that the documentation needs improving. In particular, I think it is fair to say that most developers would not expect a warning about thread safety to apply to async code as well. A lot of developers would regard the async world as an escape from thread concurrency problems :-).
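
To illustrate why single-threaded async code is not automatically free of these problems, the toy example below shows tasks interleaving at an await point between a check and the action that depends on it. LazyResource is a hypothetical class, not the SDK's implementation, and the sketch makes no claim about the exact cause of the traceback above. It only shows the general pattern of several tasks all passing a "not ready yet" check before any of them finishes initialising.

```python
import asyncio


class LazyResource:
    """Toy handler (not SDK code) that initialises itself on first use."""

    def __init__(self):
        self.opens = 0
        self.ready = False

    async def use(self):
        if not self.ready:          # check ...
            await asyncio.sleep(0)  # ... suspend; other tasks run the same check ...
            self.opens += 1         # ... then act on a check that is now stale
            self.ready = True


async def main():
    resource = LazyResource()
    await asyncio.gather(*(resource.use() for _ in range(10)))
    return resource.opens


opens = asyncio.run(main())
print(opens)  # prints: 10 (every task initialised the resource, not just the first)
```

Holding an asyncio.Lock across the check and the initialisation would let only the first task do the work.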