LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License
455 stars 74 forks source link

System.ArgumentException: 'An item with the same key has already been added. Key: <my-topic-name>' #181

Closed igor-toporet closed 1 year ago

igor-toporet commented 1 year ago

Hi, I'm trying to stream messages from multiple topics to a single topic, e.g. "archive".

But I'm getting this exception.

System.ArgumentException: 'An item with the same key has already been added. Key: <my-topic-name>'

   at System.ThrowHelper.ThrowAddingDuplicateWithKeyArgumentException[T](T key)
   at System.Collections.Generic.Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
   at System.Collections.Generic.Dictionary`2.Add(TKey key, TValue value)
   at Streamiz.Kafka.Net.Processors.Internal.InternalTopologyBuilder.BuildTopology(ISet`1 nodeGroup, TaskId taskId)
   at Streamiz.Kafka.Net.Processors.Internal.InternalTopologyBuilder.BuildTopology(Nullable`1 id)
   at Streamiz.Kafka.Net.KafkaStream..ctor(Topology topology, IStreamConfig configuration, IKafkaSupplier kafkaSupplier)
   at MyCompany.MyApp.MyService.CreateKafkaStream() in <obfuscated>.cs:line NN

Am I doing something unintended? Reading from multiple topics and writing to a single topic.

Thanks!

igor-toporet commented 1 year ago

For now, I've created another stream, which seems to work -- writing to one topic from two different streams.

LGouellec commented 1 year ago

Hi @igor-toporet ,

Thanks for your issue. Could you share your topology ?

Thanks

igor-toporet commented 1 year ago

Sorry, I am away from keyboard. But here's the essence of the topology.

From topic A depending on the contents of the message I branch into two branches, map the messages differently in each branches and ultimately send to topics B and C. From B map again and send to D. From C also map (differently) and send to D again. So the topic D needs to be a terminal destination for both branches.

But a stream with this topology can't be instantiated. Maybe, a topology must be a directed acyclic graph (DAG)?

Meantime, I managed to work it around with separate streams/topologies. A to B&C, B to D, C to D.

Thanks for looking into this!

sharifmamun commented 1 year ago

@LGouellec Thanks for putting this into 1.4.0 milestone. Looking forward to that release.

igor-toporet commented 1 year ago

Thank you for fixing it!