LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Exception when configuring StreamConfig via the ctor #220

Closed progrocket closed 1 year ago

progrocket commented 1 year ago

First, thank you very much for your library.

Description

When StreamConfig is configured through its dictionary constructor, creating a KafkaStream throws an exception. Passing the "ApplicationId" and "BootstrapServers" keys to the dictionary constructor produces:

Unhandled exception. System.Collections.Generic.KeyNotFoundException: The given key 'application.id' was not present in the dictionary.
   at System.Collections.Generic.Dictionary`2.get_Item(TKey key)
   at Streamiz.Kafka.Net.StreamConfig.get_ApplicationId()
   at Streamiz.Kafka.Net.KafkaStream..ctor(Topology topology, IStreamConfig configuration, IKafkaSupplier kafkaSupplier)
   at Streamiz.Kafka.Net.KafkaStream..ctor(Topology topology, IStreamConfig configuration)
   at MyStream.Worker.ExecuteAsync(CancellationToken stoppingToken) in C:\Users\Dmitry\RiderProjects\MyStream\MyStream\Worker.cs:line 34
   at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
   at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
   at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
   at Program.<Main>$(String[] args) in C:\Users\Dmitry\RiderProjects\MyStream\MyStream\Program.cs:line 11
   at Program.<Main>(String[] args)
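
The trace points at a dictionary indexer inside the ApplicationId getter: the getter looks up the Kafka-style key application.id, so an entry stored under the property name ApplicationId is never found. A minimal sketch of that mismatch follows. SketchConfig is a hypothetical stand-in that only mirrors the lookup visible in the trace; it is not Streamiz's implementation.

```csharp
using System;
using System.Collections.Generic;

// The dictionary uses the C# property name as its key, as in the report.
var settings = new Dictionary<string, object>
{
    ["ApplicationId"] = "test-app",
};

var config = new SketchConfig(settings);

try
{
    Console.WriteLine(config.ApplicationId);
}
catch (KeyNotFoundException ex)
{
    // The getter asked for "application.id", which was never added.
    Console.WriteLine(ex.Message);
}

// Hypothetical stand-in for a dictionary-backed config class (assumption, not library code).
class SketchConfig
{
    private readonly Dictionary<string, object> values;

    public SketchConfig(IDictionary<string, object> source)
    {
        values = new Dictionary<string, object>(source);
    }

    // Reads the Kafka-style key; the indexer throws if the key is absent.
    public string ApplicationId => (string)values["application.id"];
}
```

Under this reading, the dictionary keys have to be the Kafka-style names (application.id), not the C# property names.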

If you change the keys, replacing "ApplicationId" with "application.id" and "BootstrapServers" with "bootstrap.servers", you get another exception:

Unhandled exception. Streamiz.Kafka.Net.Errors.StreamConfigException: Stream configuration is not correct. Please set ApplicationId and BootstrapServers as minimal.
   at Streamiz.Kafka.Net.KafkaStream..ctor(Topology topology, IStreamConfig configuration, IKafkaSupplier kafkaSupplier)
   at Streamiz.Kafka.Net.KafkaStream..ctor(Topology topology, IStreamConfig configuration)
   at MyStream.Worker.ExecuteAsync(CancellationToken stoppingToken) in C:\Users\Dmitry\RiderProjects\MyStream\MyStream\Worker.cs:line 40
   at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
   at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
   at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
   at Program.<Main>$(String[] args) in C:\Users\Dmitry\RiderProjects\MyStream\MyStream\Program.cs:line 11
   at Program.<Main>(String[] args)

How to reproduce

All the code is here: https://github.com/progrocket/streamiz-configuration-bug

Package Version: 1.4.2

LGouellec commented 1 year ago

Hi @progrocket,

Thanks for your issue. Indeed, you can't set the bootstrap servers through the dictionary, because the StreamConfig.BootstrapServers property is set directly on the ConsumerConfig / ProducerConfig / AdminConfig.
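
One way to picture this, as a rough sketch and not the library's actual code: the generic dictionary and the client configs are separate stores, and the startup check reads BootstrapServers from the client side. SketchStreamConfig, SketchClientConfig and IsValid below are hypothetical stand-ins introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Value supplied through the dictionary: it lands in the generic store only.
var viaDictionary = new SketchStreamConfig(new Dictionary<string, object>
{
    ["application.id"] = "test-app",
    ["bootstrap.servers"] = "localhost:9092",
});
Console.WriteLine(viaDictionary.IsValid()); // prints False

// Same value supplied through the property: it reaches the client config.
var viaProperty = new SketchStreamConfig(new Dictionary<string, object>
{
    ["application.id"] = "test-app",
})
{
    BootstrapServers = "localhost:9092",
};
Console.WriteLine(viaProperty.IsValid()); // prints True

// Hypothetical stand-in for ConsumerConfig / ProducerConfig / AdminConfig.
class SketchClientConfig
{
    public string BootstrapServers { get; set; }
}

// Hypothetical model of the 1.4.2 behaviour described above (assumption).
class SketchStreamConfig
{
    private readonly Dictionary<string, object> values;
    private readonly SketchClientConfig client = new SketchClientConfig();

    public SketchStreamConfig(IDictionary<string, object> source)
    {
        values = new Dictionary<string, object>(source);
    }

    public string ApplicationId =>
        values.TryGetValue("application.id", out var id) ? id as string : null;

    // The property bypasses the dictionary and goes straight to the client config.
    public string BootstrapServers
    {
        get => client.BootstrapServers;
        set => client.BootstrapServers = value;
    }

    // Mirrors the "set ApplicationId and BootstrapServers as minimal" check.
    public bool IsValid() =>
        !string.IsNullOrEmpty(ApplicationId) && !string.IsNullOrEmpty(BootstrapServers);
}
```

In this model a "bootstrap.servers" dictionary entry is stored but never consulted, which would match the StreamConfigException reported above.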

For your information, I will refactor the configuration for the next release (e.g. 1.5.0). Issue #144 tracks this feature.

Workaround:

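using System.Collections.Generic;
using Streamiz.Kafka.Net;

// Only application.id goes through the dictionary.
var configuration = new Dictionary<string, dynamic> {
    ["application.id"] = "Test",
};

// BootstrapServers is set through the property so that it reaches the client configs.
var streamConfig = new StreamConfig(configuration) {
    AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest,
    BootstrapServers = "localhost:9092"
};

LGouellec commented 1 year ago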

This issue will be fixed soon by the new StreamConfig implementation. See #144.

With that implementation, the following now works:

Func<ProcessorContext, ConsumeResult<byte[], byte[]>, Exception, ExceptionHandlerResponse> f = (_, _, _) =>
    ExceptionHandlerResponse.CONTINUE;

var configuration = new Dictionary<string, dynamic>
{
    ["application.id"] = "test-app",
    ["deserialization.exception.handler"] = f,
    ["metrics.interval.ms"] = 100000,
    ["follow.metadata"] = true,
    ["bootstrap.servers"] = "localhost:9092"
};

var streamConfig = new StreamConfig(configuration);

LGouellec commented 1 year ago

Fixed with PR #231