quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Inability to create multiple Kafka Streams topologies in one application using @Produces method(s) #38318

Open ziemsky opened 7 months ago

ziemsky commented 7 months ago

Describe the bug

The problem

Quarkus' guide Using Apache Kafka Streams describes creating a Kafka Streams topology using a method annotated with @jakarta.enterprise.inject.Produces.

However, this method does not seem to be working for more than one topology within the same application, as having more than one such method results in the application failing to start with the following exception seen in the logs:

java.lang.RuntimeException: Failed to start quarkus
        ...
Caused by: jakarta.enterprise.inject.AmbiguousResolutionException: Beans: [PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerA, id=6BFQ4NfZqRuS74kLy3x4LX22jgo], PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerB, id=na5ipbtejGCLdDVBsclVMcRoeQM]]
        ...

Analysis

Looking at the stack trace (see full stack trace below), it appears that KafkaStreamsProducer simply doesn't support more than one topology, and throws said exception when it detects more than one org.apache.kafka.streams.Topology bean available - see topology.get in KafkaStreamsProducer:106.

Following this logic, this is likely an enhancement request rather than a bug, but I'm raising it as a bug so that someone more familiar with the matter can kindly confirm whether this is a real issue or just my ignorance/wrong expectations regarding the proper configuration.

If it's the latter, please accept my apologies and a request to kindly clarify the recommended method of creating multiple topologies (be it in the docs or in a response to this StackOverflow question).

Having said that, seeing that Quarkus supports injection of multiple beans of the same type, it seems not unreasonable to expect that it should be possible to have multiple @Produces Topology methods available in one application.

Attempted fix

To address the exception, custom @jakarta.inject.Qualifier-annotated annotations were applied to the methods producing topologies, following remarks from Quarkus' intro to CDI.

Unfortunately, the effect was that whilst the exception was no longer thrown, the topology producer methods were no longer invoked, leaving Kafka Streams not initialised.

This is understandable as KafkaStreamsProducer's constructor only appears to expect a @Default bean, and once annotated with custom qualifiers, none of the topology beans match this requirement.

Workaround

The only workaround that worked for me was to fall back to the 'legacy' method described in Quarkus' blog, i.e. 'manually' managing instances of StreamsBuilder driven by application lifecycle events.

Expected behavior

To have multiple, working Kafka Streams topologies (multiple instances of class org.apache.kafka.streams.Topology) produced through @Produces-annotated methods.

Actual behavior

Having more than one @Produces Topology method results in the application failing to start with the following stack trace seen in the logs:

2024-01-21 16:13:37,136 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile [dev]): java.lang.RuntimeException: Failed to start quarkus
        at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
        at io.quarkus.runtime.Application.start(Application.java:101)
        at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:111)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
        at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
        at io.quarkus.runner.GeneratedMain.main(Unknown Source)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
        at java.base/java.lang.reflect.Method.invoke(Method.java:578)
        at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:113)
        at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: jakarta.enterprise.inject.AmbiguousResolutionException: Beans: [PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerA, id=6BFQ4NfZqRuS74kLy3x4LX22jgo], PRODUCER_METHOD bean [class=org.acme.qkt.TopologyProducerB, id=na5ipbtejGCLdDVBsclVMcRoeQM]]
        at io.quarkus.arc.impl.InstanceImpl.bean(InstanceImpl.java:291)
        at io.quarkus.arc.impl.InstanceImpl.getInternal(InstanceImpl.java:309)
        at io.quarkus.arc.impl.InstanceImpl.get(InstanceImpl.java:190)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer.<init>(KafkaStreamsProducer.java:106)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.doCreate(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.create(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.create(Unknown Source)
        at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
        at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:32)
        at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
        at io.quarkus.arc.impl.ComputingCacheContextInstances.computeIfAbsent(ComputingCacheContextInstances.java:19)
        at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.get(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Bean.get(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.doCreate(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.create(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.create(Unknown Source)
        at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:119)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:38)
        at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:35)
        at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:32)
        at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
        at io.quarkus.arc.impl.ComputingCacheContextInstances.computeIfAbsent(ComputingCacheContextInstances.java:19)
        at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:35)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.get(Unknown Source)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_ProducerMethod_getKafkaStreams_d5DLwLZ87HgycVRDmdD6cQd9k9w_Bean.get(Unknown Source)
        at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:553)
        at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:533)
        at io.quarkus.arc.impl.ArcContainerImpl.beanInstanceHandle(ArcContainerImpl.java:566)
        at io.quarkus.arc.impl.ArcContainerImpl.instance(ArcContainerImpl.java:338)
        at io.quarkus.kafka.streams.runtime.KafkaStreamsProducer_Observer_Synthetic_GBi-MxGEb8kh__7Q4XC-t4hECVU.notify(Unknown Source)
        at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
        at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
        at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
        at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:157)
        at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:108)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
        at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
        ... 11 more

How to Reproduce?

In other words:

Have a basic Quarkus project with:

Output of uname -a or ver

Linux my-host-name 6.7.0-arch3-1 #1 SMP PREEMPT_DYNAMIC Sat, 13 Jan 2024 14:37:14 +0000 x86_64 GNU/Linux

Output of java -version

openjdk version "20.0.2" 2023-07-18
OpenJDK Runtime Environment (build 20.0.2+9-78)
OpenJDK 64-Bit Server VM (build 20.0.2+9-78, mixed mode, sharing)

Quarkus version or git rev

3.6.6

Build tool (ie. output of mvnw --version or gradlew --version)

Gradle 8.3
Build time: 2023-08-17 07:06:47 UTC
Revision: 8afbf24b469158b714b36e84c6f4d4976c86fcd5
Kotlin: 1.9.0
Groovy: 3.0.17
Ant: Apache Ant(TM) version 1.10.13 compiled on January 4 2023
JVM: 20.0.2 (Oracle Corporation 20.0.2+9-78)
OS: Linux 6.7.0-arch3-1 amd64

Additional information

No response

quarkus-bot[bot] commented 7 months ago

/cc @alesj (kafka,kafka-streams), @cescoffier (kafka), @geoand (kotlin), @gunnarmorling (kafka-streams), @ozangunalp (kafka,kafka-streams), @rquinio (kafka-streams)

ozangunalp commented 7 months ago

I don't think it makes much sense as it'd mean creating multiple KafkaStreams apps inside a single Quarkus app. @ziemsky Unless you've a specific solution in mind I'd like to close this.

ziemsky commented 7 months ago

Thanks for reviewing this, @ozangunalp, much appreciated :thumbsup:

I'm happy to be proven wrong, but here's my thinking:

Multiple KafkaStreams instances in one app

Agreed, it does seem like this would require creating multiple instances of KafkaStreams in one Quarkus app, given that none of its public constructors accepts multiple topologies.

However, I can't say that I share the concern that having multiple instances of KafkaStreams in a single Quarkus application would be a problem:

According to the JavaDoc, it is a supported scenario (emphasis mine):

A KafkaStreams instance can co-ordinate with any other instances with the same application ID (whether in the same process, on other processes on this machine, or on remote machines) as a single (possibly distributed) stream processing application.

Admittedly, this mentions 'the same application ID', whereas I would expect each distinct topology to be able to have a separate application id/consumer group id (my reasoning below), but I don't think that 'same or different app id' is a factor that affects viability of the implementation.

In fact, having multiple instances of KafkaStreams in the same app appears to be exactly what Spring for Apache Kafka does. Slightly buried in its docs is this remark:

A new KafkaStreams is created on each start(). You might also consider using different StreamsBuilderFactoryBean instances, if you would like to control the lifecycles for KStream instances separately.

Indeed, it seems that the way Spring achieves support for that 'control of separate KStream instances' is by creating multiple instances of KafkaStreams, each with its own configuration.

I've created a small SpringBoot project to prove to myself that this is what happens. It can be found here.

Why, though?

As for why the ability to have multiple, distinct instances of Topology is useful, all use cases that I can think of boil down to the need for isolation, in order to:

Use case A: prevent changes to one sub-topology from affecting another

The former point stems from the fact that adding components to one sub-topology changes the ids of components in another, as documented here. Admittedly, this can be mitigated by using static, explicitly assigned names, but in practice not everyone is aware of this need, and so the larger the topology, the greater the risk that innocent-looking changes could lead to silent loss or corruption of data.
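To illustrate the mechanism, here is a toy model in plain Java of index-based name generation. It is not Kafka Streams' own code: the `KSTREAM-<OPERATOR>-<10-digit index>` pattern follows the default names shown in the Kafka Streams documentation, while the class name, the method name and the operator lists are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Toy model of index-based operator naming (hypothetical; not Kafka Streams' code). */
final class GeneratedNames {

    /** Gives each operator a name built from a shared, incrementing index. */
    static List<String> assign(List<String> operators) {
        List<String> names = new ArrayList<>();
        int index = 0;
        for (String operator : operators) {
            names.add(String.format(Locale.ROOT, "KSTREAM-%s-%010d", operator, index++));
        }
        return names;
    }

    public static void main(String[] args) {
        // Sub-topology A (SOURCE, SINK) followed by sub-topology B (SOURCE, AGGREGATE).
        System.out.println(assign(List.of("SOURCE", "SINK", "SOURCE", "AGGREGATE")));
        // Adding a FILTER to sub-topology A shifts every later index, including B's.
        System.out.println(assign(List.of("SOURCE", "FILTER", "SINK", "SOURCE", "AGGREGATE")));
    }
}
```

In this model the aggregate in sub-topology B is renamed even though only sub-topology A changed. In the real API, explicit names (for example via `Named.as(...)` or `Materialized.as(...)`) avoid that dependence on position.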

As for the latter point:

Use case B: enable event replays in one sub-topology

One use case is to support 'replaying' of an individual topic. Assume that in a single-topology scenario (a single KafkaStreams instance) we use sub-topologies A and B that consume the same topic. Because they belong to the same topology, they share the same consumer group id. Now, suppose we want to reset consumer offsets just for sub-topology A to trigger said replay. Because of the shared consumer group id, in doing so we also inadvertently trigger the replay (reset the offsets) for sub-topology B, which may not be desirable, and may even be harmful. Being able to have multiple topologies would allow us to give them different consumer group ids and prevent (or at least lower the risk of) this problem.
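A minimal sketch of that separation, assuming each topology runs in its own KafkaStreams instance configured from its own `Properties`. `application.id` and `bootstrap.servers` are real Kafka Streams settings, and Kafka Streams uses `application.id` as the consumer group id; the ids, the broker address and the helper name are placeholders invented for this example.

```java
import java.util.Properties;

/** Builds one configuration per topology so that each gets its own consumer group. */
final class PerTopologyConfig {

    static Properties forTopology(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Kafka Streams uses application.id as the consumer group id.
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        Properties a = forTopology("orders-topology-a", "localhost:9092");
        Properties b = forTopology("orders-topology-b", "localhost:9092");
        // Each Properties object would be handed to its own instance,
        // e.g. new KafkaStreams(topologyA, a), so the two groups keep separate offsets.
        System.out.println(a.getProperty("application.id"));
        System.out.println(b.getProperty("application.id"));
    }
}
```

With distinct application ids, resetting the offsets of one group should not touch the other, which is the isolation this use case is after.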

Use case C: starting small with one app hosting multiple topologies

Another use case is 'starting small', where it may be more efficient to have multiple distinct but related topologies in one application. If the traffic is not too intensive, this allows us to have fewer application instances to support many topologies, with smaller cumulative overhead than if we had one-app-per-topology. As traffic increases, those distinct topologies can then be broken down to separate microservices for scalability; an activity that is made easier the more the topologies are isolated (e.g. through distinct consumer groups).

So, whatddaya suggest?

I realise that everything above is a justification of why I think it's useful to be able to have multiple instances of Topology in one app, when you asked for a suggestion of a solution. Unfortunately, I only started learning and using Quarkus very recently, and so am not in a position to offer any pull requests just yet.

However, I think that the first option I'd explore would be to modify KafkaStreamsProducer to:

On that last point, I'm not immediately sure how to marry that with individual config settings in application.properties. I think that it should be possible for one group of topologies to share one config and another group to share another, with some common config available to all such groups. I suppose that the custom qualifier annotations could have, say, an attribute configName whose value could then be mapped to a 'category' fragment in property names in application.properties (similar to what is done with logging configuration). Is this what you had in mind? I hope I'm making sense here, but I'll be happy to elaborate if not. I'm sure you know this stuff better than I do, anyway, but since you asked :grin:
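To make the configName idea a bit more concrete, here is a rough sketch in plain Java of how a shared section and a named section could be merged. Everything about the property layout is an assumption made for illustration: the `streams.common.` and `streams.group.<configName>.` prefixes are hypothetical and are not existing Quarkus configuration keys.

```java
import java.util.Properties;

/** Resolves the effective config for one named group of topologies (hypothetical layout). */
final class NamedStreamsConfig {

    // Hypothetical prefixes; not real Quarkus property names.
    private static final String COMMON_PREFIX = "streams.common.";
    private static final String GROUP_PREFIX = "streams.group.";

    /** Copies the common settings first, then lets the named group override them. */
    static Properties resolve(Properties all, String configName) {
        Properties resolved = new Properties();
        copy(all, COMMON_PREFIX, resolved);
        copy(all, GROUP_PREFIX + configName + ".", resolved);
        return resolved;
    }

    private static void copy(Properties from, String prefix, Properties to) {
        for (String key : from.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                to.setProperty(key.substring(prefix.length()), from.getProperty(key));
            }
        }
    }

    public static void main(String[] args) {
        Properties all = new Properties();
        all.setProperty("streams.common.bootstrap.servers", "localhost:9092");
        all.setProperty("streams.group.billing.application.id", "billing-app");
        all.setProperty("streams.group.audit.application.id", "audit-app");
        System.out.println(resolve(all, "billing"));
    }
}
```

A qualifier's configName value would select the group, so topologies sharing a qualifier would end up with the same resolved settings.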

BTW: given your response I think this is an enhancement request rather than a bug, after all. I don't have the power to change the issue's labels, however. Also, there is nothing Kotlin-specific here, the label has been assigned automatically as a result of Kotlin being used in Gradle's build script.