Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License
4.64k stars 2.84k forks source link

[EventHubs] SDK layer exception parity for uamqp/pyamqp #26229

Closed swathipil closed 1 year ago

swathipil commented 2 years ago

Test cases:

CONNECTION_STR = os.environ["EVENT_HUB_CONN_STR"] EVENTHUB_NAME = os.environ['EVENT_HUB_NAME']

import logging

uamqp_logger = logging.getLogger('uamqp')

uamqp_logger.setLevel(logging.DEBUG)

Configure a console output

handler = logging.StreamHandler(stream=sys.stdout)

uamqp_logger.addHandler(handler)

def on_event(partition_context, event):

Put your code here.

# If the operation is i/o intensive, multi-thread will have better performance.
print("Received event from partition: {}.".format(partition_context.partition_id))

def on_partition_initialize(partition_context):

Put your code here.

print("Partition: {} has been initialized.".format(partition_context.partition_id))

def on_partition_close(partition_context, reason):

Put your code here.

print("Partition: {} has been closed, reason for closing: {}.".format(
    partition_context.partition_id,
    reason
))
consumer_client.close()

def on_error(partition_context, error):

Put your code here. partition_context can be None in the on_error callback.

if partition_context:
    print("An exception: {} occurred during receiving from Partition: {}.".format(
        partition_context.partition_id,
        error
    ))
else:
    print("An exception: {} occurred during the load balance process.".format(error))

def update_entity(interval): # forces service link detach subscription_id = os.environ['SUBSCRIPTION_ID'] live_eventhub = { "resource_group" : 'swathip-test', "namespace": 'swathip-test-eventhubs', "event_hub": "eventhub-test" } from azure.mgmt.eventhub import EventHubManagementClient from azure.identity import EnvironmentCredential import threading mgmt_client = EventHubManagementClient(EnvironmentCredential(), subscription_id) def _schedule_update_properties(): eventhub = mgmt_client.event_hubs.get( live_eventhub["resource_group"], live_eventhub["namespace"], live_eventhub["event_hub"] ) properties = eventhub.as_dict() if properties["message_retention_in_days"] == 1: properties["message_retention_in_days"] = 2 else: properties["message_retention_in_days"] = 1 mgmt_client.event_hubs.create_or_update( live_eventhub["resource_group"], live_eventhub["namespace"], live_eventhub["event_hub"], properties ) print('updating') t = threading.Timer(interval, _schedule_update_properties) t.start()

if name == 'main': update_entity(4) update_entity(6) update_entity(10) consumer_client = EventHubConsumerClient.from_connection_string( conn_str=CONNECTION_STR, consumer_group='$Default', eventhub_name=EVENTHUB_NAME, uamqp_transport=True, retry_total=0, logging_enable=True )

#import time
#from azure.eventhub import EventHubSharedKeyCredential
#from azure.eventhub._client_base import EventHubSASTokenCredential
#from azure.identity import EnvironmentCredential
#credential = EventHubSharedKeyCredential(os.environ["EVENT_HUB_SAS_POLICY"], os.environ['EVENT_HUB_SAS_KEY'])
#auth_uri = "sb://{}/{}".format(os.environ['EVENT_HUB_HOSTNAME'], os.environ['EVENT_HUB_NAME'])
#token = credential.get_token(auth_uri).token
#consumer_client = EventHubConsumerClient(
#    fully_qualified_namespace=os.environ['EVENT_HUB_HOSTNAME'],
#    eventhub_name=os.environ['EVENT_HUB_NAME'],
#    consumer_group='$Default',
#    credential=EventHubSASTokenCredential(token, time.time() + 8),
#    uamqp_transport=True,
#    retry_total=1,
#    logging_enable=True
#)

try:
    with consumer_client:
        consumer_client.receive(
            on_event=on_event,
            on_partition_initialize=on_partition_initialize,
            on_partition_close=on_partition_close,
            on_error=on_error,
            starting_position="-1",  # "-1" is from the beginning of the partition.
        )
