Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License
4.63k stars 2.84k forks source link

[ServiceBus] pyamqp - Error scheduling messages with OpenTelemetry/tracing #29266

Closed swathipil closed 8 months ago

swathipil commented 1 year ago

Error with pyamqp - azure-servicebus==7.9.0b1. Scheduling messages with tracing turned on, specifically OpenTelemetry, results in the following error/stack trace:

Traceback (most recent call last):
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\servicebus\azure-servicebus\samples\async_samples\tracing_async.py", line 136, in <module>
    asyncio.run(main(args))
  File "C:\Users\swathip\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\AppData\Local\Programs\Python\Python311\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\AppData\Local\Programs\Python\Python311\Lib\asyncio\base_events.py", line 650, in run_until_complete       
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\servicebus\azure-servicebus\samples\async_samples\tracing_async.py", line 117, in main
    await sender(msg, delay=args.delay)
  File "C:\Users\swathip\Documents\forks\azure-sdk-for-python\sdk\servicebus\azure-servicebus\samples\async_samples\tracing_async.py", line 107, in sender
    await sender.schedule_messages(message, scheduled_time_utc)   
  File "C:\Users\swathip\Documents\env311\Lib\site-packages\azure\servicebus\aio\_servicebus_sender_async.py", line 286, in schedule_messages
    return await self._mgmt_request_response_with_retry(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\env311\Lib\site-packages\azure\servicebus\aio\_base_handler_async.py", line 352, in _mgmt_request_response_with_retry
    return await self._do_retryable_operation(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\env311\Lib\site-packages\azure\servicebus\aio\_base_handler_async.py", line 259, in _do_retryable_operation
    raise last_exception
  File "C:\Users\swathip\Documents\env311\Lib\site-packages\azure\servicebus\aio\_base_handler_async.py", line 245, in _do_retryable_operation
    return await operation(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\env311\Lib\site-packages\azure\servicebus\aio\_base_handler_async.py", line 342, in _mgmt_request_response
    return callback(status, response, description)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\swathip\Documents\env311\Lib\site-packages\azure\servicebus\_common\mgmt_handlers.py", line 145, in schedule_op    
    _handle_amqp_mgmt_error(
  File "C:\Users\swathip\Documents\env311\Lib\site-packages\azure\servicebus\exceptions.py", line 93, in _handle_amqp_mgmt_error    
    raise _handle_amqp_exception_with_condition(
azure.servicebus.exceptions.ServiceBusError: Scheduling messages failed. b'The service was unable to process the request; please retry the operation. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 Reference:d0f8d5fa-6644-47ad-a514-b7feb8f9af18, TrackingId:acd597fc-265e-4a0d-995e-2645aebb5bbc_B14, SystemTracker:sb-te9c6b9643d59d62d:Queue:sb-te9c6b9643d59d62d-queue, Timestamp:2023-03-10T00:35:28'. Error condition: amqp:internal-error. Status Code: 500.

To reproduce:

from contextlib import asynccontextmanager
import datetime
import logging.config
import os

from azure.core.tracing.ext.opentelemetry_span import OpenTelemetrySpan
from azure.core.settings import settings
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient

# otel libs
from opentelemetry.sdk import resources  # type: ignore
from opentelemetry.sdk.trace import TracerProvider  # type: ignore
from opentelemetry import trace  # type: ignore

# unused, but can be used to export spans to a collector
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter  # type: ignore
from opentelemetry.sdk.trace.export import BatchSpanProcessor  # type: ignore

# Service Bus connection settings come from the environment; a missing
# variable raises KeyError as soon as the module is loaded.
CONNECTION_STR = os.environ["SERVICEBUS_CONNECTION_STR"]
QUEUE_NAME = os.environ["SERVICEBUS_QUEUE_NAME"]

# Module-level tracer used for the spans this script creates itself.
tracer: trace.Tracer = trace.get_tracer(__name__)

# Logging setup: a single stdout handler shared by the root logger and the
# SDK loggers. Per-logger settings must live under the "loggers" key:
# dictConfig silently ignores unknown top-level keys, so an "azure"/"uamqp"
# entry placed beside "loggers" would never be applied.
logging.config.dictConfig({
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "default": {
            "level": "WARN",
            "class": "logging.StreamHandler",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # The empty name configures the root logger.
        "": {"handlers": ["default"], "level": "INFO"},
        # These loggers attach the shared handler themselves, so propagation
        # to the root logger is disabled to avoid printing each record twice.
        "azure": {"handlers": ["default"], "level": "WARN", "propagate": False},
        "uamqp": {"handlers": ["default"], "level": "WARN", "propagate": False},
    },
})

def setup_otel():
    """Enable OpenTelemetry tracing for the Azure SDK and install a tracer provider.

    Sets ``settings.tracing_implementation`` to ``OpenTelemetrySpan`` and
    registers a ``TracerProvider`` carrying this script's resource attributes
    as the global tracer provider. No span processor/exporter is attached.
    """
    # Route Azure SDK tracing through the OpenTelemetry plugin.
    settings.tracing_implementation = OpenTelemetrySpan

    # Most tracing backends require a service name on the resource.
    resource_attributes = {
        resources.DEPLOYMENT_ENVIRONMENT: "local",
        resources.SERVICE_NAME: "service-bus-otel-test",
        resources.SERVICE_VERSION: "0.0.1",
    }
    tracer_provider = TracerProvider(
        resource=resources.Resource(attributes=resource_attributes)
    )

    # Optional: export spans to a collector (not needed for this demo).
    # processor = BatchSpanProcessor(
    #     OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
    # )
    # tracer_provider.add_span_processor(processor)
    trace.set_tracer_provider(tracer_provider)

@asynccontextmanager
async def start_span(
    tracer: trace.Tracer,
    name: str,
):
    """
    Async context manager that opens a CONSUMER-kind span called `name` as the
    current span on `tracer` and yields it to the `async with` body.

    The wrapped synchronous `with` block is exited when that body finishes.
    """
    span_cm = tracer.start_as_current_span(name, kind=trace.SpanKind.CONSUMER)
    with span_cm as active_span:
        yield active_span

async def on_receive(msg: str):
    """Print a timestamped "MSG RECEIVED" line for `msg` inside a tracing span."""
    # The span object itself is not needed; the span only has to be active
    # while the message is handled.
    async with start_span(tracer, "service-bus-otel-test"):
        print(f"MSG RECEIVED [{msg}] at {datetime.datetime.utcnow()}")

async def receiver():
    """Iterate over messages from the queue, handling and completing each one.

    Every message is passed to `on_receive` and then completed on the
    receiver it came from.
    """
    client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    async with client:
        queue_receiver = client.get_queue_receiver(queue_name=QUEUE_NAME)
        async with queue_receiver:
            async for received in queue_receiver:
                await on_receive(received)
                await queue_receiver.complete_message(received)

async def sender(msg: str, delay: int):
    """
    Schedule `msg` on the queue for delivery `delay` seconds from now (UTC).

        msg: message to send
        delay: seconds to delay delivery
    """
    outgoing = ServiceBusMessage(msg)
    # Timezone-aware "now" so the scheduled time is unambiguous UTC.
    enqueue_at_utc = datetime.datetime.now(
        tz=datetime.timezone.utc
    ) + datetime.timedelta(seconds=delay)
    client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)
    async with client:
        queue_sender = client.get_queue_sender(queue_name=QUEUE_NAME)
        async with queue_sender:
            await queue_sender.schedule_messages(outgoing, enqueue_at_utc)

async def main(args):
    """Set up tracing, then either schedule one message or run the receiver.

    args: parsed command-line namespace; this function reads ``args.send``
        (message text) and ``args.delay`` (seconds).

    The mode is chosen solely by the truthiness of ``args.send``: a non-empty
    value schedules that message, anything else (including ``-s ""``) starts
    the receiver. ``args.receiver`` is not consulted here.
    """
    # works after commenting out setup_otel()
    setup_otel()

    if not args.send:
        print("STARTING RECEIVER")
        await receiver()
        return

    # No emptiness check is needed here: the guard above already ensures the
    # message is non-empty.
    msg = args.send
    print(f"SENDING MESSAGE [{msg}] WITH DELAY {args.delay} at {datetime.datetime.utcnow()}")
    await sender(msg, delay=args.delay)

if __name__ == "__main__":
    # Usage:
    #   send:    $ python service-bus-otel-test.py -s test20 -d 20
    #   receive: $ python service-bus-otel-test.py -r
    import argparse
    import asyncio

    parser = argparse.ArgumentParser(prog="Azure Queues Test")
    # Registered in this order so the generated help text lists them the same way.
    for flags, options in (
        (("-s", "--send"), {"type": str, "help": "Send value into the queue"}),
        (("-d", "--delay"), {"type": int, "help": "Message delay", "default": 0}),
        (("-r", "--receiver"), {"action": "store_true", "help": "run receiver"}),
    ):
        parser.add_argument(*flags, **options)

    args = parser.parse_args()
    asyncio.run(main(args))
swathipil commented 8 months ago

no longer reproducing