Farfetch / kafkaflow

Apache Kafka .NET framework for creating applications that are simple to use and extend.
https://farfetch.github.io/kafkaflow/
MIT License

[Bug Report]: KafkaFlow.SchemaRegistry does not work with multiple clusters/schema registry URLs #558

Status: Open. Opened by danielmpetrov 7 months ago.

danielmpetrov commented 7 months ago


Description

We ran into this issue when we tried to connect to two different Kafka clusters, each with its own schema registry endpoint. We noticed that one of the consumers was throwing an unfamiliar exception. I don't have the exact stack trace, but the error was "schema id could not be found". Running the solution with either cluster's configuration on its own works. When both run together, however, one cluster's consumers constantly throw.

We debugged KafkaFlow's source code and traced the problem down into Confluent.SchemaRegistry. The Confluent library expects one ISchemaRegistryClient per schema registry endpoint, but KafkaFlow's WithSchemaRegistry method registers the clients as singletons, and the classes that depend on them resolve the last one registered. Because a single-service resolution returns the last registration (at least in Microsoft's DI container), every cluster and all of its consumers end up using that last client. The consumers of every other cluster then get 404 Not Found responses, since schema IDs naturally differ from one schema registry to another.
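The "last registration wins" behavior can be reproduced without Kafka at all. The following is a minimal sketch, assuming only the Microsoft.Extensions.DependencyInjection package; `IRegistryClient`, `FakeRegistryClient` and `LastWinsDemo` are hypothetical stand-ins for Confluent's `ISchemaRegistryClient`, not KafkaFlow code.

```csharp
using System;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for Confluent's ISchemaRegistryClient.
public interface IRegistryClient
{
    string Url { get; }
}

public sealed class FakeRegistryClient : IRegistryClient
{
    public FakeRegistryClient(string url) => Url = url;
    public string Url { get; }
}

public static class LastWinsDemo
{
    public static (string Single, string[] All) Resolve()
    {
        var services = new ServiceCollection();

        // Two unkeyed singletons of the same service type, one per "cluster",
        // mimicking the registrations described in this issue.
        services.AddSingleton<IRegistryClient>(new FakeRegistryClient("localhost:8081"));
        services.AddSingleton<IRegistryClient>(new FakeRegistryClient("localhost:8082"));

        using var provider = services.BuildServiceProvider();

        // A single-service resolution returns the LAST registration,
        // so anything asking for IRegistryClient gets the 8082 client.
        var single = provider.GetRequiredService<IRegistryClient>().Url;

        // Both registrations still exist, but only as a collection.
        var all = provider.GetServices<IRegistryClient>().Select(c => c.Url).ToArray();
        return (single, all);
    }

    public static void Main()
    {
        var (single, all) = Resolve();
        Console.WriteLine(single);
        Console.WriteLine(string.Join(", ", all));
    }
}
```

Nothing here fails at registration time; the mix-up only surfaces later, when a consumer deserializes a message whose schema ID exists in the other registry.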

Steps to reproduce

  1. Register multiple clusters with different schema registries, contrived example below:
```csharp
// the first cluster's consumers will throw,
// since it resolves the schema registry client of the second one
// last one registered wins! 😢
services.AddKafka(kafka =>
{
    kafka.AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .WithSchemaRegistry(config => config.Url = "localhost:8081"));
    kafka.AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9093" })
        .WithSchemaRegistry(config => config.Url = "localhost:8082"));
});

// this works fine
services.AddKafka(kafka =>
{
    kafka.AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9092" })
        .WithSchemaRegistry(config => config.Url = "localhost:8081"));
});

// this works fine
services.AddKafka(kafka =>
{
    kafka.AddCluster(cluster => cluster
        .WithBrokers(new[] { "localhost:9093" })
        .WithSchemaRegistry(config => config.Url = "localhost:8082"));
});
```
  2. Add a consumer to each cluster, and start the program.

Expected behavior

We expect each cluster to read from its own schema registry, as implied by the docstring of WithSchemaRegistry: "Configures schema registry to the cluster".

Actual behavior

One of the consumers throws exceptions such as "Cannot find schema with id '123'" (paraphrased).

KafkaFlow version

v3.0.7

ruiqbarbosa commented 6 months ago

Hello @danielmpetrov,

We have analyzed the issue internally and, indeed, KafkaFlow resolves to the last registered Schema Registry client.

We will internally review the best way to fix the problem and will provide feedback as soon as possible.

Thank you!

KurtNapolitano-TCGP commented 4 months ago

Our organization has chosen to add code to work around this bug. I've uploaded it here, in the hope that it either helps the maintainers of this project find a workable solution or helps others blocked by this bug find a suitable workaround for the time being.

Some of the decisions made in this implementation are specific to our organization's use cases, but hopefully the code will serve as a guide.

Note: As of .NET 8, IServiceCollection supports "keyed" registrations. Since KafkaFlow's IDependencyConfigurator wraps IServiceCollection, I wasn't able to take advantage of that feature, which seems perfect for this problem. Hopefully the KafkaFlow owners will be able to make use of it.
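For illustration, this is roughly what keyed registrations look like with plain Microsoft.Extensions.DependencyInjection 8.0. It is a sketch only: it does not go through KafkaFlow's IDependencyConfigurator (which, as noted above, doesn't expose this), and the keys `"cluster-a"`/`"cluster-b"` and the types `IClusterRegistry`, `ClusterRegistry` and `KeyedRegistryDemo` are hypothetical. In real code the factory could instead return `new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = ... })`.

```csharp
using System;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical stand-in for Confluent's ISchemaRegistryClient.
public interface IClusterRegistry
{
    string Url { get; }
}

public sealed class ClusterRegistry : IClusterRegistry
{
    public ClusterRegistry(string url) => Url = url;
    public string Url { get; }
}

public static class KeyedRegistryDemo
{
    public static ServiceProvider Build()
    {
        var services = new ServiceCollection();

        // One keyed singleton per cluster; the (hypothetical) cluster name is the key.
        services.AddKeyedSingleton<IClusterRegistry>("cluster-a",
            (sp, key) => new ClusterRegistry("localhost:8081"));
        services.AddKeyedSingleton<IClusterRegistry>("cluster-b",
            (sp, key) => new ClusterRegistry("localhost:8082"));

        return services.BuildServiceProvider();
    }

    public static void Main()
    {
        using var provider = Build();

        // Each cluster's serializers would look up the client by their own key,
        // so neither cluster can silently pick up the other's registry.
        Console.WriteLine(provider.GetRequiredKeyedService<IClusterRegistry>("cluster-a").Url);
        Console.WriteLine(provider.GetRequiredKeyedService<IClusterRegistry>("cluster-b").Url);
    }
}
```

Keyed registrations are not returned by an unkeyed `GetService<IClusterRegistry>()` call (it yields null), so a component that forgets its key fails visibly instead of quietly using another cluster's client.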