Azure / azure-sdk-for-python

This repository is for active development of the Azure SDK for Python. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/python/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-python.
MIT License

don't know how to set operation_Id, operation_ParentId and operation_Name in logger (using configure_azure_monitor from azure.monitor.opentelemetry) #34655

Open eeertel opened 4 months ago

eeertel commented 4 months ago

Describe the bug When logging from an Azure Function using configure_azure_monitor to create a logger, I don't know how to set the operation_Id, operation_ParentId, and operation_Name fields that are displayed in AppInsights.

To Reproduce Steps to reproduce the behavior:

  1. Create the logger:
     conn_string = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
     configure_azure_monitor(
         connection_string=conn_string,
         logger_name=__name__,
     )
     logger = logging.getLogger(__name__)
     logger.setLevel(logging.DEBUG)
  2. Log something:
     logger.info(f">>> Worker {worker_index} started >>>")

Expected behavior I would like to be able to set, when creating the logger, the context information (passed to the Azure Function) that contains the operation_Id, operation_Name, etc.

Screenshots: AppInsightsTraces (screenshot attached)

Additional context A sample Azure Function is depicted below. Please note that since the Azure Function is using multiprocessing, the original Azure Function logger cannot be passed to the multiprocessing.Process instances as a parameter (it is not picklable). Each instance needs to create its own logger. The goal is to have a uniform logging experience in AppInsights.

import azure.functions as func

import os
import time
import logging

from azure.monitor.opentelemetry import configure_azure_monitor

app = func.FunctionApp()

@app.schedule(schedule="0 15 3 * * *", arg_name="myTimer", run_on_startup=True,
              use_monitor=False) 
def test_multiprocessor(myTimer: func.TimerRequest, context: func.Context) -> None:

    #set warning level for all azure libraries
    logger = logging.getLogger('azure')
    logger.setLevel(logging.WARNING)

    if myTimer.past_due:
      logging.warning("test_multiprocessor timer is past due")

    start_time = time.time()
    logging.info('test_multiprocessor started')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    test_processor = TestMultiprocessing(logger, context)
    test_processor.process_in_parallel()
    logging.info( f"test_multiprocessor finished processing in {int(time.time()-start_time)} seconds")

class TestMultiprocessing:

  _QUEUE_END_MARKER                         = "__END_OF_QUEUE__"
  _RET_CODE_KEY                             = "ret_code"

  def __init__(self, logger, context, **kwargs):

    self.logger = logger
    self.number_threads = 2

  def process_job(self, worker_index):

    conn_string  = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
    configure_azure_monitor(
        connection_string=conn_string,
        logger_name=__name__,       
    )
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    logger.info(f">>> Worker {worker_index} started >>>")
    time.sleep(5)
    logger.info(f"<<< Worker {worker_index} finished <<<")

    for handler in logger.handlers:
      handler.flush()

    return

  def process_in_parallel(self):
    from multiprocessing import Process

    self.logger.info(f"Starting to process in parallel with: {self.number_threads} thread(s)")

    #prepare multiprocessing
    parallel_jobs_arr = []
    #prepare worker threads
    #func = process_site_inventory
    for i in range(self.number_threads):
      worker_process=Process(target=self.process_job,
        args=(
          i, 
        )
      )

      parallel_jobs_arr.append(worker_process)
      worker_process.start()

    #just to make sure
    for inventory_job in parallel_jobs_arr:
      inventory_job.join()

    self.logger.info(f"Finished all {self.number_threads} thread(s)")

    return 
kashifkhan commented 4 months ago

Thanks for the feedback @eeertel . We will investigate and get back to you asap.

lzchen commented 4 months ago

In a functions environment, you need to extract the current trace context from the context and set it as the current span context. See this example. Then put your logging.info() messages inside the with tracer.start_as_current_span... context manager. Your logs should then be correlated with your function app calls.
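For reference, here is a minimal stdlib-only sketch of what extract() pulls out of that carrier. The validity rules follow the W3C Trace Context format ("version-trace_id-parent_id-flags"); parse_traceparent is a hypothetical helper for illustration, not an SDK function:

```python
# Illustrative sketch (stdlib only): the W3C traceparent header that Functions
# passes in context.trace_context has the form "version-trace_id-parent_id-flags".
# The trace_id becomes operation_Id in Application Insights and the parent_id
# becomes operation_ParentId.

def parse_traceparent(header: str):
    """Return (trace_id, parent_id), or None if the header is invalid/empty."""
    parts = header.split("-")
    if len(parts) != 4:
        return None
    _version, trace_id, parent_id, _flags = parts
    # An all-zero or empty trace/span id means "no active trace context".
    if len(trace_id) != 32 or set(trace_id) == {"0"}:
        return None
    if len(parent_id) != 16 or set(parent_id) == {"0"}:
        return None
    return trace_id, parent_id

# A well-formed header yields usable ids; an empty one like "00---00" does not.
print(parse_traceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"))
print(parse_traceparent("00---00"))
```

This also explains why an empty Functions trace context produces broken correlation: there is nothing valid for extract() to restore.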

eeertel commented 4 months ago

thank you @lzchen - tracer.start_as_current_span(...) has indeed solved the problem with the operation_Id (see the trace_span screenshot below).

but what I still miss is the operation_Name. Is there a way to have that sent to AppInsights as well?

The complete code that I used for this is here:

import azure.functions as func

import os
import time
import logging

from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry.propagate import extract

app = func.FunctionApp()

@app.schedule(schedule="0 15 3 * * *", arg_name="myTimer", run_on_startup=True,
              use_monitor=False) 
def test_multiprocessor(myTimer: func.TimerRequest, context: func.Context) -> None:

    #set warning level for all azure libraries
    logger = logging.getLogger('azure')
    logger.setLevel(logging.WARNING)

    if myTimer.past_due:
      logging.warning("test_multiprocessor timer is past due")

    start_time = time.time()
    logging.info('test_multiprocessor started')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    test_processor = TestMultiprocessing(logger, context)
    test_processor.process_in_parallel()
    logging.info( f"test_multiprocessor finished processing in {int(time.time()-start_time)} seconds")

class TestMultiprocessing:

  _QUEUE_END_MARKER                         = "__END_OF_QUEUE__"
  _RET_CODE_KEY                             = "ret_code"

  def __init__(self, logger, context, **kwargs):

    self.traceparent = context.trace_context.Traceparent
    self.tracestate = context.trace_context.Tracestate
    self.logger = logger
    self.number_threads = 2

  def process_job(self, worker_index, traceparent, tracestate):

    conn_string  = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
    configure_azure_monitor(
        connection_string=conn_string,
        logger_name=__name__,       
    )
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    carrier = {
        "traceparent": traceparent,
        "tracestate": tracestate,
    }
    tracer = trace.get_tracer(__name__)

    # Start a span using the current context
    with tracer.start_as_current_span(
        "http_trigger_span",
        context=extract(carrier),
    ):    

      logger.info(f">>> Worker {worker_index} started >>>")
      time.sleep(5)
      logger.info(f"<<< Worker {worker_index} finished <<<")

      for handler in logger.handlers:
        handler.flush()

    return

  def process_in_parallel(self):
    from multiprocessing import Process

    self.logger.info(f"Starting to process in parallel with: {self.number_threads} thread(s)")

    #prepare multiprocessing
    parallel_jobs_arr = []
    #prepare worker threads
    #func = process_site_inventory
    for i in range(self.number_threads):
      worker_process=Process(target=self.process_job,
        args=(
          i, 
          self.traceparent,
          self.tracestate,
        )
      )

      parallel_jobs_arr.append(worker_process)
      worker_process.start()

    #just to make sure
    for inventory_job in parallel_jobs_arr:
      inventory_job.join()

    self.logger.info(f"Finished all {self.number_threads} thread(s)")

    return 
github-actions[bot] commented 4 months ago

Hi @eeertel. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.

eeertel commented 4 months ago

From my point of view the solution mentioned above does not solve the issue because of the following use case:

Without a valid operation_Name a trace in Application Insights that points to an execution failure cannot be "connected" to a certain function (which makes it impossible to define an alert rule)

An example for such a query alert rule is shown below:

traces
| where operation_Name == "test_multiprocessor"
| where severityLevel > 1
lzchen commented 4 months ago

@eeertel

I am currently investigating. Please ignore the automated message.

jeremydvoss commented 4 months ago

@eeertel Operation Name is mapped from the OpenTelemetry span.name. So, by default it will look something like "GET /some-uri" for a given GET request. If you want to override Operation Name or any other span attribute that is autopopulated, you can do so with a custom span processor. Here are some examples: https://learn.microsoft.com/en-us/azure/azure-monitor/app/opentelemetry-add-modify?tabs=python#add-a-custom-property-to-a-span
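A minimal sketch of that pattern follows. The OperationNameProcessor and DummySpan names are hypothetical stand-ins so the sketch stays self-contained; in real code you would subclass opentelemetry.sdk.trace.SpanProcessor and pass the instance via configure_azure_monitor(span_processors=[...]):

```python
# Hedged sketch of a custom span processor that overrides the span name,
# which Azure Monitor maps to Operation Name for "requests". The SDK calls
# on_start/on_end on whatever processor is registered.

class OperationNameProcessor:
    def __init__(self, operation_name: str):
        self.operation_name = operation_name

    def on_start(self, span, parent_context=None):
        pass

    def on_end(self, span):
        # _name is a private attribute; renaming it in on_end only affects
        # what exporters see, since the span has already ended.
        span._name = self.operation_name

    def shutdown(self):
        pass

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return True


class DummySpan:
    """Stand-in for a ReadableSpan, for demonstration only."""
    def __init__(self, name):
        self._name = name


span = DummySpan("GET /some-uri")
OperationNameProcessor("test_multiprocessor").on_end(span)
print(span._name)
```

Note this only renames the span ("request" telemetry), not the log records ("trace" telemetry) emitted inside it.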

eeertel commented 4 months ago

@jeremydvoss unfortunately the example using the span processor does not work as expected with Application Insights. I can see in the debugger that the on_end function is invoked, but neither the custom dimension nor the span name has any effect on the data displayed in Application Insights.

I have exported the AppInsights output and attached it here: app_insights_export.csv

The code used to generate this is shown below. My feeling is that the logging data is filtered by some entity along the way. If I look at the traceparent extracted from the context, it is actually a strange combination, 00---00, and the tracestate is empty.

If I set these values in the carrier (lines 72 and 73), AppInsights somehow displays the correct operation_Id but a completely random operation_ParentId.

Please let me know if you need any more details to reproduce this behavior.

Thank you in advance,

import azure.functions as func

import os
import time
import logging

from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.sdk.trace import SpanProcessor

app = func.FunctionApp()

@app.schedule(schedule="0 15 3 * * *", arg_name="myTimer", run_on_startup=True,
              use_monitor=False) 
def test_multiprocessor(myTimer: func.TimerRequest, context: func.Context) -> None:

    #set warning level for all azure libraries
    logger = logging.getLogger('azure')
    logger.setLevel(logging.WARNING)

    if myTimer.past_due:
      logging.warning("test_multiprocessor timer is past due")

    start_time = time.time()
    logging.info('test_multiprocessor started')
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    test_processor = TestMultiprocessing(logger, context)
    test_processor.process_in_parallel()
    logging.info( f"test_multiprocessor finished processing in {int(time.time()-start_time)} seconds")

class SpanEnrichingProcessor(SpanProcessor):

  def __init__(self, function_name, **kwargs):
    self.function_name = function_name

  def on_end(self, span):
      # Set the function name
      span._name = self.function_name
      # test one custom dimension
      span._attributes["foo"] = "bar"
      # test setting the client ip address
      span._attributes["http.client_ip"] = "1.1.1.1"

class TestMultiprocessing:

  def __init__(self, logger, context, **kwargs):

    self.traceparent = context.trace_context.Traceparent
    self.tracestate = context.trace_context.Tracestate
    self.function_name = context.function_name
    self.logger = logger
    self.number_threads = 2
    self.logger.info(f"Initialized with traceparent: {self.traceparent} and tracestate: {self.tracestate} and function name: {self.function_name}")

  def process_job(self, worker_index):

    conn_string  = os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
    span_enrich_processor = SpanEnrichingProcessor(self.function_name)
    configure_azure_monitor(
        connection_string=conn_string,
        logger_name=self.function_name,
        # Configure the custom span processors to include span enrich processor.
        span_processors=[span_enrich_processor],          
    )
    logger = logging.getLogger(self.function_name)
    logger.setLevel(logging.DEBUG)

    carrier = {
        "traceparent": self.traceparent,
        "tracestate": self.tracestate,
    }
    tracer = trace.get_tracer(self.function_name)

    # Start a span using the current context
    with tracer.start_as_current_span(
        "timer_trigger_span",
        context=extract(carrier),
    ):    

      logger.info(f">>> Worker {worker_index} started >>>")
      time.sleep(5)
      logger.info(f"<<< Worker {worker_index} finished <<<")

      for handler in logger.handlers:
        handler.flush()

    return

  def process_in_parallel(self):
    from multiprocessing import Process

    self.logger.info(f"Starting to process in parallel with: {self.number_threads} thread(s)")

    #prepare multiprocessing
    parallel_jobs_arr = []
    #prepare worker threads
    #func = process_site_inventory
    for i in range(self.number_threads):
      worker_process=Process(target=self.process_job,
        args=(
          i, 
        )
      )

      parallel_jobs_arr.append(worker_process)
      worker_process.start()

    #just to make sure
    for inventory_job in parallel_jobs_arr:
      inventory_job.join()

    self.logger.info(f"Finished all {self.number_threads} thread(s)")

    return 
jeremydvoss commented 4 months ago

@eeertel I see. I misunderstood. I was only talking about changing Operation Name in OpenTelemetry spans, which become Application Insights "requests". I will investigate whether it's possible to set Operation Name in log traces with the SDK.

github-actions[bot] commented 3 months ago

Hi @eeertel, since you haven’t asked that we /unresolve the issue, we’ll close this out. If you believe further discussion is needed, please add a comment /unresolve to reopen the issue.

eeertel commented 3 months ago

/unresolve

@jeremydvoss did you manage to find out if it is possible to set Operation Name in log traces with the SDK ?

jeremydvoss commented 3 months ago

I've confirmed that the log SDK does not currently map Operation Name from anything, so I'm marking this as a feature request.

gkeuccsr commented 1 month ago

Hello, any progress on this?

jeremydvoss commented 1 month ago

There is no current plan to enable setting Operation Name for Application Insights "Trace Logs".

gkeuccsr commented 1 month ago

I ended up monkey-patching the code:

import azure.monitor.opentelemetry.exporter.export.logs._exporter as xxx_logs

def patch_34655(ai_operation_name: str) -> None:
    """
    Patching the AzureMonitorLogExporter, see https://github.com/Azure/azure-sdk-for-python/issues/34655
    """
    # pylint: disable=protected-access
    orig_convert_log_to_envelope = xxx_logs._convert_log_to_envelope

    # pylint: disable=import-outside-toplevel
    from azure.monitor.opentelemetry.exporter._generated.models import ContextTagKeys, TelemetryItem
    from opentelemetry.sdk._logs import LogData

    def custom_convert_log_to_envelope(log_data: LogData) -> TelemetryItem:
        """Add operation_Name to the envelope"""
        envelope = orig_convert_log_to_envelope(log_data)
        envelope.tags[ContextTagKeys.AI_OPERATION_NAME] = ai_operation_name
        return envelope

    xxx_logs._convert_log_to_envelope = custom_convert_log_to_envelope

patch_34655(ai_operation_name="fubar")
claria commented 2 weeks ago

Thanks @gkeuccsr for providing the monkey patch workaround.

@jeremydvoss I would expect this to be a fundamental feature to be supported when using azure functions and would appreciate if this could be put on the roadmap.

sudharsan2020 commented 1 week ago

@gkeuccsr Thanks for the awesome workaround. Is there any way to plug this code directly into the snippet below?

from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor

exporter = AzureMonitorLogExporter(
    connection_string=os.environ["My__Azure__ApplicationInsights__ConnectionString"],
)
self.logger_provider.add_log_record_processor(BatchLogRecordProcessor(
    exporter,
    schedule_delay_millis=EXPORT_INTERVAL_IN_MS,
))

FYI: Raised a similar issue like this: https://github.com/open-telemetry/opentelemetry-python/issues/3635
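Since patch_34655 replaces a module-level function that the exporter looks up at call time, calling it once before (or even after) constructing AzureMonitorLogExporter should cover that snippet as well. A toy, stdlib-only demonstration of that lookup behavior (all names here are illustrative, not SDK types):

```python
# Toy demonstration of why patching a module-level function covers instances
# created afterwards: callers resolve the name through the module at call
# time, so one reassignment affects every later call.
import types

mod = types.ModuleType("toy_exporter_module")

def _convert(record):
    return {"message": record}

mod._convert = _convert

class ToyExporter:
    """Stand-in for an exporter that calls the module-level converter."""
    def export(self, record):
        return mod._convert(record)

# Apply the patch once; every ToyExporter picks it up.
orig = mod._convert
def patched(record):
    envelope = orig(record)
    envelope["ai.operation.name"] = "fubar"  # illustrative tag key
    return envelope
mod._convert = patched

exporter = ToyExporter()
print(exporter.export("hello"))
```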