open-telemetry / opentelemetry-dotnet

The OpenTelemetry .NET Client
https://opentelemetry.io
Apache License 2.0

Setting DefaultTextMapPropagator for trace context and baggage #4566

Closed by alex-todorov-j2 1 year ago

alex-todorov-j2 commented 1 year ago

Question

I have code, based on this example and this example, that manually propagates some custom data in the Baggage via a RabbitMQ message.

The producer app doesn't set any composite propagator. It simply uses the default:

private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;

With that, while debugging the consumer, I can see both the trace context and the baggage in the carrier (the headers of the RabbitMQ channel's basic properties), so propagation from the producer works. But if I use the same default, non-composite propagator in the consumer, parentContext.Baggage is empty. So I switched to the composite propagator:

//consumer app consumer class
//private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
private static readonly TextMapPropagator Propagator = new CompositeTextMapPropagator(new TextMapPropagator[]
{
    new TraceContextPropagator(),
    new BaggagePropagator(),
});  

And now this consumer propagator code extracts the Baggage data injected by the producer:

var parentContext = Propagator.Extract(default, ea.BasicProperties, ExtractTraceContextFromBasicProperties);
Baggage.Current = parentContext.Baggage; //Baggage has the data injected by the App1 propagator

My question: should I set the default composite propagator in the consumer app at startup (below) or in the consumer class (above)?

Because if I do it at app startup, the code above doesn't extract the baggage, just the trace context:

//consumer app startup class
private static TracerProvider SetupOpenTelemetry()
{
    return Sdk.CreateTracerProviderBuilder()
        .ConfigureServices(services =>
        {
            Sdk.SetDefaultTextMapPropagator(new CompositeTextMapPropagator(new TextMapPropagator[]
            {
                    new TraceContextPropagator(),
                    new BaggagePropagator(),
            }));
        })
        .AddHttpClientInstrumentation()
        .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("App2"))
        .AddSource(nameof(Program))
        .AddJaegerExporter(opts =>
        {
            opts.AgentHost = _configuration["Jaeger:AgentHost"];
            opts.AgentPort = Convert.ToInt32(_configuration["Jaeger:AgentPort"]);
            opts.ExportProcessorType = ExportProcessorType.Simple;
        })
        .Build();
}

// Then in the consumer class:
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
//private static readonly TextMapPropagator Propagator = new CompositeTextMapPropagator(new TextMapPropagator[]
//{
//  new TraceContextPropagator(),
//  new BaggagePropagator(),
//}); 

These are .NET 6 apps, and I use the latest OpenTelemetry.* and Microsoft.Extensions.* NuGet packages.

vishweshbankwar commented 1 year ago

@alex-todorov-j2 The default propagator set by the SDK is a composite propagator containing the baggage and trace context propagators. https://github.com/open-telemetry/opentelemetry-dotnet/tree/main/docs/trace/customizing-the-sdk#context-propagation

You should be able to simply use that as shown here https://github.com/open-telemetry/opentelemetry-dotnet/blob/main/examples/MicroserviceExample/Utils/Messaging/MessageReceiver.cs.

If that does not work, please share a minimal repro.
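
One thing worth ruling out, offered as an assumption rather than something confirmed in this thread: a `static readonly` field initialized from `Propagators.DefaultTextMapPropagator` is assigned once, when its type is initialized. If that happens before the SDK has been set up (or before `Sdk.SetDefaultTextMapPropagator` is called), the field may hold a different propagator than the one the SDK installs later. A minimal sketch that reads the default at the time of use instead is shown here; `PropagationSketch`, `ExtractFrom`, and the plain string dictionary used as the carrier are hypothetical names chosen for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

public static class PropagationSketch
{
    // A property, not a static readonly field: the default propagator is
    // looked up on every access, so it reflects whatever is installed at
    // the time of the call rather than at type initialization.
    private static TextMapPropagator Propagator => Propagators.DefaultTextMapPropagator;

    // Hypothetical helper: extracts trace context and baggage from a simple
    // string-to-string header dictionary standing in for the message headers.
    public static PropagationContext ExtractFrom(IDictionary<string, string> headers)
    {
        return Propagator.Extract(default, headers, GetHeader);
    }

    // Getter callback the propagator uses to read one header by name.
    private static IEnumerable<string> GetHeader(IDictionary<string, string> carrier, string key)
    {
        if (carrier.TryGetValue(key, out var value))
        {
            return new[] { value };
        }

        return Enumerable.Empty<string>();
    }
}
```

In the consumer class, the equivalent change would be to replace the `static readonly` propagator field with a property like the one above, or to read `Propagators.DefaultTextMapPropagator` inside the message handler, after the tracer provider has been built.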

alex-todorov-j2 commented 1 year ago

@vishweshbankwar Using just Propagators.DefaultTextMapPropagator didn't work; I had to explicitly use the composite propagator in the consumer app below.

Publisher web api (startup)

using System;
using App1.WebApi.Controllers;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;

namespace App1.WebApi
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        // This method gets called by the runtime. Use this method to add services to the container.
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            services.AddHttpClient();            

            services.AddOpenTelemetry()
                .WithTracing(builder => builder                    
                    //.ConfigureServices(services =>
                    //{
                    //    Sdk.SetDefaultTextMapPropagator(new CompositeTextMapPropagator(new TextMapPropagator[]
                    //    {
                    //        new TraceContextPropagator(),
                    //        new BaggagePropagator(),
                    //    }));
                    //})
                    .AddAspNetCoreInstrumentation()
                    .AddHttpClientInstrumentation()
                    .AddSource(nameof(CallApiController))
                    .AddSource(nameof(PublishMessageController))
                    .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("App1"))                    
                    .AddJaegerExporter(opts =>
                    {
                        opts.AgentHost = Configuration["Jaeger:AgentHost"];
                        opts.AgentPort = Convert.ToInt32(Configuration["Jaeger:AgentPort"]);
                        opts.ExportProcessorType = ExportProcessorType.Simple;
                    }));
        }

        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapGet("/health", async context =>
                {
                    await context.Response.WriteAsync("Ok");
                });               
                endpoints.MapDefaultControllerRoute();
            });
        }
    }
}

Publisher web api (publisher)

using Library.OTel;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;

namespace App1.WebApi.Controllers
{
    [ApiController]
    [Route("publish-message")]
    public class PublishMessageController : ControllerBase
    {
        private static readonly ActivitySource TraceSource = new(nameof(PublishMessageController));
        private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;        

        private readonly ILogger<PublishMessageController> logger;
        private readonly IConfiguration config;

        public PublishMessageController(
            ILogger<PublishMessageController> logger,
            IConfiguration configuration)
        {
            this.logger = logger;
            this.config = configuration;
        }

        [HttpGet]
        public void Get()
        {
            try
            {
                string routingKeyName = config["RabbitMq:RoutingKey"];

                using (var activity = TraceSource.StartActivity($"RabbitMq {routingKeyName} publish", ActivityKind.Producer))
                {
                    var factory = new ConnectionFactory { HostName = config["RabbitMq:Host"] };
                    using (var connection = factory.CreateConnection())
                    using (var channel = connection.CreateModel())
                    {
                        var props = channel.CreateBasicProperties();

                        AddActivityToHeader(activity, props);

                        channel.QueueDeclare(queue: routingKeyName,
                            durable: false,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);

                        var body = Encoding.UTF8.GetBytes("I am app1");

                        logger.LogInformation("Publishing message to queue");

                        channel.BasicPublish(exchange: "",
                            routingKey: routingKeyName,
                            basicProperties: props,
                            body: body);
                    }
                }
            }
            catch (Exception e)
            {
                // Pass the exception as the first argument so it is logged with its stack trace.
                logger.LogError(e, "Error trying to publish a message");
                throw;
            }
        }

        private void AddActivityToHeader(Activity activity, IBasicProperties props)
        {
            string refNumber = Guid.NewGuid().ToString();            
            Baggage.SetBaggage(AuditTagConstants.CorrelationId, refNumber);

            ActivityContext contextToInject = default;
            if (activity != null)
            {
                contextToInject = activity.Context;
            }
            else if (Activity.Current != null)
            {
                contextToInject = Activity.Current.Context;
            }

            Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), props, InjectContextIntoHeader);

            activity?.SetTag("messaging.system", "rabbitmq");
            activity?.SetTag("messaging.destination_kind", "queue");
            activity?.SetTag("messaging.rabbitmq.queue", config["RabbitMq:RoutingKey"]);
            activity?.SetTag(AuditTagConstants.CorrelationId, refNumber);
        }

        private void InjectContextIntoHeader(IBasicProperties props, string key, string value)
        {
            try
            {
                props.Headers ??= new Dictionary<string, object>();
                props.Headers[key] = value;                
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to inject trace context.");
            }
        }        
    }
}

Consumer console app

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Library.OTel;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace App2.RabbitConsumer.Console
{
    public class Program
    {
        private static readonly ActivitySource TraceSource = new(nameof(Program));
        //private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator; //new TraceContextPropagator(); 
        private static readonly TextMapPropagator Propagator = new CompositeTextMapPropagator(new TextMapPropagator[]
        {
            new TraceContextPropagator(),
            new BaggagePropagator(),
        });

        private static IConfiguration config;
        private static ILogger<Program> logger;

        public static void Main()
        {
            try
            {

                SetupConfiguration();
                SetupLogger();
                using var openTelemetry = SetupOpenTelemetry();
                DoWork();

                System.Console.WriteLine(" Press [enter] to exit.");
                System.Console.ReadLine();

            }
            catch (Exception e)
            {
                System.Console.WriteLine(e);
                throw;
            }

        }

        public static void DoWork()
        {
            var factory = new ConnectionFactory() { HostName = config["RabbitMq:Host"], DispatchConsumersAsync = true };

            var rabbitMqConnection = factory.CreateConnection();
            var rabbitMqChannel = rabbitMqConnection.CreateModel();
            var httpClient = new HttpClient { BaseAddress = new Uri(config["App3UriEndpoint"]) };

            string queueName = config["RabbitMq:Queue"];

            rabbitMqChannel.QueueDeclare(queue: queueName,
                durable: false,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new AsyncEventingBasicConsumer(rabbitMqChannel);
            consumer.Received += async (model, ea) =>
            {
                await ProcessMessage(ea,
                    httpClient,
                    rabbitMqChannel);
            };

            rabbitMqChannel.BasicConsume(queue: queueName,
                autoAck: false,
                consumer: consumer);

        }

        private static async Task ProcessMessage(BasicDeliverEventArgs ea,
            HttpClient httpClient,
            IModel rabbitMqChannel)
        {
            try
            {
                var parentContext = Propagator.Extract(default, ea.BasicProperties, ExtractTraceContextFromBasicProperties);
                Baggage.Current = parentContext.Baggage;

                var activityName = $"RabbitMQ {ea.RoutingKey} receive"; //or use config["RabbitMq:Queue"]

                using (var activity = TraceSource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext))
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);

                    AddActivityTags(activity);

                    logger.LogInformation("Message Received: " + message);

                    _ = await httpClient.PostAsync("/sql-to-event",
                        new StringContent(JsonSerializer.Serialize(message),
                            Encoding.UTF8,
                            "application/json"));

                    rabbitMqChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }

            }
            catch (Exception ex)
            {
                logger.LogError(ex, "There was an error processing the message");
            }
        }

        private static IEnumerable<string> ExtractTraceContextFromBasicProperties(IBasicProperties props, string key)
        {
            try
            {
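                // Headers can be null when the message carries none, and string
                // header values arrive as byte[] on the consumer side.
                if (props.Headers != null
                    && props.Headers.TryGetValue(key, out var value)
                    && value is byte[] bytes)
                {
                    return new[] { Encoding.UTF8.GetString(bytes) };
                }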
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to extract trace context.");
            }

            return Enumerable.Empty<string>();
        }

        private static void AddActivityTags(Activity activity)
        {
            activity?.SetTag("messaging.system", "rabbitmq");
            activity?.SetTag("messaging.destination_kind", "queue");
            activity?.SetTag("messaging.rabbitmq.queue", "sample");

            activity?.SetTag(AuditTagConstants.CorrelationId, Baggage.GetBaggage(AuditTagConstants.CorrelationId));
        }

        private static void SetupConfiguration()
        {

            //setup config
            var configFiles = Directory
                .GetFiles(Path.Combine(Directory.GetCurrentDirectory()),
                    "appsettings.json").ToList();

            if (!configFiles.Any())
                throw new Exception("Cannot read config file");

            config = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile(configFiles[0], true, false)
                .AddEnvironmentVariables()
                .Build();
        }

        private static void SetupLogger()
        {
            // Not disposed here: the logger created from this factory is used for the lifetime of the app.
            var loggerFactory = LoggerFactory.Create(builder =>
            {
                builder
                    .AddFilter("Microsoft", LogLevel.Warning)
                    .AddFilter("System", LogLevel.Warning)
                    .AddFilter("LoggingConsoleApp.Program", LogLevel.Debug)
                    .AddConsole();
            });

            logger = loggerFactory.CreateLogger<Program>();
        }

        private static TracerProvider SetupOpenTelemetry()
        {
            return Sdk.CreateTracerProviderBuilder()
                //.ConfigureServices(services =>
                //{
                //    Sdk.SetDefaultTextMapPropagator(new CompositeTextMapPropagator(new TextMapPropagator[]
                //    {
                //            new TraceContextPropagator(),
                //            new BaggagePropagator(),
                //    }));
                //})
                .AddHttpClientInstrumentation()
                .SetResourceBuilder(ResourceBuilder.CreateDefault().AddService("App2"))
                .AddSource(nameof(Program))
                .AddJaegerExporter(opts =>
                {
                    opts.AgentHost = config["Jaeger:AgentHost"];
                    opts.AgentPort = Convert.ToInt32(config["Jaeger:AgentPort"]);
                    opts.ExportProcessorType = ExportProcessorType.Simple;
                })
                .Build();
        }
    }
}
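
The consumer's getter above casts each header value to `byte[]` and decodes it as UTF-8, even though the producer wrote a string. A defensive version of that lookup could look like the sketch below. It assumes a plain `IDictionary<string, object>` standing in for `IBasicProperties.Headers`, and `HeaderReader` and `ReadHeader` are hypothetical names, not part of RabbitMQ.Client or OpenTelemetry.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class HeaderReader
{
    // Returns the header value as a single-element sequence, or an empty
    // sequence when the headers are missing, the key is absent, or the
    // value has an unexpected type.
    public static IEnumerable<string> ReadHeader(IDictionary<string, object> headers, string key)
    {
        if (headers == null || !headers.TryGetValue(key, out var value))
        {
            return Enumerable.Empty<string>();
        }

        if (value is byte[] bytes)
        {
            // Value delivered as raw bytes: decode it as UTF-8 text.
            return new[] { Encoding.UTF8.GetString(bytes) };
        }

        if (value is string text)
        {
            // Value already a string (for example, an in-memory test carrier).
            return new[] { text };
        }

        return Enumerable.Empty<string>();
    }
}
```

A getter with this shape can be passed to `Propagator.Extract` by adapting the first parameter to the carrier type in use, for example `(props, key) => HeaderReader.ReadHeader(props.Headers, key)`.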