Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

[EventHub] pyamqp write timeout for large batches + long roundtrip #29177

Open swathipil opened 1 year ago

swathipil commented 1 year ago

Sending a large batch from the US to an Event Hub located in the East Asia/China cloud results in a write timeout error, after which the link is closed. This seems to happen only with the sync client. Stack trace:

  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 838, in close
    self._outgoing_close(error=error)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 441, in _outgoing_close
    self._send_frame(0, close_frame)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 264, in _send_frame
    raise self._error
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\session.py", line 463, in end
    self._outgoing_end(error=error)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\session.py", line 165, in _outgoing_end
    self._connection._process_outgoing_frame(  # pylint: disable=protected-access
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 653, in _process_outgoing_frame
    self._send_frame(channel, frame)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 264, in _send_frame
    raise self._error
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\link.py", line 253, in detach
    self._outgoing_detach(close=close, error=error)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\link.py", line 215, in _outgoing_detach
    self._session._outgoing_detach(detach_frame) # pylint: disable=protected-access
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\session.py", line 411, in _outgoing_detach
    self._connection._process_outgoing_frame(  # pylint: disable=protected-access
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 653, in _process_outgoing_frame
    self._send_frame(channel, frame)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 264, in _send_frame
    raise self._error
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\link.py", line 253, in detach
    self._outgoing_detach(close=close, error=error)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\link.py", line 215, in _outgoing_detach
    self._session._outgoing_detach(detach_frame) # pylint: disable=protected-access
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\session.py", line 411, in _outgoing_detach
    self._connection._process_outgoing_frame(  # pylint: disable=protected-access
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 653, in _process_outgoing_frame
    self._send_frame(channel, frame)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 264, in _send_frame
    raise self._error
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\link.py", line 253, in detach
    self._outgoing_detach(close=close, error=error)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\link.py", line 215, in _outgoing_detach
    self._session._outgoing_detach(detach_frame) # pylint: disable=protected-access
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\session.py", line 411, in _outgoing_detach
    self._connection._process_outgoing_frame(  # pylint: disable=protected-access
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 653, in _process_outgoing_frame
    self._send_frame(channel, frame)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 264, in _send_frame
    raise self._error
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_client_base.py", line 583, in _do_retryable_operation
    return operation(
           ^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_producer.py", line 178, in _send_event_data
    self._amqp_transport.send_messages(
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_transport\_pyamqp_transport.py", line 296, in send_messages
    producer._handler.send_message(
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\client.py", line 665, in send_message
    self._do_retryable_operation(self._send_message_impl, message=message, **kwargs)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\client.py", line 270, in _do_retryable_operation
    raise retry_settings["history"][-1]
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\client.py", line 249, in _do_retryable_operation
    return operation(*args, timeout=absolute_timeout, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\client.py", line 637, in _send_message_impl
    running = self.do_work()
              ^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\client.py", line 389, in do_work
    return self._client_run(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\client.py", line 564, in _client_run
    self._link.update_pending_deliveries()
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\sender.py", line 157, in update_pending_deliveries
    sent_and_settled = self._outgoing_transfer(delivery)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\sender.py", line 107, in _outgoing_transfer
    self._session._outgoing_transfer(  # pylint:disable=protected-access
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\session.py", line 325, in _outgoing_transfer
    self._connection._process_outgoing_frame(  # pylint: disable=protected-access
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 653, in _process_outgoing_frame
    self._send_frame(channel, frame)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_pyamqp\_connection.py", line 264, in _send_frame
    raise self._error
azure.eventhub._pyamqp.error.AMQPConnectionError: Error condition: ErrorCondition.SocketError
 Error Description: Can not send frame out due to exception: The write operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\samples\sync_samples\asia_client_secret_sync.py", line 67, in <module>
    client.send_batch(batch)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_producer_client.py", line 692, in send_batch
    cast(EventHubProducer, self._producers[partition_id]).send(
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_producer.py", line 285, in send
    self._send_event_data_with_retry(timeout=timeout)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_producer.py", line 183, in _send_event_data_with_retry
    return self._do_retryable_operation(self._send_event_data, timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\eventhub\azure-eventhub\azure\eventhub\_client_base.py", line 607, in _do_retryable_operation
    raise last_exception
azure.eventhub.exceptions.ConnectError: Can not send frame out due to exception: The write operation timed out
Error condition: ErrorCondition.SocketError
 Error Description: Can not send frame out due to exception: The write operation timed out
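
For context, the socket-level condition at the bottom of the trace (a blocking write that cannot finish before the socket timeout) can be illustrated in isolation. This is only a sketch with a local `socket.socketpair()` standing in for the network connection; it is not the pyamqp code path, and `write_times_out`, the payload sizes, and the 0.2 s timeout are assumptions made for illustration. The peer never reads, so a large write fills the send buffer and stalls, which roughly mimics a slow, long-roundtrip link.

```python
import socket


def write_times_out(payload_size, timeout_s):
    """Return True if sendall() hits the socket timeout before the payload is written.

    Illustrative helper only: the receiving end never reads, so any payload
    larger than the kernel buffers will stall until the timeout fires.
    """
    sender, receiver = socket.socketpair()
    try:
        sender.settimeout(timeout_s)
        try:
            sender.sendall(b"A" * payload_size)
        except socket.timeout:
            # The write could not complete within timeout_s.
            return True
        return False
    finally:
        sender.close()
        receiver.close()


if __name__ == "__main__":
    # A payload far larger than the socket buffers stalls and times out.
    print("64 MiB payload timed out:", write_times_out(64 * 1024 * 1024, 0.2))
    # A tiny payload fits in the buffer, so the write returns immediately.
    print("1 KiB payload timed out:", write_times_out(1024, 0.2))
```

In the real failure the peer does read, just slowly relative to the write timeout, so the sketch exaggerates the stall to make the effect deterministic.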

Sample to reproduce:

import logging
import os
import sys

from azure.eventhub import EventHubProducerClient, EventData

# Send azure.eventhub debug logs to stdout.
logger = logging.getLogger('azure.eventhub')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

eventhub_name = os.environ['EVENT_HUB_NAME']
connection_str = os.environ['EVENT_HUB_CONN_STR']
uamqp_transport = False  # False selects the pyamqp transport

client = EventHubProducerClient.from_connection_string(
    connection_str,
    eventhub_name=eventhub_name,
    retry_total=0,  # no retries, so the first failure surfaces immediately
    uamqp_transport=uamqp_transport,
    logging_enable=True,
)
with client:
    payload = 250 * 1024  # event body size in bytes
    batch = client.create_batch()
    batch.add(EventData("A" * payload))
    print('add batch')
    client.send_batch(batch)
    print('sent batch')
    client.send_event(EventData("A" * payload))

TODO:

kashifkhan commented 1 year ago

@swathipil Does this mean that the message went across fine when sent using send_event but failed on send_batch?

swathipil commented 1 year ago

@kashifkhan - It failed for both send_event and send_batch with the same write timeout error.
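
To compare the two calls side by side, a small helper along these lines could record how long each one runs and which exception it raises, if any. `timed_call` is a hypothetical helper, not part of azure-eventhub, and the stand-in functions below exist only so the sketch runs without a live Event Hub.

```python
import time


def timed_call(label, func, *args, **kwargs):
    """Run func and return (label, elapsed seconds, exception or None)."""
    start = time.monotonic()
    try:
        func(*args, **kwargs)
    except Exception as exc:
        # Record the failure instead of aborting, so later calls still run.
        return label, time.monotonic() - start, exc
    return label, time.monotonic() - start, None


if __name__ == "__main__":
    # Offline stand-ins for the real send calls (assumptions for illustration).
    def fake_ok():
        time.sleep(0.01)

    def fake_timeout():
        raise TimeoutError("The write operation timed out")

    for result in (timed_call("ok", fake_ok), timed_call("timeout", fake_timeout)):
        print(result)
```

With the repro sample, the calls would look like `timed_call("send_batch", client.send_batch, batch)` and `timed_call("send_event", client.send_event, EventData("A" * payload))`.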