Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License

[Service Bus] Connection Pool #33608

Open cmillani opened 9 months ago

cmillani commented 9 months ago

The issue

The Service Bus example for sending messages instantiates a new client and sender, and uses context managers to control the life cycle of the connections. Other examples in the same repository follow the same pattern:

# Creates a ServiceBusClient
with servicebus_client:
    # Creates a QueueSender
    with sender:
        # Sends a message!
        sender.send_messages(message)

This way a new connection is created every time the code is executed, which may not be ideal for some scenarios like a daemon or web application.

If an instance of ServiceBusSender is created and used in a long-lived fashion, we may be surprised by connection errors. Investigating the source code, I found no way to detect whether the connection was still valid other than catching the following error:

ValueError: The handler has already been shutdown. Please use ServiceBusClient to create a new instance.
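Today the only detection strategy is to catch that generic ValueError and rebuild the sender. A minimal sketch of that pattern follows; `StubSender` is a stand-in for a real ServiceBusSender so the example runs without a live namespace.

```python
class StubSender:
    """Stand-in for ServiceBusSender: raises once it has been shut down."""

    def __init__(self):
        self.closed = False

    def send_messages(self, message):
        if self.closed:
            raise ValueError(
                "The handler has already been shutdown. "
                "Please use ServiceBusClient to create a new instance."
            )
        return f"sent: {message}"


def send_with_recreate(sender, make_sender, message):
    """Send, recreating the sender if the handler was already shut down."""
    try:
        return sender.send_messages(message), sender
    except ValueError:
        fresh = make_sender()  # in real code: client.get_queue_sender(queue)
        return fresh.send_messages(message), fresh


stale = StubSender()
stale.closed = True  # simulate a handler that was already shut down
result, sender = send_with_recreate(stale, StubSender, "hello")
print(result)  # sent: hello
```

The drawback, as noted above, is that ValueError is too broad: unrelated bugs raising ValueError would silently trigger a reconnect.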

Proposed Solution

I would love a simple and resilient way to reduce the creation of new connections when sending messages.

I find the way SQLAlchemy works very neat: I can create an engine with a predefined pool size and then retrieve sessions from that pool!

# Somewhere I define a singleton instance of my engine
engine = create_engine(
    cls.get_connection_string_for(ConnectorEngines.SQL_ALCHEMY),
    pool_size=12,
    pool_pre_ping=True,
)

# Anywhere I need to run queries, I simply create a new session:
with Session(engine) as session:
    session.query(...)

Alternatives

Exposing methods to check whether the connection has already been closed, and perhaps specializing the exception thrown in those cases (a bare ValueError is too generic to catch safely), would make long-lived connections easier to manage. That would give us some of the benefits of a pool without needing to implement too much!
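To make the alternative concrete, here is a hedged sketch of the proposed API shape. Neither `HandlerClosedError` nor an `is_closed` property exists in azure-servicebus today; both are hypothetical, and `SketchSender` is a toy class illustrating the proposal, not the SDK type.

```python
class HandlerClosedError(ValueError):
    """Hypothetical subclass: still a ValueError, but precise to catch."""


class SketchSender:
    """Toy sender modeling the proposed inspectable lifecycle."""

    def __init__(self):
        self._closed = False

    @property
    def is_closed(self) -> bool:
        # Hypothetical inspection property requested in this issue.
        return self._closed

    def close(self):
        self._closed = True

    def send_messages(self, message):
        if self._closed:
            raise HandlerClosedError("The handler has already been shutdown.")
        return f"sent: {message}"


sender = SketchSender()
sender.close()
if sender.is_closed:         # cheap pre-check instead of try/except
    sender = SketchSender()  # recreate before sending
print(sender.send_messages("ping"))  # sent: ping
```

Because the specialized error still subclasses ValueError, existing except blocks would keep working while new code could catch the precise type.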

Additional Context & Data

A simple comparison. As expected, reusing the connection is much faster, but lifecycle management is needed, so it is not as simple as holding the connection instance forever.

First, define all variables:

from azure.servicebus.aio import ServiceBusClient, ServiceBusSender
from azure.servicebus import ServiceBusMessage
import time

SERVICE_BUS_CONNECTION_STR=""
queue=""

Then, run the loop reusing the connection:

t1 = time.time()
async with ServiceBusClient.from_connection_string(conn_str=SERVICE_BUS_CONNECTION_STR) as bus_client:
    async with bus_client.get_queue_sender(queue_name=queue) as sender:
        for i in range(10):
            await sender.send_messages(ServiceBusMessage('teste'))

print(time.time() - t1)

1.0489232540130615

Creating the sender (or both bus_client and the sender) inside the loop takes about 10 times longer:

t1 = time.time()
async with ServiceBusClient.from_connection_string(conn_str=SERVICE_BUS_CONNECTION_STR) as bus_client:
    for i in range(10):
        async with bus_client.get_queue_sender(queue_name=queue) as sender:
            await sender.send_messages(ServiceBusMessage('teste'))

print(time.time() - t1)

10.35313105583191

Conclusions

Is there already any work being done to reduce instantiation of new connections? I would love to help creating it if it makes sense for the project!

Thanks for the help!

kashifkhan commented 9 months ago

Hi @cmillani ,

Thank you for the detailed and well laid out issue :)

I will have to check with some of my colleagues behind the scenes about long-lived connections and sending with AMQP, so I can provide you with a better answer or path forward. Given that this is towards the end of the year, a reply will be delayed, as a number of folks are out of the office.

cmillani commented 9 months ago

Hello @kashifkhan, thanks for the fast reply!

Sure! No problem :)

I know very little about AMQP (be it 0.9 or 1.0), but while looking for references I found that Kombu implements pools; I also found an old implementation for Pika (but it seems unmaintained).

Another interesting point is this RabbitMQ article; although AFAIK it uses AMQP 0.9, it may provide some guidance! :)

I'll keep studying, if I'm able to come up with a PoC I'll make sure to share here!

fivecar commented 8 months ago

I've found related issues in a large-scale deployment that caused outages for us. There are two core issues that I've noticed:

Is there an ETA on when connection pooling will be enabled for Service Bus? Because without the pooling, the only choice for any large-scale service is to hold one ServiceBusClient open globally -- which itself is prone to issues because it's not threadsafe.

wenleix commented 6 months ago

As a workaround, we could do connection pooling on the app side, but we would really appreciate official support in the Azure Python SDK. An over-simplified demo would be:

import asyncio

from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient, ServiceBusSender

class ServiceBusSenderPool:
    def __init__(
        self,
        client: ServiceBusClient,
        queue_name: str,
        pool_size: int,
    ):
        self.client = client
        self.available_senders: asyncio.Queue = asyncio.Queue()

        for _ in range(pool_size):
            self.available_senders.put_nowait(
                client.get_queue_sender(queue_name=queue_name)
            )

    def get_sender(self) -> "_SenderContextManager":
        return _SenderContextManager(self)

    async def close(self):
        while not self.available_senders.empty():
            sender: ServiceBusSender = self.available_senders.get_nowait()
            await sender.close()

    def _return_sender(self, sender: ServiceBusSender):
        self.available_senders.put_nowait(sender)

class _SenderContextManager:
    def __init__(self, sender_pool: ServiceBusSenderPool):
        self.sender_pool = sender_pool

    async def __aenter__(self):
        self.sender = await self.sender_pool.available_senders.get()
        return self.sender

    async def __aexit__(self, exc_type, exc, tb):
        # NOTE: Sometimes the sender needs to be recreated, for example if
        # certain exceptions were thrown.
        self.sender_pool._return_sender(self.sender)

And we could use it in a concurrent environment:

SERVICE_BUS_CLIENT = None
SENDER_POOL = None

async def send_message(id: int):
    for i in range(100):
        async with SENDER_POOL.get_sender() as sender:
            await sender.send_messages(ServiceBusMessage(f"Hello, World from {id}"))

    print(f"Sent 100 messages from {id}")

async def main():
    global SERVICE_BUS_CLIENT, SENDER_POOL

    SERVICE_BUS_CLIENT = ServiceBusClient.from_connection_string(...)
    SENDER_POOL = ServiceBusSenderPool(SERVICE_BUS_CLIENT, queue_name="test", pool_size=10)

    tasks = []
    for i in range(20):
        tasks.append(send_message(i))

    await asyncio.gather(*tasks)

    await SENDER_POOL.close()
    await SERVICE_BUS_CLIENT.close()

if __name__ == "__main__":
    asyncio.run(main())

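The NOTE in `__aexit__` deserves a concrete illustration: if the body raised, the (possibly broken) sender should be discarded and a fresh one returned to the pool so the pool size stays constant. A standalone sketch using `asyncio.Queue` and strings as stand-ins for real senders:

```python
import asyncio


async def demo():
    pool = asyncio.Queue()
    await pool.put("sender-1")

    def make_sender():
        # Stands in for client.get_queue_sender(queue_name=...).
        return "sender-2"

    sender = await pool.get()
    try:
        raise RuntimeError("send failed")  # simulate a fatal send error
    except RuntimeError:
        sender = make_sender()  # replace the broken sender
    finally:
        await pool.put(sender)  # pool size stays constant

    return pool.get_nowait()


print(asyncio.run(demo()))  # sender-2
```

In a real implementation `__aexit__` would inspect `exc_type` and recreate only on fatal, connection-level errors.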
dougli commented 4 months ago

I found the root cause of reuse of a ServiceBusSender object failing. See issue #35618

cmillani commented 1 month ago

Hello @kashifkhan, any updates on this?

It would be a game changer for us. We implemented a simple reuse scheme, but having a low-level implementation would be cleaner and more efficient.