Open g7ed6e opened 4 months ago
I just realized that the InstrumentedProducerBuilder derives from the Confluent ProducerBuilder and has all the methods I need. Please ignore my original comment.
@g7ed6e
I'd love to use this package. I am trying to use the InstrumentedProducerBuilder
in my project but I see no way to extend it with my additional ProducerBuilder
methods.
I could have my own ProducerBuilder
implementation and create the InstrumentedProducer
, but it is internal. Could you make both the InstrumentedProducer
and InstrumentedConsumer
public? I'd like to build the Confluent.Kafka.Producer
and Confluent.Kafka.Consumer
myself and then wrap them with the InstrumentedProducer
and InstrumentedConsumer
classes.
using Confluent.Kafka;
public static class ProducerBuilderExtensions
{
public static ProducerBuilder<TKey, TValue> UseDefaultErrorHandler<TKey, TValue>(this ProducerBuilder<TKey, TValue> builder, ILogger logger)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(logger);
builder.SetErrorHandler((self, e) =>
{
// TODO: figure out how periodic Local_AllBrokersDown could be avoided via configuration
var logLevel = e.Code == ErrorCode.Local_AllBrokersDown ? LogLevel.Debug : LogLevel.Error;
logger.Log(logLevel, "Kafka producer error: ErrorCode:{ErrorCode}, Reason:{Reason}, Name:{Name}", e.Code, e.Reason, self.Name);
});
return builder;
}
public static ProducerBuilder<TKey, TValue> UseDefaultLogHandler<TKey, TValue>(this ProducerBuilder<TKey, TValue> builder, ILogger logger)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(logger);
builder.SetLogHandler((_, log) => log.LogWithLogger("Kafka producer", logger));
return builder;
}
}
It seems I can only use InstrumentedProducerBuilder
using DI. Depending on the complexity of the application this can make the migration from the ProducerBuilder
to the InstrumentedProducerBuilder
very painful. I wish there would be a way to just replace the ProducerBuilder
with the InstrumentedProducerBuilder
.
Could we have another constructor with an additional Options parameter?
I hacked these constructors to the library and I was able to replace all of my ProducerBuilder and ConsumerBuilder lines. I also needed to make the ConfluentKafkaCommon.InstrumentationName public so that I could enable metrics and traces without changing my DI. Now all the Kafka metrics and traces are showing up for my application. InstrumentedConsumer and InstrumentedProducer are very nice.
Please let us use this library without having all the Kafka consumers and producers needed to be migrated to DI.
public InstrumentedConsumerBuilder(IEnumerable<KeyValuePair<string, string>> config, bool enableMetrics, bool enabledTraces)
: base(config)
{
this.Options = new ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue>
{
Metrics = enableMetrics,
Traces = enabledTraces,
};
}
public InstrumentedProducerBuilder(IEnumerable<KeyValuePair<string, string>> config, bool enableMetrics, bool enabledTraces)
: base(config)
{
this.Options = new ConfluentKafkaProducerInstrumentationOptions<TKey, TValue>
{
Metrics = enableMetrics,
Traces = enabledTraces,
};
}
builder.Services.AddOpenTelemetry()
.WithMetrics(c =>
{
c.AddMeter(ConfluentKafkaCommon.InstrumentationName);
c.AddOtlpExporter();
})
.WithTracing(c =>
{
c.AddSource(ConfluentKafkaCommon.InstrumentationName);
c.AddOtlpExporter();
})
I echo what @t03apt mentioned in having the ability to enable tracing/metrics without requiring DI (I am not sure exactly how the API surface should look)
What I have been looking into is getting tracing working with Akka.Streams.Kafka https://github.com/mhbuck/KafkaTracingExamples/ Currently my Akka.Streams.Kafka example is not working (Consumer is not working but the producer is working) but I expect that is more due to how I am configuring Akka.Streams.Kafka.
The DI approach makes a number of assumptions around how configuration of consumers/producers should be handled that does not fit some approaches.
What is reasoning behind the "Options" approach of having an object that determines if metrics/traces are enabled as looking through other Instrumentation packages is seems unique. (Apologies if this is discussed somewhere I had a look through the original PR to see if there were any comments on it https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1493)
_Originally posted by @vishweshbankwar in https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/1493#discussion_r1659150221_