Farfetch / kafkaflow-retry-extensions

Kafka Flow Retry Patterns Extensions
https://farfetch.github.io/kafkaflow-retry-extensions/

[Bug Report]: Durable producer name uses the topic name, so the same topic cannot be used for multiple message types #146

Open kYann opened 10 months ago

kYann commented 10 months ago

Description

Hello,

When a durable retry is configured for two different message types on the same topic, producer creation fails.

Steps to reproduce

  1. Declare consumers with durable retry on the same topic but with different message types
  2. Start the Kafka bus

Expected behavior

The bus should start without issues.

Actual behavior

ArgumentException

Message: 
  System.ArgumentException : An item with the same key has already been added. Key: kafka-flow-retry-durable-producer-test-test-message-retry

Stack Trace: 
  Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
  Dictionary`2.Add(TKey key, TValue value)
  Enumerable.ToDictionary[TSource,TKey](IEnumerable`1 source, Func`2 keySelector, IEqualityComparer`1 comparer)
  Enumerable.ToDictionary[TSource,TKey](IEnumerable`1 source, Func`2 keySelector)
  ProducerAccessor.ctor(IEnumerable`1 producers)
  <>c__DisplayClass5_0.<Build>b__1(IDependencyResolver resolver)
  <>c__DisplayClass6_0`1.<Add>b__0(IServiceProvider provider)
  CallSiteRuntimeResolver.VisitFactory(FactoryCallSite factoryCallSite, RuntimeResolverContext context)
  CallSiteVisitor`2.VisitCallSiteMain(ServiceCallSite callSite, TArgument argument)
  CallSiteRuntimeResolver.VisitRootCache(ServiceCallSite callSite, RuntimeResolverContext context)
  CallSiteVisitor`2.VisitCallSite(ServiceCallSite callSite, TArgument argument)
  CallSiteRuntimeResolver.Resolve(ServiceCallSite callSite, ServiceProviderEngineScope scope)
  ServiceProvider.CreateServiceAccessor(ServiceIdentifier serviceIdentifier)
  ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
  ServiceProvider.GetService(ServiceIdentifier serviceIdentifier, ServiceProviderEngineScope serviceProviderEngineScope)
  ServiceProviderEngineScope.GetService(Type serviceType)
  MicrosoftDependencyResolver.Resolve(Type type)
  DependencyResolverExtensions.Resolve[T](IDependencyResolver resolver)
  KafkaFlowConfigurator.CreateBus(IDependencyResolver resolver)

Producer name should use the message type name. See https://github.com/Farfetch/kafkaflow-retry-extensions/blob/9ee2ed2941b7ff73f2eff2c898cf61e83861e9a5/src/KafkaFlow.Retry/Durable/Definitions/Builders/RetryDurableEmbeddedClusterDefinitionBuilder.cs#L94
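
For illustration, here is a minimal sketch of why a topic-only producer name collides in `Enumerable.ToDictionary` (the call shown in the stack trace) and why adding the message type name avoids it. `RetryDefinition`, `NameFromTopic`, `NameFromTopicAndType` and the `retry-producer-` prefix are hypothetical stand-ins, not the library's actual types or naming format.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ProducerNameCollisionSketch
{
    // Hypothetical stand-in for a durable retry definition; not a KafkaFlow type.
    public sealed record RetryDefinition(string Topic, string MessageType);

    // Hypothetical naming scheme that depends on the topic only.
    public static string NameFromTopic(RetryDefinition d) =>
        $"retry-producer-{d.Topic}";

    // Hypothetical naming scheme that also includes the message type name.
    public static string NameFromTopicAndType(RetryDefinition d) =>
        $"retry-producer-{d.Topic}-{d.MessageType}";

    // Returns true if every definition maps to a unique producer name.
    // ToDictionary throws ArgumentException when two items share a key.
    public static bool TryIndex(
        IEnumerable<RetryDefinition> definitions,
        Func<RetryDefinition, string> naming)
    {
        try
        {
            definitions.ToDictionary(naming);
            return true;
        }
        catch (ArgumentException)
        {
            return false;
        }
    }

    public static void Main()
    {
        // Two message types consumed from the same topic, as in the report.
        var definitions = new[]
        {
            new RetryDefinition("test-message", "OrderCreated"),
            new RetryDefinition("test-message", "OrderCancelled"),
        };

        Console.WriteLine(TryIndex(definitions, NameFromTopic));        // False
        Console.WriteLine(TryIndex(definitions, NameFromTopicAndType)); // True
    }
}
```

With the topic-only scheme both definitions produce the same key, which is the same kind of failure as the `ArgumentException` above. Including the message type makes the keys distinct, provided the type names themselves are unique per topic.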

KafkaFlow Retry Extensions version

v3.0.0