LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Self-Relationship exception #319

Status: Open. In-Wake opened this issue 3 months ago.

In-Wake commented 3 months ago

Description

Hi, I need to join records from the same stream to each other. Foreign-key joins are not prioritized (https://github.com/LGouellec/kafka-streams-dotnet/issues/52), but that issue suggests a workaround: combining `SelectKey(..)` with `Join(..)`. After writing a test for this, I got the exception System.ArgumentException: 'An item with the same key has already been added. Key: test-test-driver-app-KSTREAM-TOTABLE-0000000003-repartition'

I am using kafka-streams-dotnet at commit https://github.com/LGouellec/kafka-streams-dotnet/commit/981e9d4b8447712ceb03adab6760202f21150f80

The exception is thrown from https://github.com/LGouellec/kafka-streams-dotnet/blob/981e9d4b8447712ceb03adab6760202f21150f80/core/Processors/Internal/InternalTopologyBuilder.cs#L592
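The message above is the one .NET's `Dictionary<TKey, TValue>.Add` raises when a key is inserted twice. A plausible reading (an assumption, not confirmed by the library source quoted here) is that the topology builder keeps internal topics in a dictionary keyed by their generated name, and the self-join ends up registering the same `...-repartition` name twice. The sketch below reproduces only that mechanism in plain C# (top-level statements, .NET 5 or later); `repartitionTopics` is a hypothetical stand-in, not a field of the library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the builder's registry of internal topics.
// Assumption: topics are keyed by their generated name; this is NOT the library's real field.
var repartitionTopics = new Dictionary<string, string>();
const string topicName = "test-test-driver-app-KSTREAM-TOTABLE-0000000003-repartition";

// The first registration of the generated name succeeds.
repartitionTopics.Add(topicName, "first registration");

// Registering the same generated name again makes Add throw ArgumentException.
string? duplicateError = null;
try
{
    repartitionTopics.Add(topicName, "second registration");
}
catch (ArgumentException ex)
{
    duplicateError = ex.Message;
}

// TryAdd reports the clash by returning false instead of throwing.
bool addedAgain = repartitionTopics.TryAdd(topicName, "third registration");

Console.WriteLine(duplicateError);
Console.WriteLine($"TryAdd succeeded: {addedAgain}");
```

On .NET Core and later, the printed message includes the duplicated key, which matches the shape of the message in the report.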

How to reproduce

Test code

public class SelfId
{
    public int Id { get; set; }
}

public class SelfRelation
{
    public int Id { get; set; }
    public string Name { get; set; }
    public int? Relation { get; set; }
}

public class Container
{
    public SelfRelation Node { get; set; }
    public SelfRelation Dependency { get; set; }
}

        [Fact]
        public void SelfTest()
        {
            var builder = new StreamBuilder();

            var stream =
                builder.Stream("self", new JsonSerDes<SelfId>(), new JsonSerDes<SelfRelation>());

            // Keep only records that reference another record, then re-key them by the referenced id
            var filtrate = stream.Filter((k, v) => v.Relation.HasValue);
            var withRelationKeyStream = filtrate.SelectKey((k, v) => new SelfId { Id = v.Relation!.Value });
            var withRelationKeyTable = withRelationKeyStream.ToTable(
                Materialized<SelfId, SelfRelation, IKeyValueStore<Bytes, byte[]>>
                    .Create<JsonSerDes<SelfId>, JsonSerDes<SelfRelation>>());

            // Table of all records, keyed by their own id
            var table = stream.ToTable(
                Materialized<SelfId, SelfRelation, IKeyValueStore<Bytes, byte[]>>
                    .Create<JsonSerDes<SelfId>, JsonSerDes<SelfRelation>>());

            // Adding this join triggers the exception
            var join = withRelationKeyTable.Join(table, (left, right) => new Container { Node = left, Dependency = right });

            var topology = builder.Build();

            var config = new StreamConfig();
            config.ApplicationId = "test-test-driver-app";

            // The test driver is disposable, so release it at the end of the test
            using var driver = new TopologyTestDriver(topology, config);

            // Create the test input topic
            var inputTopic =
                driver.CreateInputTopic(
                    "self", new JsonSerDes<SelfId>(), new JsonSerDes<SelfRelation>());

            inputTopic.PipeInput(new SelfId { Id = 1 }, new SelfRelation { Id = 1, Name = "self", Relation = 2 });
            inputTopic.PipeInput(new SelfId { Id = 2 }, new SelfRelation { Id = 2, Name = "rel" });
        }


LGouellec commented 1 month ago

Hi @In-Wake,

Did you try running the same topology against a real Kafka cluster? It may be a bug in the TopologyTestDriver. By the way, sorry for the late reply.