except KeyboardInterrupt:
    print('Stopped receiving.')


 - Note: Passing in an expired token credential using a connection string does not result in any errors raised for either uamqp or pyamqp. Issue #27079 created for this.
swathipil commented 2 years ago

x = not implemented y = passed n = not passed

API to test kwargs/exception test file test case passed?
BUFFERED_MODE = TRUE
EHProducerClient buffer_concurrency=2 test_buffered_producer.py + async test_producer_client_constructor y
EHProducerClient on_success=callback, on_error=callback test_buffered_producer.py, test_buffered_producer.py test_producer_client_constructor y
EHProducerClient on_success=callback w/ invalid # params, on_error=callback w/ invalid # params passes and logs error test_buffered_producer.py async test_producer_client_constructor y
EHProducerClient max_buffer_length=10/100 test_buffered_producer.py + _async.py test_long_wait_small_buffer/with_timing_config y
EHProducerClient, max_wait_time=0/10/1000 test_buffered_producer.py + _async.py test_send_with_timing_config, etc. y
EHProducerClient, retry_total=3, retry_backoff_factor=0.01 test_buffered_producer.py + _async.py test_long_wait_small_buffer y
EHProducerClient, auth_timeout=3 test_buffered_producer.py + _async.py test_long_wait_small_buffer y
EHProducerClient.get_buffered_event_count partition_id=pid test_buffered_producer.py + async test_basic_send_single_events_round_robin y
EHProducerClient on_error=on_error test_bp + async test_producer_client_constructor y
send_event timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y
send_batch timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y
BUFFERED_MODE = FALSE
EHProducerClient fully_qualified_namespace = invalid test_negative.py + async test_client_invalid_credential y
EHProducerClient eventhub_name = invalid test_negative.py + async test_client_invalid_credential y
EHProducerClient credential = invalid, expired test_negative.py + async test_client_invalid_credential y
EHProducerClient buffered_mode=False, on_success=callback, on_error=callback test_send.py, test_send_async.py test_send_with_callback y
EHProducerClient logging_enable=True x x x
EHProducerClient auth_timeout = 3 test_negative + async test_client_secret_credential y
EHProducerClient user_agent="customized information" test_negative.py + async test_client_secret_credential y
EHProducerClient retry_total=0, retry_mode='exponential' + RetryMode.Exponential, retry_backoff_factor=0.02 test_negative.py + async test_client_invalid_credential y
EHProducerClient idle_timeout=10 test_reconnect.py, test_reconnect_async.py test_send_connection_idle_timeout y
EHProducerClient transport_type=Amqp (pass in), uamqp.AmqpOverWebsocket, AmqpOverWebsocket test_send/receive.py, test_send/receive_async.py test_send/receive_over_websocket, test_send_list, test_send_multiple_partition_with_app_prop y
EHProducerClient http_proxy={...} x x x
EHProducerClient custom_endpoint_address=valid x x x
EHProducerClient custom_endpoint_address=invalid raises AuthenticationError test_negative.py + async test_client_invalid_credential y
EHProducerClient connection_verify=valid test_client_creation.py + async test_custom_certificate y
EHProducerClient connection_verify=invalid raises AuthenticationError test_auth.py + async test_client_identity_credential y
EHProducerClient without send claim (link detach) raises ConnectionLostError local testing x y
EHProducerClient disable entity (vendor link detach) raises ConnectError local testing x y
from_connection_string invalid hostname/key/param test_negative + async test_create/send/receive_batch_invalid_hostname, test_send_batch_invalid_partition y
create_batch max_batch_size=too large/invalid partition test_negative + async test_send_to_invalid_partitions/test_send_too_large_message_async y
send_event timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y
send_batch timeout = -1 raises OperationTimeoutError test_negative + async test_client_send_timeout y
swathipil commented 2 years ago

ConsumerClient:

credentials:

kwargs/exception test file test case passed?
invalid key_name, invalid access_key test_negative + async ... y