mxsdev / LocalSandbox

Local Azure Cloud emulator. Currently supporting Azure Service Bus.
Apache License 2.0
54 stars 2 forks source link

Support azure-sdk-for-python ServicebusClient #70

Open jvanegmond opened 2 days ago

jvanegmond commented 2 days ago

azure-sdk-for-python uses its own _pyamqp implementation that is specifically targeting the behavior of Azure Servicebus instead of a generic implementation of AMQP 1.0. LocalSandbox does not closely enough emulate the real Azure ServiceBus on a protocol frame level, missing multiple optional fields that real ServiceBus does send, leading to azure-sdk-for-python not working with LocalSandbox.

The following AMQP frames returned by LocalSandbox lack fields that the real Azure ServiceBus does send: open, begin, attach (and possibly more). Notably the open frame has 10 fields as returned by Azure Servicebus whereas LocalSandbox sends the open frame with 1 field. Even though the 10 fields sent are mostly empty, this leads to an error in azure-sdk-for-python in file sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_connection_async.py method _incoming_open where those Amqp frame fields are accessed via array subscription without checking the length of the fields array. This leads to the Amqp session not being able to be established.

To reproduce: Run the following Python script:

import asyncio
import logging
import sys

# Disable TLS. Workaround for https://github.com/Azure/azure-sdk-for-python/issues/34273
from azure.servicebus._pyamqp import AMQPClient
org_init = AMQPClient.__init__
def new_init(self, hostname, **kwargs):
    kwargs["use_tls"] = False
    org_init(self, hostname, **kwargs)
AMQPClient.__init__ = new_init

# Set up logging
handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.servicebus')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

from azure.servicebus import ServiceBusMessage, TransportType
from azure.servicebus.aio import ServiceBusClient

queue_name = "default"

async def main():

    client = ServiceBusClient.from_connection_string(
        conn_str="Endpoint=sb://default.default.default.localhost.localsandbox.sh;SharedAccessKeyName=1234;SharedAccessKey=password;UseDevelopmentEmulator=true",
        transport_type=TransportType.Amqp,
        retry_total=0,
    )
    async with client:

        async with client.get_queue_sender(queue_name) as sender:
            # Sending a single message
            single_message = ServiceBusMessage("Hello, world!")
            await sender.send_messages(single_message)

        # max_wait_time specifies how long the receiver should wait with no incoming messages before stopping receipt.
        # Default is None; to receive forever.
        async with client.get_queue_receiver(queue_name, max_wait_time=30) as receiver:
            async for msg in receiver:  # ServiceBusReceiver instance is a generator.
                print(str(msg))

print()

asyncio.run(main())

This prints

Handler failed: list index out of range.
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\aio\_base_handler_async.py", line 235, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\aio\_servicebus_sender_async.py", line 214, in _open
    while not await self._handler.client_ready_async():
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_client_async.py", line 318, in client_ready_async
    if not await self.auth_complete_async():
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_client_async.py", line 305, in auth_complete_async
    await self._connection.listen(wait=self._socket_timeout)
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 744, in listen
    if await self._read_frame(wait=wait, **kwargs):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 269, in _read_frame
    return await self._process_incoming_frame(*new_frame)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 606, in _process_incoming_frame
    await self._incoming_open(channel, fields)
  File "azure-sdk-for-python\sdk\servicebus\azure-servicebus\azure\servicebus\_pyamqp\aio\_connection_async.py", line 419, in _incoming_open
    if frame[4]:
       ~~~~~^^^

An excerpt of the offending piece of code:

    def _incoming_open(self, channel: int, frame) -> None:
        """Process incoming Open frame to finish the connection negotiation.

        The incoming frame format is::

            - frame[0]: container_id (str)
            - frame[1]: hostname (str)
            - frame[2]: max_frame_size (int)
            - frame[3]: channel_max (int)
            - frame[4]: idle_timeout (Optional[int])
            - frame[5]: outgoing_locales (Optional[List[bytes]])
            - frame[6]: incoming_locales (Optional[List[bytes]])
            - frame[7]: offered_capabilities (Optional[List[bytes]])
            - frame[8]: desired_capabilities (Optional[List[bytes]])
            - frame[9]: properties (Optional[Dict[bytes, bytes]])

        :param int channel: The incoming channel number.
        :param frame: The incoming Open frame.
        :type frame: Tuple[Any, ...]
        :rtype: None
        """
        # .... error checking cut in example ....
        if frame[4]:
            self._remote_idle_timeout = cast(float, frame[4] / 1000)  # Convert to seconds
            self._remote_idle_timeout_send_frame = self._idle_timeout_empty_frame_send_ratio * self._remote_idle_timeout

        if frame[2] < 512:
            # Max frame size is less than supported minimum.
            # If any of the values in the received open frame are invalid then the connection shall be closed.
            # The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.
            self.close(
                error=AMQPError(
                    condition=ErrorCondition.InvalidField,
                    description="Failed parsing OPEN frame: Max frame size is less than supported minimum.",
                )
            )
            _LOGGER.error(
                "Failed parsing OPEN frame: Max frame size is less than supported minimum.",
                extra=self._network_trace_params,
            )
            return
        self._remote_max_frame_size = frame[2]
        self._remote_properties = frame[9]

Additional context: Considering that Microsoft is the maintainer of azure-sdk-for-python and specifically targets Azure Servicebus, it is not reasonable to change this on the azure-sdk-for-python ServiceBusClient side. LocalSandbox should aim to emulate the real Azure ServiceBus as much as possible in order to support azure-sdk-for-python.

linear[bot] commented 2 days ago

LOC-237 Support azure-sdk-for-python ServicebusClient