LGouellec / kafka-streams-dotnet

.NET Stream Processing Library for Apache Kafka 🚀
https://lgouellec.github.io/kafka-streams-dotnet/
MIT License

Unable to process messages when using branching #313

Closed: attributeerror closed this issue 4 months ago

attributeerror commented 4 months ago

Description

I am running into an issue where I'm using branching to determine which Avro schema should be used to serialise a message, but the stream processor is simply consuming all of the messages and not outputting to any downstream topic.

How to reproduce

Create two or more models from Avro schemas (I don't think the data matters in this case), then use AvroGen to generate the C# models. The snippet below shows how I set up the branches.

Setup stream topology code snippet

```c#
StreamBuilder builder = new();

List<Func<string, string, bool>> predicates = new();
foreach (Models.RouteMapping routeMapping in _settings.RouteMappings)
{
    predicates.Add((k, v) =>
    {
        return routeMapping.RoutingKeys.Any(routingKey => routingKey.MatchesRoutingKey(routeMapping.RoutingKeys));
    });
}

IKStream<string, string>[] branches = builder.Stream(_settings.KafkaSourceTopic, new StringSerDes(), new StringSerDes())
    .Branch(predicates.ToArray());

foreach (var (index, routeMapping) in _settings.RouteMappings.WithIndex())
{
    if (routeMapping.TopicName == "Topic1")
    {
        branches[index]
            .Peek((k, v) => _log.Info($"Processing message {k} - routing to Topic1."))
            .MapValues(value => _schemaOneMapper.From(value))
            .Filter((key, value) => value != null)
            .To(TopicNames.AssetActions, new StringSerDes(), new SchemaAvroSerDes());
    }
    else if (routeMapping.TopicName == "Topic2")
    {
        refEntityBranches[refEntityBranchIndex]
            .Peek((k, v) => _log.Info($"Processing message {k} - routing to Topic2."))
            .MapValues((key, value) => _schemaTwoMapper.From(value))
            .Filter((key, value) => value != null)
            .To(TopicNames.ReferenceEntityActions, new StringSerDes(), new SchemaAvroSerDes());
    }
    // Otherwise, just forward the message with no serialisation
    else
    {
        branches[index]
            .Peek((k, v) => _log.Info($"Processing message {k} - unknown routing."))
            .Filter((key, value) => value != null)
            .To(routeMapping.TopicName, new StringSerDes(), new StringSerDes());
    }
}

return builder.Build();
```

My RouteMapping class is then as follows:

public class RouteMapping
{

    public string TopicName { get; set; }

    public List<string> RoutingKeys { get; set; }

}

The RoutingKeys field should be a List of RegEx patterns to match with the incoming key.
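To make that intent concrete, here is a minimal sketch of a predicate that tests the incoming key against each pattern. It assumes every entry in RoutingKeys is a .NET regular expression. The `RoutingPredicates` class and its `KeyMatchesAny` and `For` members are hypothetical names and not part of Streamiz; the `MatchesRoutingKey` extension used in the snippet above is not shown in this issue, so plain `Regex.IsMatch` stands in for it.

```c#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public class RouteMapping
{
    public string TopicName { get; set; }
    public List<string> RoutingKeys { get; set; }
}

// Hypothetical helper class, for illustration only.
public static class RoutingPredicates
{
    // True when the incoming key matches at least one of the patterns.
    public static bool KeyMatchesAny(string key, IEnumerable<string> patterns) =>
        key != null && patterns.Any(pattern => Regex.IsMatch(key, pattern));

    // Builds a (key, value) predicate of the same shape as the ones in the snippet above.
    public static Func<string, string, bool> For(RouteMapping mapping) =>
        (key, value) => KeyMatchesAny(key, mapping.RoutingKeys);
}
```

The point of the sketch is that the lambda's `key` argument is what gets compared with the patterns, so whether a record is routed depends on the record itself.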

Assembly Information

Streamiz.Kafka.Net NuGet version: v1.5.1

Apache Kafka version: 3.3.1

Client configuration:

ApplicationId = "test-stream-processor",
BootstrapServers = "[redacted]",
SchemaRegistryUrl = "[redacted]/confluent-schema-registry",
AutoRegisterSchemas = false,
SubjectNameStrategy = SubjectNameStrategy.Record,
AutoOffsetReset = AutoOffsetReset.Earliest,
DefaultKeySerDes = new StringSerDes(),
DefaultValueSerDes = new StringSerDes(),
Debug = "consumer,cgrp,topic,fetch,broker,msg",
Logger = LoggerFactory.Create(b =>
{
    b.SetMinimumLevel(LogLevel.Debug);
    b.AddConsole();
}),

Operating system: Windows 10

Logs attached: stream processor logs.txt

attributeerror commented 4 months ago

Oops, this was a flaw in my logic, not in the library. No branch predicate was passing, which is why the processor was consuming all of the messages yet apparently not doing anything.

For anyone else who hits this issue in the future, I would highly recommend adding a catch-all branch predicate (where the very last predicate in the list always returns true) so that you can spot this more easily.
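A rough sketch of the idea, written as plain C# so it runs without Kafka. `BranchDemo` and its `FirstMatch` helper are hypothetical stand-ins that mimic first-match routing (a record goes to the first predicate that accepts it, and is dropped if none do); they are not the Streamiz implementation, and the two key prefixes are made up for illustration.

```c#
using System;
using System.Collections.Generic;

// Hypothetical stand-in for branch routing, for illustration only.
public static class BranchDemo
{
    // Returns the index of the first predicate that accepts the record, or -1 if none do.
    public static int FirstMatch(IReadOnlyList<Func<string, string, bool>> predicates, string key, string value)
    {
        for (int i = 0; i < predicates.Count; i++)
        {
            if (predicates[i](key, value))
            {
                return i;
            }
        }
        return -1;
    }

    public static List<Func<string, string, bool>> BuildPredicates()
    {
        return new List<Func<string, string, bool>>
        {
            (k, v) => k.StartsWith("asset.", StringComparison.Ordinal),     // made-up route 1
            (k, v) => k.StartsWith("reference.", StringComparison.Ordinal), // made-up route 2
            (k, v) => true                                                  // catch-all: must stay last
        };
    }
}
```

With the catch-all in place, a key that matches neither route lands on the last index instead of being dropped silently. In the topology above, that would presumably mean appending `(k, v) => true` to `predicates` before calling `Branch` and attaching a logging `Peek` to the final element of `branches`.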

LGouellec commented 4 months ago

Thanks @attributeerror for the follow-up. You are right, a good practice is to have a default branch at the end in case no predicate matches.