Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.

MessageReceiverState Error : Not receiving events from some partitions #35948

Closed Benniah closed 3 months ago

Benniah commented 5 months ago

Describe the bug

I have an Event Hubs consumer client, running inside an Azure Container Instance, that streams data into a Snowflake table. The Event Hub has 6 partitions, and the consumer client can fetch events from all of them when it starts.

However, after a couple of hours the client stops receiving events from one or more partitions. When the container is manually restarted, the client is able to claim ownership of those partitions again and the stream resumes.

To Reproduce

Steps to reproduce the behavior:

  1. I am using boilerplate code from the documentation, with a small modification to process events and insert them into Snowflake.

Expected behavior

The client continues to receive events from all partitions.

LOGS

INFO:__main__:Received events from partition: 1
INFO:uamqp.c_uamqp:CBS error occurred on connection b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.receiver:Receiver link failed to open - expecting to receive DETACH frame.
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.OPENED: 9> to <ConnectionState.CLOSE_RCVD: 10>
INFO:uamqp.receiver:Message receiver b'receiver-link-734198d4-db38-4768-877f-71d72b919bd7' state changed from <MessageReceiverState.Error: 5> to <MessageReceiverState.Error: 5> on connection: b'EHReceiver-xxxxxxxxxxx-partition0'
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.CLOSE_RCVD: 10> to <ConnectionState.END: 13>
INFO:uamqp.receiver:Message receiver b'receiver-link-734198d4-db38-4768-877f-71d72b919bd7' state changed from <MessageReceiverState.Error: 5> to <MessageReceiverState.Error: 5> on connection: b'EHReceiver-xxxxxxxxxxx-partition0'
INFO:uamqp.connection:Received Connection close event: b'amqp:connection:forced'
Connection: b'EHReceiver-xxxxxxxxxxx-partition0'
Description: b"The connection was closed by container 'af980dee699c43f3a9d903a9fcd72432_G4' because it did not have any active links in the past 60000 milliseconds. TrackingId:af980dee699c43f3a9d903a9fcd72432_G4, SystemTracker:gateway5, Timestamp:2024-06-06T09:00:42"
Details: None
INFO:uamqp.c_uamqp:b'SSL channel closed in decode_ssl_received_bytes.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'decode_ssl_received_bytes':886)
INFO:uamqp.c_uamqp:b'Error callback received in unexpected state' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'on_underlying_io_error':243)
INFO:uamqp.c_uamqp:b'Error in decode_ssl_received_bytes.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'on_underlying_io_bytes_received':935)

INFO:uamqp.async_ops.connection_async:Shutting down connection b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.authentication.cbs_auth_async:Shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.authentication.cbs_auth_async:Auth closed, destroying session on connection: b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.authentication.cbs_auth_async:Finished shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.c_uamqp:b'send called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1181)
INFO:uamqp.c_uamqp:b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':272)
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
INFO:uamqp.async_ops.connection_async:Connection shutdown complete b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Received events from partition: 5
INFO:__main__:Received events from partition: 5
Received events from partition: 3
INFO:__main__:Received events from partition: 3

INFO:uamqp.async_ops.client_async:Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-xxxxxxxxxxx-partition0'
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.END: 13>
INFO:uamqp.connection:Connection with ID b'EHReceiver-xxxxxxxxxxx-partition0' unexpectedly in an error state. Closing: False, Error: None
INFO:uamqp.c_uamqp:b'AMQP management instance not open' (b'/project/src/vendor/azure-uamqp-c/src/amqp_management.c':b'amqp_management_close':1061)
INFO:uamqp.c_uamqp:CBS for connection b'EHReceiver-xxxxxxxxxxx-partition0' completed opening with status: 2
WARNING:uamqp.async_ops.connection_async:ConnectionClose('ErrorCodes.UnknownError: Connection in an unexpected error state.')
INFO:uamqp.async_ops.client_async:Connection keep-alive for 'ReceiveClientAsync' failed: ConnectionClose('ErrorCodes.UnknownError: Connection in an unexpected error state.').
INFO:uamqp.receiver:Message receiver b'receiver-link-4e45327d-1d30-465b-bae4-ff5816f2331f' state changed from <MessageReceiverState.Idle: 1> to <MessageReceiverState.Opening: 2> on connection: b'EHReceiver-xxxxxxxxxxx-partition0'
INFO:uamqp.receiver:Receiver link failed to open - expecting to receive DETACH frame.
WARNING:uamqp.async_ops.connection_async:ConnectionClose('ErrorCodes.UnknownError: Connection in an unexpected error state.')
INFO:uamqp.async_ops.client_async:CBS session pending b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.async_ops.client_async:Closing exclusive connection b'EHReceiver-xxxxxxxxxxx-partition0'.
Received events from partition: 5
INFO:__main__:Received events from partition: 5
Received events from partition: 1
INFO:__main__:Received events from partition: 1
INFO:uamqp.async_ops.connection_async:Shutting down connection b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.authentication.cbs_auth_async:Shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.c_uamqp:Token put complete with result: 2, status: 0, description: b'CBS Session closed.', connection: b'EHReceiver-xxxxxxxxxxx-partition0'
INFO:uamqp.authentication.cbs_auth_async:Auth closed, destroying session on connection: b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.authentication.cbs_auth_async:Finished shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxx-partition0'.
INFO:uamqp.c_uamqp:b'send called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1181)
INFO:uamqp.c_uamqp:b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':272)
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxx-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
INFO:uamqp.async_ops.connection_async:Connection shutdown complete b'EHReceiver-xxxxxxxxxxx-partition0'.

Additional context

Using uamqp_transport=True.
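The reporter's code is not included in the thread. For orientation only, here is a minimal sketch of the kind of consumer described above: an async `EventHubConsumerClient` with a blob checkpoint store and `uamqp_transport=True`. The environment variable names and the Snowflake placeholder are hypothetical, and the `$Default` consumer group is an assumption; the actual client may differ.

```python
import asyncio
import os


async def on_event_batch(partition_context, events):
    """Handle one batch from a single partition, then checkpoint."""
    print(f"Received events from partition: {partition_context.partition_id}")
    # Hypothetical placeholder: process the batch and load it into Snowflake here.
    if events:
        await partition_context.update_checkpoint()


async def on_error(partition_context, error):
    """Called by the SDK when receiving or load balancing raises an error."""
    # partition_context can be None when the error is not tied to one partition.
    partition = partition_context.partition_id if partition_context else "n/a"
    print(f"An error occurred on partition {partition}: {error!r}")


async def main():
    # Imported here so the handlers above can be exercised without the SDK installed.
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

    # All environment variable names below are assumptions for this sketch.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        os.environ["STORAGE_CONNECTION_STR"], os.environ["BLOB_CONTAINER_NAME"]
    )
    client = EventHubConsumerClient.from_connection_string(
        os.environ["EVENT_HUB_CONNECTION_STR"],
        consumer_group="$Default",
        eventhub_name=os.environ["EVENT_HUB_NAME"],
        checkpoint_store=checkpoint_store,
        uamqp_transport=True,  # the setting reported in this issue
    )
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            on_error=on_error,
            starting_position="-1",  # start from the beginning of each partition
        )
```

To run it, call `asyncio.run(main())`. With `uamqp_transport=True` the `uamqp` package has to be installed alongside `azure-eventhub`.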

github-actions[bot] commented 5 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kasun04.

kashifkhan commented 5 months ago

Thank you for the feedback @Benniah. A few questions for you

Benniah commented 5 months ago

Hi @kashifkhan ,

kashifkhan commented 5 months ago

@Benniah based on #2, is it possible that the task is slowing things down enough to cause a delay on the network?
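The task in question is not shown in the thread, so the following is only a sketch of the general concern: blocking work run directly inside a coroutine stalls every other task on the same event loop, while handing it to a worker thread with `asyncio.to_thread` (Python 3.9+) keeps the loop responsive. `preprocess_blocking` and `heartbeat` are hypothetical stand-ins, not part of the SDK or of the reporter's code.

```python
import asyncio
import time


def preprocess_blocking(records):
    """Hypothetical stand-in for CPU- or I/O-bound preprocessing."""
    time.sleep(0.2)  # simulates work that would otherwise stall the event loop
    return [r.upper() for r in records]


async def heartbeat(ticks, interval=0.01):
    """Stand-in for any background task that shares the event loop."""
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def handle_batch(records):
    """Run the blocking step in a worker thread so the loop stays responsive."""
    return await asyncio.to_thread(preprocess_blocking, records)


async def demo():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    result = await handle_batch(["a", "b"])
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)
    # The heartbeat keeps ticking while the blocking work runs in the thread.
    return result, len(ticks)
```

If `preprocess_blocking` were called directly inside `handle_batch` instead, the heartbeat would record no ticks until the call returned.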

github-actions[bot] commented 5 months ago

Hi @Benniah. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

Benniah commented 5 months ago

Hi @kashifkhan,

Here are a couple of changes I made recently and some observations I made about the event stream.

OBSERVATIONS

LOGS

INFO:__main__:Processed batch in 2.49 seconds
Received events from partition: 3
INFO:__main__:Received events from partition: 3
Loaded 4 records into Snowflake
INFO:__main__:Loaded 4 records into Snowflake
INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'REDACTED'
Request method: 'PUT'
Request headers:
    'x-ms-encryption-algorithm': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-meta-offset': 'REDACTED'
    'x-ms-meta-sequencenumber': 'REDACTED'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'REDACTED'
    'User-Agent': 'azsdk-python-storage-blob/12.7.1 Python/3.9.1 (Linux-5.10.102.2-microsoft-standard-x86_64-with-glibc2.28)'
    'Authorization': 'REDACTED'
No body was attached to the request
INFO:azure.eventhub._pyamqp.aio._link_async:An error occurred when detaching the link: AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not read frame due to exception: [Errno 104] Connection reset by peer')
INFO:azure.eventhub._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
INFO:azure.eventhub._pyamqp.aio._link_async:An error occurred when detaching the link: AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not read frame due to exception: [Errno 104] Connection reset by peer')
INFO:azure.eventhub._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
INFO:azure.eventhub._pyamqp.aio._management_link_async:Management link receiver state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
INFO:azure.eventhub._pyamqp.aio._link_async:An error occurred when detaching the link: AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not read frame due to exception: [Errno 104] Connection reset by peer')
INFO:azure.eventhub._pyamqp.aio._link_async:Link state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
INFO:azure.eventhub._pyamqp.aio._management_link_async:Management link sender state changed: <LinkState.ATTACHED: 3> -> <LinkState.DETACHED: 0>
INFO:azure.eventhub._pyamqp.aio._session_async:An error occurred when ending the session: AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not read frame due to exception: [Errno 104] Connection reset by peer')
INFO:azure.eventhub._pyamqp.aio._session_async:Session state changed: <SessionState.MAPPED: 3> -> <SessionState.UNMAPPED: 0>
INFO:azure.eventhub._pyamqp.aio._connection_async:An error occurred when closing the connection: AMQPConnectionError('Error condition: ErrorCondition.SocketError\n Error Description: Can not read frame due to exception: [Errno 104] Connection reset by peer')
INFO:azure.eventhub._pyamqp.aio._connection_async:Connection state changed: <ConnectionState.OPENED: 9> -> <ConnectionState.END: 13>
Loaded 4 records into Snowflake
INFO:__main__:Loaded 4 records into Snowflake
INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'REDACTED'
Request method: 'PUT'
Request headers:
    'x-ms-encryption-algorithm': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-meta-offset': 'REDACTED'
    'x-ms-meta-sequencenumber': 'REDACTED'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'REDACTED'
    'User-Agent': 'azsdk-python-storage-blob/12.7.1 Python/3.9.1 (Linux-5.10.102.2-microsoft-standard-x86_64-with-glibc2.28)'
    'Authorization': 'REDACTED'
No body was attached to the request
Loaded 4 records into Snowflake
INFO:__main__:Loaded 4 records into Snowflake
INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'REDACTED'
Request method: 'PUT'
Request headers:
    'x-ms-encryption-algorithm': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-meta-offset': 'REDACTED'
    'x-ms-meta-sequencenumber': 'REDACTED'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'REDACTED'
    'User-Agent': 'azsdk-python-storage-blob/12.7.1 Python/3.9.1 (Linux-5.10.102.2-microsoft-standard-x86_64-with-glibc2.28)'
    'Authorization': 'REDACTED'
No body was attached to the request
INFO:azure.core.pipeline.policies.http_logging_policy:Response status: 200
Response headers:
    'Content-Length': '0'
    'Last-Modified': 'Thu, 06 Jun 2024 16:07:30 GMT'
    'Etag': '"0x8DC8642C4736420"'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': 'REDACTED'
    'x-ms-client-request-id': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'x-ms-request-server-encrypted': 'REDACTED'
    'Date': 'Thu, 06 Jun 2024 16:07:29 GMT'
Processed batch in 3.47 seconds
INFO:__main__:Processed batch in 3.47 seconds
Received events from partition: 4
INFO:__main__:Received events from partition: 4
Loaded 4 records into Snowflake
INFO:__main__:Loaded 4 records into Snowflake
INFO:azure.core.pipeline.policies.http_logging_policy:Request URL: 'REDACTED'
Request method: 'PUT'
Request headers:
    'x-ms-encryption-algorithm': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-meta-offset': 'REDACTED'
    'x-ms-meta-sequencenumber': 'REDACTED'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'REDACTED'
    'User-Agent': 'azsdk-python-storage-blob/12.7.1 Python/3.9.1 (Linux-5.10.102.2-microsoft-standard-x86_64-with-glibc2.28)'
    'Authorization': 'REDACTED'
No body was attached to the request
INFO:azure.core.pipeline.policies.http_logging_policy:Response status: 200
Response headers:
    'Content-Length': '0'
    'Last-Modified': 'Thu, 06 Jun 2024 16:07:30 GMT'
    'Etag': '"0x8DC8642C4B26D00"'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': 'REDACTED'
    'x-ms-client-request-id': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'x-ms-request-server-encrypted': 'REDACTED'
    'Date': 'Thu, 06 Jun 2024 16:07:30 GMT'
Processed batch in 2.67 seconds
INFO:__main__:Processed batch in 2.67 seconds
Received events from partition: 2
INFO:__main__:Received events from partition: 2
INFO:azure.core.pipeline.policies.http_logging_policy:Response status: 200
Response headers:
    'Content-Length': '0'
    'Last-Modified': 'Thu, 06 Jun 2024 16:07:31 GMT'
    'Etag': '"0x8DC8642C50186B9"'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': 'REDACTED'
    'x-ms-client-request-id': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'x-ms-request-server-encrypted': 'REDACTED'
    'Date': 'Thu, 06 Jun 2024 16:07:30 GMT'
Processed batch in 2.28 seconds
INFO:__main__:Processed batch in 2.28 seconds
Received events from partition: 0
INFO:__main__:Received events from partition: 0
INFO:azure.core.pipeline.policies.http_logging_policy:Response status: 200
Response headers:

Container Metrics

[Image: Screenshot 2024-06-06 at 18 27 10]

Benniah commented 5 months ago

Hi @kashifkhan,

I made a few changes to the client and got rid of the async task that was preprocessing events. I think you are right that it was interfering with the network somehow. I just started a fresh data stream, and it has been running for a while now with no issues.

I have a dashboard that tracks the lag for each partition in Snowflake. I will monitor this over the weekend; hopefully nothing changes.

[Image: Screenshot 2024-06-07 at 14 18 58]

kashifkhan commented 5 months ago

Thank you for the update @Benniah. Even if we get this issue resolved, I'm still interested in the odd behavior you saw when you switched to uamqp_transport=False. If it's not a bother, would you be able to share some detailed logs? I'll send you the code showing how to instrument the client.
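The instrumentation code mentioned here does not appear in the thread. As a rough sketch of what such a setup commonly looks like, the standard `logging` module can be pointed at the logger names that show up in the logs above (`azure.eventhub` and `uamqp`). The helper name `configure_sdk_logging` and the format string are assumptions for illustration.

```python
import logging
import sys


def configure_sdk_logging(level=logging.DEBUG, stream=sys.stdout):
    """Attach one handler to the loggers whose names appear in the logs in this thread."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s")
    )
    # "azure.eventhub" covers the pure-Python transport (azure.eventhub._pyamqp);
    # "uamqp" covers the uamqp-based transport.
    for name in ("azure.eventhub", "uamqp"):
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.addHandler(handler)
    return handler
```

Call `configure_sdk_logging()` before creating the client. The Event Hubs clients also accept a `logging_enable=True` keyword argument, which should add network trace output to these loggers.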

Benniah commented 5 months ago

Hi @kashifkhan, the event stream ran fine over the weekend until today, when I noticed a slight lag in one partition. After taking a look at the logs, I found the following, which seems to be connection related:

INFO:uamqp.async_ops.connection_async:Shutting down connection b'EHReceiver-xxxxxxxx-partition0'.
INFO:uamqp.authentication.cbs_auth_async:Shutting down CBS session on connection: b'EHReceiver-xxxxxxxx-partition0'.
INFO:uamqp.c_uamqp:Token put complete with result: 2, status: 0, description: b'CBS Session closed.', connection: b'EHReceiver-xxxxxxxx-partition0'
INFO:uamqp.c_uamqp:CBS for connection b'EHReceiver-xxxxxxxx-partition0' completed opening with status: 3
INFO:uamqp.authentication.cbs_auth_async:Auth closed, destroying session on connection: b'EHReceiver-xxxxxxxx-partition0'.
INFO:uamqp.authentication.cbs_auth_async:Finished shutting down CBS session on connection: b'EHReceiver-xxxxxxxx-partition0''.
INFO:uamqp.c_uamqp:b'send called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1181)
INFO:uamqp.c_uamqp:b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxx-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.END: 13>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxx-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
INFO:uamqp.async_ops.connection_async:Connection shutdown complete b'EHReceiver-xxxxxxxx-partition0'.
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxx-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxx-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>

This goes on for a while without recovering, and since it is not logged as a critical error, my on_error callback is not triggered to terminate and restart the container instance. FYI, manually restarting the container resolves the issue.
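Because the stall described here never reaches `on_error`, one possible workaround is to detect silence directly: record when each partition last delivered a batch and flag partitions that have been quiet for too long. The sketch below is not an SDK feature or a fix proposed in the thread; `PartitionWatchdog` and its parameters are hypothetical names.

```python
import time


class PartitionWatchdog:
    """Tracks when each partition last delivered events and reports stalls."""

    def __init__(self, partition_ids, stall_after_seconds, clock=time.monotonic):
        self._clock = clock
        self._stall_after = stall_after_seconds
        start = clock()
        self._last_seen = {pid: start for pid in partition_ids}

    def record(self, partition_id):
        """Call from the event handler each time a partition delivers a batch."""
        self._last_seen[partition_id] = self._clock()

    def stalled(self):
        """Return the sorted ids of partitions silent for longer than the threshold."""
        now = self._clock()
        return sorted(
            pid for pid, seen in self._last_seen.items()
            if now - seen > self._stall_after
        )
```

A periodic task could call `watchdog.stalled()` and exit the process when the list is non-empty, leaving the restart to the container's restart policy. A partition that is simply idle looks the same as a stalled one, so the threshold has to exceed the longest expected gap between events.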

Some More Logs

INFO:uamqp.authentication.cbs_auth_async:Token on connection b'EHReceiver-xxxxxxxxxxxxx-partition4' will expire soon - attempting to refresh.
INFO:uamqp.c_uamqp:Token put complete with result: 1, status: 202, description: b'Accepted', connection: b'EHReceiver-xxxxxxxxxxxxx-partition4'
Received events from partition: 4

INFO:uamqp.authentication.cbs_auth_async:Token on connection b'EHReceiver-xxxxxxxxxxxxx-partition0' will expire soon - attempting to refresh.
INFO:uamqp.c_uamqp:Token put complete with result: 1, status: 202, description: b'Accepted', connection: b'xxxxxxxxxxxxx-partition0'

INFO:uamqp.async_ops.client_async:Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-xxxxxxxxxxxxx-partition4'
INFO:uamqp.async_ops.client_async:Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-xxxxxxxxxxxxx-partition0'
INFO:uamqp.async_ops.client_async:Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-xxxxxxxxxxxxx-partition5'
INFO:uamqp.async_ops.client_async:Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-xxxxxxxxxxxxx-partition2'
INFO:uamqp.async_ops.client_async:Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-xxxxxxxxxxxxx-partition3'
INFO:uamqp.async_ops.client_async:CBS session pending b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.async_ops.client_async:Closing exclusive connection b'EHReceiver-xxxxxxxxxxxxx-partition1'.

INFO:uamqp.async_ops.connection_async:Shutting down connection b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.authentication.cbs_auth_async:Shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.authentication.cbs_auth_async:Auth closed, destroying session on connection: b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.authentication.cbs_auth_async:Finished shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxx-partition1' state changed from <ConnectionState.OPENED: 9> to <ConnectionState.END: 13>
INFO:uamqp.async_ops.connection_async:Connection shutdown complete b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxx-partition1' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>

INFO:uamqp.async_ops.connection_async:Shutting down connection b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.authentication.cbs_auth_async:Shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.c_uamqp:Token put complete with result: 2, status: 0, description: b'CBS Session closed.', connection: b'EHReceiver-xxxxxxxxxxxxx-partition1'
INFO:uamqp.authentication.cbs_auth_async:Auth closed, destroying session on connection: b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.authentication.cbs_auth_async:Finished shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.c_uamqp:b'send called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1181)
INFO:uamqp.c_uamqp:b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':272)
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxx-partition1' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxx-partition1' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
INFO:uamqp.async_ops.connection_async:Connection shutdown complete b'EHReceiver-xxxxxxxxxxxxx-partition1'.
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxx-partition1' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxx-partition1' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Received events from partition: 3
INFO:__main__:Received events from partition: 3

After Container Restart

INFO:uamqp.async_ops.connection_async:Shutting down connection b'EHReceiver-xxxxxxxxxxxxxx-partition5'.
INFO:uamqp.authentication.cbs_auth_async:Shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxxxxx-partition5'.
INFO:uamqp.c_uamqp:Token put complete with result: 2, status: 0, description: b'CBS Session closed.', connection: b'EHReceiver-xxxxxxxxxxxxxx-partition5'
INFO:uamqp.c_uamqp:CBS for connection b'EHReceiver-xxxxxxxxxxxxxx-partition5' completed opening with status: 3
INFO:uamqp.authentication.cbs_auth_async:Auth closed, destroying session on connection: b'EHReceiver-xxxxxxxxxxxxxx-partition5'.
INFO:uamqp.authentication.cbs_auth_async:Finished shutting down CBS session on connection: b'EHReceiver-xxxxxxxxxxxxxx-partition5'.
INFO:uamqp.c_uamqp:b'Failure: sending socket failed. errno=104 (Connection reset by peer).' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/socketio_berkeley.c':b'socketio_send':944)
INFO:uamqp.c_uamqp:b'Error in xio_send.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'write_outgoing_bytes':702)
INFO:uamqp.c_uamqp:b'Error in write_outgoing_bytes.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'tlsio_openssl_send':1553)
INFO:uamqp.c_uamqp:b'xio_send failed' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1190)
INFO:uamqp.c_uamqp:b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxxx-partition5' state changed from <ConnectionState.HDR_SENT: 2> to <ConnectionState.END: 13>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxxx-partition5' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
INFO:uamqp.c_uamqp:b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
INFO:uamqp.c_uamqp:b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
INFO:uamqp.async_ops.connection_async:Connection shutdown complete b'EHReceiver-xxxxxxxxxxxxxx-partition5'.
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxxx-partition5' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
INFO:uamqp.connection:Connection b'EHReceiver-xxxxxxxxxxxxxx-partition5' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>

An error occurred: Connection timeout to host https:REDACTED
ERROR:__main__:An error occurred: Connection timeout to host https://REDACTED?comp=metadata
WARNING:azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio:An exception occurred when EventProcessor instance '0e9fe475-bc74-4e94-b6dd-469b3610f51b' claim_ownership for namespace 'REDACTED.servicebus.windows.net' eventhub 'REDACTED' consumer group 'REDACTED' partition '3'. The ownership is now lost. Exception is ServiceResponseError('Connection timeout to host https://REDACTED/ownership/3?comp=metadata')
WARNING:azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio:An exception occurred when EventProcessor instance '0e9fe475-bc74-4e94-b6dd-469b3610f51b' claim_ownership for namespace 'REDACTED.servicebus.windows.net' eventhub 'REDACTED' consumer group 'REDACTED' partition '0'. The ownership is now lost. Exception is ServiceResponseError('Connection timeout to host https://REDACTED.servicebus.windows.net/REDACTED/REDACTED/ownership/0?comp=metadata')
WARNING:azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio:An exception occurred when EventProcessor instance '0e9fe475-bc74-4e94-b6dd-469b3610f51b' claim_ownership for namespace 'REDACTED.windows.net' eventhub 'REDACTED' consumer group 'REDACTED' partition '2'. The ownership is now lost. Exception is ServiceResponseError('Connection timeout to host https://REDACTED.servicebus.windows.net/REDACTED/REDACTED/ownership/2?comp=metadata')
WARNING:azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio:An exception occurred when EventProcessor instance '0e9fe475-bc74-4e94-b6dd-469b3610f51b' claim_ownership for namespace 'REDACTED.servicebus.windows.net' eventhub 'REDACTED' consumer group 'REDACTED' partition '5'. The ownership is now lost. Exception is ServiceResponseError('Connection timeout to host https://REDACTED/ownership/5?comp=metadata')
WARNING:azure.eventhub.extensions.checkpointstoreblobaio._blobstoragecsaio:An exception occurred when EventProcessor instance '0e9fe475-bc74-4e94-b6dd-469b3610f51b' claim_ownership for namespace 'REDACTED.servicebus.windows.net' eventhub 'REDACTED' consumer group 'REDACTED' partition '4'. The ownership is now lost. Exception is ServiceResponseError('Connection timeout to host https://REDACTED.blob.core.windows.net/REDACTED.servicebus.windows.net/REDACTED/ownership/4?comp=metadata')
INFO:uamqp.async_ops.client_async:CBS session pending b'EHReceiver-28962368-25a0-41d6-9db9-92c0d2134510-partition1'.
INFO:uamqp.async_ops.client_async:Closing exclusive connection b'EHReceiver-28962368-25a0-41d6-9db9-92c0d2134510-partition1'.

kashifkhan commented 4 months ago

@Benniah the error logs are just saying that the connection is closed and it's not able to write to that stream. Are there any signs of the network being unstable during that time?

Would you be able to turn on debug logging to see if there is something more happening?

import logging
import sys

handler = logging.StreamHandler(stream=sys.stdout)
logger = logging.getLogger('azure.eventhub')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)

logger2 = logging.getLogger('uamqp')
logger2.setLevel(logging.DEBUG)
logger2.addHandler(handler)

...

from azure.eventhub import EventHubConsumerClient

consumer = EventHubConsumerClient(..., logging_enable=True)

Benniah commented 4 months ago

Hi @kashifkhan, when you say 'write to that stream', are you referring to the Blob Storage checkpointing? That's what I mainly see in the logs.

Here are the DEBUG logs you requested. Enabling them created a huge spike in CPU usage, and the output keeps going.

Deallocating 'DictValue'
Destroying 'DictValue'
Deallocating 'cApplicationProperties'
Destroying 'cApplicationProperties'
Deallocating 'cMessageAnnotations'
Destroying 'cMessageAnnotations'
Parsing received message application properties 14905.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.StringValue: 17>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Parsing received message annotations 14905.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.IntValue: 9>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.LongValue: 10>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'IntValue'
Destroying 'IntValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'LongValue'
Destroying 'LongValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.TimestampValue: 14>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'TimestampValue'
Destroying 'TimestampValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Deallocating 'cApplicationProperties'
Destroying 'cApplicationProperties'
Deallocating 'cMessageAnnotations'
Destroying 'cMessageAnnotations'
Parsing received message application properties 14906.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.StringValue: 17>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Parsing received message annotations 14906.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.IntValue: 9>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.LongValue: 10>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'IntValue'
Destroying 'IntValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'LongValue'
Destroying 'LongValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.TimestampValue: 14>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'TimestampValue'
Destroying 'TimestampValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Deallocating 'cApplicationProperties'
Destroying 'cApplicationProperties'
Deallocating 'cMessageAnnotations'
Destroying 'cMessageAnnotations'
Parsing received message application properties 14907.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.StringValue: 17>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Parsing received message annotations 14907.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.IntValue: 9>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.LongValue: 10>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'IntValue'
Destroying 'IntValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'LongValue'
Destroying 'LongValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.TimestampValue: 14>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'TimestampValue'
Destroying 'TimestampValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Deallocating 'cApplicationProperties'
Destroying 'cApplicationProperties'
Deallocating 'cMessageAnnotations'
Destroying 'cMessageAnnotations'
Parsing received message application properties 14908.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.StringValue: 17>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Parsing received message annotations 14908.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.IntValue: 9>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.LongValue: 10>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'IntValue'
Destroying 'IntValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'LongValue'
Destroying 'LongValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.TimestampValue: 14>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'TimestampValue'
Destroying 'TimestampValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Deallocating 'cApplicationProperties'
Destroying 'cApplicationProperties'
Deallocating 'cMessageAnnotations'
Destroying 'cMessageAnnotations'
Parsing received message application properties 14909.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.StringValue: 17>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'StringValue'
Destroying 'StringValue'
Deallocating 'DictValue'
Destroying 'DictValue'
Parsing received message annotations 14909.
Wrapping value type: <AMQPType.DictValue: 20>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.IntValue: 9>
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.LongValue: 10>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'IntValue'
Destroying 'IntValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.StringValue: 17>
Deallocating 'SymbolValue'
Destroying 'SymbolValue'
Deallocating 'LongValue'
Destroying 'LongValue'
Wrapping value type: <AMQPType.SymbolValue: 18>
Wrapping value type: <AMQPType.TimestampValue: 14>

Also, setting the log level back to INFO with logging_enable=True gives the following:

b'<- [TRANSFER]* {2,2848,<>,0,true,false,NULL,NULL,NULL,NULL,true}'
b'<- [TRANSFER]* {2,2849,<>,0,true,false,NULL,NULL,NULL,NULL,true}'
b'<- [TRANSFER]* {2,2850,<>,0,true,false,NULL,NULL,NULL,NULL,true}'
b'<- [TRANSFER]* {2,2851,<>,0,true,false,NULL,NULL,NULL,NULL,true}'
b'<- [TRANSFER]* {2,2852,<>,0,true,false,NULL,NULL,NULL,NULL,true}'
b'<- [TRANSFER]* {2,2853,<>,0,true,false,NULL,NULL,NULL,NULL,true}'
b'<- [FLOW]* {1,5000,2855,62682,NULL,NULL,NULL,NULL,NULL,true,NULL}'
b"<- [DETACH]* {0,true,* {amqp:connection:forced,The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'b1a6a94f4a8d4a35a7b74908af34d494_G5'.,NULL}}"
b'-> [DETACH]* {0,true}'
CBS error occurred on connection b'EHReceiver-2317a7e2-7af1-43a6-b85c-002443a38e6b-partition1'.
b"<- [DETACH]* {1,true,* {amqp:connection:forced,The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'b1a6a94f4a8d4a35a7b74908af34d494_G5'.,NULL}}"
b'-> [DETACH]* {1,true}'
b"<- [DETACH]* {2,true,* {amqp:connection:forced,The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'b1a6a94f4a8d4a35a7b74908af34d494_G5'.,NULL}}"
b'-> [DETACH]* {2,true}'
Received Link detach event: b'amqp:connection:forced'
Link: b'receiver-link-79fcfa8f-0623-4132-a90d-3a0f4a858ae0'
Description: b"The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'b1a6a94f4a8d4a35a7b74908af34d494_G5'."
Details: None
Retryable: True
Connection: b'EHReceiver-2317a7e2-7af1-43a6-b85c-002443a38e6b-partition1'
Message receiver b'receiver-link-79fcfa8f-0623-4132-a90d-3a0f4a858ae0' state changed from <MessageReceiverState.Open: 3> to <MessageReceiverState.Error: 5> on connection: b'EHReceiver-2317a7e2-7af1-43a6-b85c-002443a38e6b-partition1'
b'<- [END]* {NULL}'
b'-> [END]* {}'
b"<- [CLOSE]* {* {amqp:connection:forced,The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'b1a6a94f4a8d4a35a7b74908af34d494_G5'.,NULL}}"
Connection b'EHReceiver-2317a7e2-7af1-43a6-b85c-002443a38e6b-partition1' state changed from <ConnectionState.OPENED: 9> to <ConnectionState.CLOSE_RCVD: 10>
b'-> [CLOSE]* {}'
Connection b'EHReceiver-2317a7e2-7af1-43a6-b85c-002443a38e6b-partition1' state changed from <ConnectionState.CLOSE_RCVD: 10> to <ConnectionState.END: 13>
Received Connection close event: b'amqp:connection:forced'
Connection: b'EHReceiver-2317a7e2-7af1-43a6-b85c-002443a38e6b-partition1'
Description: b"The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'b1a6a94f4a8d4a35a7b74908af34d494_G5'."
Details: None
b'SSL channel closed in decode_ssl_received_bytes.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'decode_ssl_received_bytes':886)
b'Error callback received in unexpected state' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'on_underlying_io_error':243)
b'Error in decode_ssl_received_bytes.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'on_underlying_io_bytes_received':935)
Processed batch in 217.54 seconds
Received events from partition: 3

Also, here is a code snippet:

import asyncio
import logging
import time

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

logger = logging.getLogger(__name__)

# batch_process_events, on_error, on_partition_close, snowflake_connector and the
# connection constants are defined elsewhere in the application (not shown).

async def on_event_batch(partition_context, event_batch):
    logger.info("Received events from partition: {}".format(partition_context.partition_id))
    start_time = time.time()  # Record the start time of processing the batch
    await batch_process_events(partition_context, event_batch)
    await partition_context.update_checkpoint()
    # Calculate and log the processing time for the batch
    processing_time = time.time() - start_time
    logger.info(f"Processed batch in {processing_time:.2f} seconds")

async def receive_batch():
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR,
                                                                  STORAGE_CONTAINER_NAME)

    client = EventHubConsumerClient.from_connection_string(CONNECTION_STR,
                                                           CONSUMER_GROUP,
                                                           eventhub_name=EVENTHUB_NAME,
                                                           checkpoint_store=checkpoint_store,
                                                           load_balancing_strategy="greedy",
                                                           uamqp_transport=True,
                                                           logging_enable=True)
    async with client:
        try:
            await client.receive_batch(
                on_event_batch=on_event_batch,
                on_error=on_error,
                on_partition_close=on_partition_close,
                prefetch=50000,
                max_batch_size=25000,
                starting_position="@latest"
            )
        except KeyboardInterrupt:
            logger.info("Received KeyboardInterrupt. Exiting...")
        finally:
            # Always release the Snowflake connection, even on error
            snowflake_connector.close()

if __name__ == '__main__':
    asyncio.run(receive_batch())

kashifkhan commented 4 months ago

Thank you for the information @Benniah

github-actions[bot] commented 4 months ago

Hi @Benniah. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

Benniah commented 4 months ago

Hi @kashifkhan,

Yes, regarding the pauses: there are occasional data bursts that push the processing time beyond that 240000-millisecond limit.

I reduced prefetch to 10000 and max_batch_size to 5000, but I am still getting the errors shown above. We have quite a large amount of data coming through, so I am not sure whether lowering the batch size is a good idea, as it might increase latency.

I have tested this code in parallel on our DEV Event Hub, which produces fewer messages, and we don't experience the problem there. The data stream remains persistent for all partitions, even with the AMQP transport protocol (the only issue I have there is that we get fewer events even though a higher prefetch and batch size are specified).

FYI, on DEV we have just 300-400 messages coming in, whereas on PRD we have 30K-80K messages.
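
To see how often batch processing reaches the 240000 ms idle limit quoted in the DETACH message, the "Processed batch in ... seconds" lines can be scanned with a short script. This is only a sketch: `slow_batches` and `IDLE_LIMIT_SECONDS` are hypothetical names, and the sample lines follow the log format shown in this thread.

```python
import re

# 240000 ms, the inactivity limit quoted in the service's DETACH description.
IDLE_LIMIT_SECONDS = 240.0

# Matches the application's own log line, with or without a logger prefix.
_PATTERN = re.compile(r"Processed batch in (\d+(?:\.\d+)?) seconds")


def slow_batches(log_lines, limit=IDLE_LIMIT_SECONDS):
    """Return the processing times (in seconds) that meet or exceed the limit."""
    durations = []
    for line in log_lines:
        match = _PATTERN.search(line)
        if match:
            seconds = float(match.group(1))
            if seconds >= limit:
                durations.append(seconds)
    return durations


sample = [
    "Processed batch in 217.54 seconds",
    "Received events from partition: 3",
    "Processed batch in 244.22 seconds",
    "INFO:__main__:Processed batch in 16.62 seconds",
]
print(slow_batches(sample))  # [244.22]
```

A batch that takes longer than the limit does not by itself prove the connection was idle for that long, but it narrows down which batches to look at.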

kashifkhan commented 4 months ago

@Benniah Thanks for testing whether reducing the batch size would help. One thing about prefetch and max batch size is that they don't guarantee that exactly that amount will be received. The clients are designed to return as soon as possible with as many messages as are available, so they do not wait until the maximum is reached.

I was trying to repro this on my side and noticed something missing from your logs: Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-021163af-6223-4422-9bd7-673e5ab3df53-partition0', which is logged by our keep-alive functionality.

I changed my code to time.sleep(240) instead of await asyncio.sleep(240) and then was able to get the disconnection message from the service. The client was able to re-connect and resume receiving.

Is there anything during processing, or in any other part of the code, that could be blocking the event loop and so stopping the background keep-alive from running?
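
The effect of a blocking call on a background task can be reproduced without the SDK. The sketch below is not the client's keep-alive implementation: `keep_alive`, `blocking_work`, and `run_handler` are hypothetical stand-ins, and it assumes Python 3.9+ for `asyncio.to_thread`. It counts how many times a background task gets to run while a handler does synchronous work, first directly on the event loop and then offloaded to a worker thread.

```python
import asyncio
import time


async def keep_alive(ticks: list, interval: float = 0.05) -> None:
    """Stand-in for a background keep-alive task: records a tick every interval."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


def blocking_work(duration: float) -> None:
    """Stand-in for synchronous processing, such as a blocking database insert."""
    time.sleep(duration)


async def run_handler(offload: bool, duration: float = 0.5) -> int:
    """Do the blocking work and return how many keep-alive ticks happened meanwhile."""
    ticks: list = []
    task = asyncio.create_task(keep_alive(ticks))
    await asyncio.sleep(0)  # yield once so the keep-alive task can start
    before = len(ticks)
    if offload:
        # Runs in a worker thread; the event loop stays free for keep_alive.
        await asyncio.to_thread(blocking_work, duration)
    else:
        # Runs on the event loop thread; no other task can be scheduled.
        blocking_work(duration)
    during = len(ticks) - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during


def compare() -> tuple:
    """Return (ticks while blocking the loop, ticks while offloaded)."""
    blocked = asyncio.run(run_handler(offload=False))
    offloaded = asyncio.run(run_handler(offload=True))
    return blocked, offloaded


if __name__ == "__main__":
    print(compare())
```

The first number is 0 because the background task never runs while the loop is blocked; the second is several ticks. If the processing step uses a synchronous driver, wrapping that call in `asyncio.to_thread` (or `loop.run_in_executor`) is one way to keep the loop responsive.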

Benniah commented 4 months ago

Hi @kashifkhan, thanks for the info. We do get the "Keeping ... connection alive" logs as well. Sorry for not including that section of the logs.

One thing I have noticed is that, after a while, the client is able to reconnect to the partition, but by that time the lag is very large because it resumes from the last checkpoint. The issue then moves to another partition.

Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.HDR_EXCH: 3> to <ConnectionState.OPEN_SENT: 7>
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.OPEN_SENT: 7> to <ConnectionState.CLOSE_RCVD: 10>
b'AMQP management instance not open' (b'/project/src/vendor/azure-uamqp-c/src/amqp_management.c':b'amqp_management_close':1061)
CBS for connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' completed opening with status: 2
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.CLOSE_RCVD: 10> to <ConnectionState.END: 13>
Received Connection close event: b'amqp:internal-error'
Connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
Description: b"An AMQP error occurred (condition='amqp:internal-error')."
Details: None
b'SSL channel closed in decode_ssl_received_bytes.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'decode_ssl_received_bytes':886)
b'Error callback received in unexpected state' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'on_underlying_io_error':243)
b'Error in decode_ssl_received_bytes.' (b'/project/src/vendor/azure-uamqp-c/deps/azure-c-shared-utility/adapters/tlsio_openssl.c':b'on_underlying_io_bytes_received':935)
Processed batch in 16.62 seconds
CBS session pending b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Closing exclusive connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Received events from partition: 4
Loaded 1515 records into Snowflake
Shutting down connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Shutting down CBS session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Token put complete with result: 2, status: 0, description: b'CBS Session closed.', connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
Auth closed, destroying session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Finished shutting down CBS session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
b'send called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1181)
b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':272)
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
Connection shutdown complete b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Processed batch in 70.43 seconds
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Processed batch in 4.16 seconds
Processed batch in 41.75 seconds
Processed batch in 140.66 seconds
Processed batch in 15.61 seconds
Received events from partition: 2
Loaded 3240 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
Received events from partition: 1
Loaded 1248 records into Snowflake
Received events from partition: 4
Loaded 1149 records into Snowflake
Received events from partition: 5
Loaded 5000 records into Snowflake
Received events from partition: 3
Loaded 2526 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
Processed batch in 44.14 seconds
Received events from partition: 1
Loaded 5000 records into Snowflake
Processed batch in 54.48 seconds
Processed batch in 88.64 seconds
Received events from partition: 2
Loaded 5000 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
Processed batch in 52.74 seconds
Received events from partition: 5
Loaded 5000 records into Snowflake
Processed batch in 74.01 seconds
CBS authentication timeout on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Processed batch in 34.56 seconds
Received events from partition: 2
Loaded 719 records into Snowflake
Processed batch in 3.45 seconds
Received events from partition: 4
Loaded 2022 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Received events from partition: 3
Loaded 5000 records into Snowflake
Processed batch in 95.43 seconds
Received events from partition: 1
Loaded 3752 records into Snowflake
CBS session pending b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Closing exclusive connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Received events from partition: 2
Loaded 5000 records into Snowflake
Processed batch in 28.21 seconds
Shutting down connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Shutting down CBS session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Token put complete with result: 2, status: 0, description: b'CBS Session closed.', connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
CBS for connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' completed opening with status: 3
Auth closed, destroying session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Finished shutting down CBS session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
b'send called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1181)
b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.END: 13>
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
Connection shutdown complete b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Received events from partition: 1
Loaded 2117 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
Processed batch in 7.21 seconds
Received events from partition: 1
Loaded 5000 records into Snowflake
Processed batch in 86.23 seconds
Received events from partition: 3
Loaded 5000 records into Snowflake
Processed batch in 112.84 seconds
Received events from partition: 4
Loaded 5000 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
Processed batch in 103.67 seconds
Received events from partition: 2
Loaded 5000 records into Snowflake
Processed batch in 70.89 seconds
Received events from partition: 3
Loaded 5000 records into Snowflake
CBS authentication timeout on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
Processed batch in 244.22 seconds
Processed batch in 141.22 seconds
Received events from partition: 1
Loaded 2883 records into Snowflake
Processed batch in 118.03 seconds
Received events from partition: 4
Loaded 314 records into Snowflake
Processed batch in 86.78 seconds
Processed batch in 55.24 seconds
Received events from partition: 3
Loaded 2474 records into Snowflake
Processed batch in 14.89 seconds
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
CBS session pending b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Closing exclusive connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Received events from partition: 5
Loaded 5000 records into Snowflake
Shutting down connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Shutting down CBS session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Token put complete with result: 2, status: 0, description: b'CBS Session closed.', connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
CBS for connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' completed opening with status: 3
Auth closed, destroying session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Finished shutting down CBS session on connection: b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
b'send called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_send_async':1181)
b'Cannot send encoded bytes' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'on_bytes_encoded':268)
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.END: 13>
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.END: 13> to <ConnectionState.END: 13>
b'saslclientio_close called while not open' (b'/project/src/vendor/azure-uamqp-c/src/saslclientio.c':b'saslclientio_close_async':1130)
b'xio_close failed' (b'/project/src/vendor/azure-uamqp-c/src/connection.c':b'connection_close':1437)
Connection shutdown complete b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'.
Processed batch in 35.38 seconds
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.START: 0>
Received events from partition: 2
Loaded 5000 records into Snowflake
Received events from partition: 4
Loaded 5000 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Received events from partition: 3
Loaded 1472 records into Snowflake
Processed batch in 52.12 seconds
Received events from partition: 5
Loaded 5000 records into Snowflake
Processed batch in 22.73 seconds
Received events from partition: 4
Loaded 5000 records into Snowflake
Processed batch in 31.43 seconds
Received events from partition: 3
Loaded 5000 records into Snowflake
Processed batch in 119.54 seconds
Processed batch in 61.72 seconds
Received events from partition: 2
Loaded 5000 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-38e19bdc-2037-483e-8bcb-11205e65d3bc-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-d7dab39c-3bf2-49c9-81d7-83c627746393-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-8125bfc3-de7e-437f-8fcd-d4f529a327fe-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a82f465f-8a9e-4218-8649-a477d7a7cd2b-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-6f4e191a-6254-4510-9f5e-2e6beeb1e10b-partition3'
Processed batch in 37.87 seconds
Processed batch in 14.62 seconds
Received events from partition: 3
Loaded 3528 records into Snowflake
Processed batch in 12.29 seconds
Processed batch in 6.56 seconds
Connection b'EHReceiver-2de305c6-0a88-403c-998b-3ee65f647cd4-partition0' state changed from <ConnectionState.START: 0> to <ConnectionState.HDR_SENT: 2>
Received events from partition: 1
Loaded 5000 records into Snowflake
Received events from partition: 3
Loaded 3083 records into Snowflake
Received events from partition: 2
Loaded 5000 records into Snowflake
Received events from partition: 5

I have also been running another test in parallel with `prefetch = 3000` and `max_batch_size = 1500`. It has run with no errors at all so far, and connectivity to all partitions has remained stable. However, this won't work for our use case, because the smaller batch size increases latency/lag.
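For reference, the settings of that parallel test map onto `receive_batch` roughly as shown here. This is a minimal sketch rather than the code used in this issue: it assumes the async `EventHubConsumerClient` from `azure-eventhub`, and `conn_str`, `eventhub_name`, the `$Default` consumer group, and the body of `on_event_batch` are placeholders.

```python
import asyncio

# Settings from the parallel test described above.
RECEIVE_SETTINGS = {"prefetch": 3000, "max_batch_size": 1500}


async def on_event_batch(partition_context, events):
    # Placeholder callback: the real one loads the batch into Snowflake.
    print(f"Received {len(events)} events from partition: {partition_context.partition_id}")


async def run(conn_str, eventhub_name, consumer_group="$Default"):
    # Imported lazily so this module loads even without azure-eventhub installed.
    from azure.eventhub.aio import EventHubConsumerClient

    client = EventHubConsumerClient.from_connection_string(
        conn_str, consumer_group=consumer_group, eventhub_name=eventhub_name
    )
    async with client:
        # Runs until cancelled; prefetch and max_batch_size are passed through.
        await client.receive_batch(on_event_batch=on_event_batch, **RECEIVE_SETTINGS)


# Usage (placeholders, not real values):
# asyncio.run(run("<connection string>", "<event hub name>"))
```

The failing run would differ only in the two values in `RECEIVE_SETTINGS`, which makes it easy to compare the two configurations side by side.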

received events from partition: 2
Loaded 1500 records into Snowflake
Processed batch in 18.11 seconds
Received events from partition: 0
Loaded 1500 records into Snowflake
Processed batch in 39.41 seconds
Received events from partition: 1
Loaded 1500 records into Snowflake
Processed batch in 19.78 seconds
Processed batch in 17.20 seconds
Processed batch in 13.90 seconds
Processed batch in 10.64 seconds
Processed batch in 7.58 seconds
Received events from partition: 4
Loaded 1500 records into Snowflake
Received events from partition: 3
Loaded 1500 records into Snowflake
Received events from partition: 5
Loaded 1500 records into Snowflake
Received events from partition: 2
Loaded 1500 records into Snowflake
Received events from partition: 0
Loaded 1500 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-0bb3beff-9f98-46de-b93b-7c8e15447e56-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a75e1f89-0d48-42f8-9a0c-92ad6e765613-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-e881fce8-6e17-4702-a912-0600e124da36-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-286249be-deeb-41ca-bf29-b6193fea73f4-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-41f4b598-bcb5-407b-acf9-61b068904610-partition0'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a31881e7-90c0-402b-90e4-fc5093baadb4-partition4'
Processed batch in 20.82 seconds
Received events from partition: 4
Loaded 1500 records into Snowflake
Received events from partition: 1
Loaded 1038 records into Snowflake
Processed batch in 25.86 seconds
Received events from partition: 3
Loaded 1500 records into Snowflake
Processed batch in 31.25 seconds
Received events from partition: 5
Loaded 1500 records into Snowflake
Processed batch in 28.00 seconds
Processed batch in 17.94 seconds
Processed batch in 9.98 seconds
Received events from partition: 4
Loaded 418 records into Snowflake
Received events from partition: 5
Loaded 1500 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-0bb3beff-9f98-46de-b93b-7c8e15447e56-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a75e1f89-0d48-42f8-9a0c-92ad6e765613-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-e881fce8-6e17-4702-a912-0600e124da36-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-286249be-deeb-41ca-bf29-b6193fea73f4-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-41f4b598-bcb5-407b-acf9-61b068904610-partition0'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a31881e7-90c0-402b-90e4-fc5093baadb4-partition4'
Processed batch in 49.73 seconds
Received events from partition: 2
Loaded 1500 records into Snowflake
Processed batch in 21.21 seconds
Processed batch in 45.99 seconds
Received events from partition: 1
Loaded 1500 records into Snowflake
Processed batch in 28.50 seconds
Received events from partition: 5
Loaded 619 records into Snowflake
Processed batch in 67.38 seconds
Received events from partition: 0
Loaded 1500 records into Snowflake
Processed batch in 29.18 seconds
Processed batch in 20.47 seconds
Received events from partition: 1
Loaded 462 records into Snowflake
Processed batch in 14.84 seconds
Received events from partition: 5
Loaded 881 records into Snowflake
Processed batch in 13.52 seconds
Received events from partition: 4
Loaded 1500 records into Snowflake
Received events from partition: 3
Loaded 1291 records into Snowflake
Processed batch in 23.06 seconds
Processed batch in 18.77 seconds
Processed batch in 14.74 seconds
Received events from partition: 4
Loaded 579 records into Snowflake
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-0bb3beff-9f98-46de-b93b-7c8e15447e56-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a75e1f89-0d48-42f8-9a0c-92ad6e765613-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-e881fce8-6e17-4702-a912-0600e124da36-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-286249be-deeb-41ca-bf29-b6193fea73f4-partition5'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-41f4b598-bcb5-407b-acf9-61b068904610-partition0'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a31881e7-90c0-402b-90e4-fc5093baadb4-partition4'
Received events from partition: 2
Loaded 1500 records into Snowflake
Processed batch in 11.41 seconds
Received events from partition: 4
Loaded 503 records into Snowflake
Received events from partition: 0
Loaded 1500 records into Snowflake
Processed batch in 20.22 seconds
Received events from partition: 2
Loaded 1500 records into Snowflake
Processed batch in 22.92 seconds
Received events from partition: 1
Loaded 426 records into Snowflake
Received events from partition: 5
Loaded 1500 records into Snowflake
Processed batch in 23.44 seconds
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-0bb3beff-9f98-46de-b93b-7c8e15447e56-partition1'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-e881fce8-6e17-4702-a912-0600e124da36-partition3'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a31881e7-90c0-402b-90e4-fc5093baadb4-partition4'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-286249be-deeb-41ca-bf29-b6193fea73f4-partition5'
Processed batch in 12.04 seconds
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-a75e1f89-0d48-42f8-9a0c-92ad6e765613-partition2'
Keeping 'ReceiveClientAsync' connection alive. b'EHReceiver-41f4b598-bcb5-407b-acf9-61b068904610-partition0'

kashifkhan commented 4 months ago

Hi @Benniah, how many receivers are currently set up? Having one for each partition might alleviate things.
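A rough sketch of what "one receiver per partition" could look like, assuming the async `EventHubConsumerClient` and its `partition_id` keyword on `receive_batch`. The `make_client` factory, the helper names, and the batch settings are illustrative, not taken from the reporter's code; the six partition IDs come from the issue description.

```python
import asyncio

# The issue describes an Event Hub with 6 partitions.
PARTITION_IDS = [str(i) for i in range(6)]


async def receive_one_partition(make_client, partition_id, on_event_batch):
    # make_client is a hypothetical factory returning a fresh consumer client,
    # e.g. a lambda wrapping EventHubConsumerClient.from_connection_string(...).
    client = make_client()
    async with client:
        await client.receive_batch(
            on_event_batch=on_event_batch,
            partition_id=partition_id,  # pin this client to a single partition
            max_batch_size=1500,        # illustrative values
            prefetch=3000,
        )


async def receive_all(make_client, on_event_batch, partition_ids=PARTITION_IDS):
    # One client (and therefore one connection) per partition, run concurrently.
    await asyncio.gather(
        *(receive_one_partition(make_client, pid, on_event_batch) for pid in partition_ids)
    )
```

With a dedicated client per partition, a failure on one connection should be easier to spot and restart in isolation, at the cost of giving up automatic load balancing across consumers.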

Judging largely from the logs (given that keep-alive is still going), the service doesn't get any communication from the clients and disconnects them. The client should recover and continue.
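One way a client can go quiet is a long blocking call inside the async callback, which stalls the event loop. This is an assumption, not something the logs confirm. A minimal sketch of moving the blocking load off the loop with `asyncio.to_thread` (Python 3.9+); `load_into_snowflake` is a hypothetical stand-in for the real insert:

```python
import asyncio
import time


def load_into_snowflake(rows):
    # Hypothetical stand-in for the blocking Snowflake insert.
    time.sleep(0.05)
    return len(rows)


async def on_event_batch(partition_context, events):
    # Run the blocking load in a worker thread so the event loop stays free
    # to service the other partitions' connections in the meantime.
    loaded = await asyncio.to_thread(load_into_snowflake, list(events))
    return loaded
```

If the existing callback already awaits a non-blocking driver, this change would not be expected to make a difference.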

Are there any network events showing up on your end?

github-actions[bot] commented 4 months ago

Hi @Benniah. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

github-actions[bot] commented 4 months ago

Hi @Benniah, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!