Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License
4.56k stars 2.78k forks source link

TypeError: Received message 257:None is not bytes #36280

Open SzilvasiPeter opened 3 months ago

SzilvasiPeter commented 3 months ago

Describe the bug We have deployed a service on an OpenShift cluster that ingests data from an Azure Service Bus resource. However, it throws the following error three times and then stops throwing it:

2024-06-28 11:29:32,258.258 ERROR _pyamqp_transport - create_servicebus_exception: Unexpected error occurred (TypeError('Received message 257:None is not bytes')). Handler shutting down.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/aio/_base_handler_async.py", line 260, in _do_retryable_operation
    return await operation(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/aio/_servicebus_receiver_async.py", line 451, in _receive
    receiving = await amqp_receive_client.do_work_async()
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/aio/_client_async.py", line 352, in do_work_async
    return await self._client_run_async(**kwargs)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/aio/_client_async.py", line 760, in _client_run_async
    await self._connection.listen(wait=self._socket_timeout, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/aio/_connection_async.py", line 794, in listen
    if await self._read_frame(wait=wait, **kwargs):
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/aio/_connection_async.py", line 276, in _read_frame
    new_frame = await self._transport.receive_frame(timeout=timeout, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/aio/_transport_async.py", line 76, in receive_frame
    header, channel, payload = await asyncio.wait_for(
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 445, in wait_for
    return fut.result()
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/aio/_transport_async.py", line 98, in read
    await self._read(8, buffer=frame_header, initial=True)
  File "/usr/local/lib/python3.10/site-packages/azure/servicebus/_pyamqp/aio/_transport_async.py", line 532, in _read
    data = await self.sock.receive_bytes()
  File "/usr/local/lib/python3.10/site-packages/aiohttp/client_ws.py", line 296, in receive_bytes
    raise TypeError(f"Received message {msg.type}:{msg.data!r} is not bytes")
TypeError: Received message 257:None is not bytes

There is also another kind of error in addition to the previous error:

2024-06-28 11:25:58,245.245 ERROR client - __del__: Unclosed client session client_session: <aiohttp.client.ClientSession object at 0x7f6d906d4a60>

We were catching the errors and kept reinitializing the ServiceBusReceiver and it is an okay workaround.

To Reproduce Steps to reproduce the behavior:

  1. Initializing the Service Bus with connection string and HTTP proxy and starting listening on a specific topic and subscription.
# main.py
    http_proxy = {
        "proxy_hostname": http_settings.proxy_hostname,
        "proxy_port": http_settings.proxy_port,
    }

    async with ServiceBusClient.from_connection_string(
        conn_str=message_bus_settings.connection_string, http_proxy=http_proxy
    ) as service_bus_client:
        client = AzureServiceBus(service_bus_client, handle_storage_event)
        asyncio.create_task(
            client.listen(
                message_bus_settings.topic,
                message_bus_settings.subscription,
                message_bus_settings.max_message_count, # The max message count is 400
                message_bus_settings.max_wait_time, # The max wait time is 60
            )
        )
# azure_servicebus.py
class AzureServiceBus(Servicebus):
    def __init__(
        self, client: ServiceBusClient, handler: Callable[[StorageEventDto], Responses]
    ) -> None:
        self._client = client
        self._handler = handler

    async def listen(
        self, topic: str, subscription: str, max_msg_cnt: int, max_wait_time: float
    ) -> None:
        while True:
            await self.receive_events(topic, subscription, max_msg_cnt, max_wait_time)

    async def receive_events(
        self, topic: str, subs: str, max_msg_cnt: int, max_wait_time: float
    ) -> None:
        try:
            async with self._client.get_subscription_receiver(topic, subs) as recv:
                logger.info("The AzureServiceBus receiver is initialized.")
                messages = await recv.receive_messages(max_msg_cnt, max_wait_time)
                for msg in messages:
                    message: dict[str, Any] = json.loads(str(msg))
                    self._handler(StorageEventDto(**message))
                    await recv.complete_message(msg)
        except Exception as ex:
            logger.error(f"Error during receiving: {ex}")
  1. Deploying a FastAPI service (that receives messages from the Service Bus) to an OpenShift cluster

Expected behavior I expect to initialize the ServiceBusReceiver without encountering an error. So that the workaround can be omitted.

Additional context Locally, I haven't encountered this error.

kashifkhan commented 3 months ago

Thank you for the feedback @SzilvasiPeter . We will investigate and get back to you asap.