micronaut-projects / micronaut-kafka

Integration between Micronaut and Apache Kafka
Apache License 2.0

Possibility to disable kafka-streams if no Stream is defined #942

Open fgrutsch opened 10 months ago

fgrutsch commented 10 months ago

Issue description

Versions:

I am currently running into an issue: as soon as I pull in the micronaut-kafka-streams dependency without defining any Streams, the server fails to start up. It fails with:

Message: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
Path Taken: KafkaStreams.kafkaStreams(String name,ConfiguredStreamBuilder builder,KafkaClientSupplier kafkaClientSupplier,KStream[] kStreams)
io.micronaut.context.exceptions.BeanInstantiationException: Bean definition [org.apache.kafka.streams.KafkaStreams] could not be loaded: Error instantiating bean of type  [org.apache.kafka.streams.KafkaStreams]

Message: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
Path Taken: KafkaStreams.kafkaStreams(String name,ConfiguredStreamBuilder builder,KafkaClientSupplier kafkaClientSupplier,KStream[] kStreams)
        at io.micronaut.context.DefaultBeanContext.initializeContext(DefaultBeanContext.java:1921)
        at io.micronaut.context.DefaultApplicationContext.initializeContext(DefaultApplicationContext.java:249)
        at io.micronaut.context.DefaultBeanContext.readAllBeanDefinitionClasses(DefaultBeanContext.java:3326)
        at io.micronaut.context.DefaultBeanContext.finalizeConfiguration(DefaultBeanContext.java:3684)
        at io.micronaut.context.DefaultBeanContext.start(DefaultBeanContext.java:341)
        at io.micronaut.context.DefaultApplicationContext.start(DefaultApplicationContext.java:194)
        at io.micronaut.runtime.Micronaut.start(Micronaut.java:75)
        at op.events.ApplicationKt.main(Application.kt:10)
Caused by: io.micronaut.context.exceptions.BeanInstantiationException: Error instantiating bean of type  [org.apache.kafka.streams.KafkaStreams]

I assume this comes from here, because as far as I understand org.apache.kafka.streams.KafkaStreams requires at least one Stream.
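The same error can probably be reproduced outside Micronaut, which would support that assumption. Below is a minimal sketch, assuming only the plain kafka-streams library on the classpath; the application id and bootstrap address are placeholders, and no broker should be needed because the check happens in the KafkaStreams constructor:

```kotlin
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.errors.TopologyException
import java.util.Properties

fun main() {
    // Placeholder settings; both properties are mandatory for StreamsConfig.
    val props = Properties().apply {
        put(StreamsConfig.APPLICATION_ID_CONFIG, "empty-topology-demo")
        put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    }

    // A builder with no stream or table registered yields an empty topology.
    val emptyTopology = StreamsBuilder().build()

    try {
        // Recent Kafka Streams versions reject an empty topology at construction time.
        KafkaStreams(emptyTopology, props).close()
    } catch (e: TopologyException) {
        println("Rejected: ${e.message}")
    }
}
```

If this fails in the same way, the restriction sits in Kafka Streams itself, so the Micronaut factory would have to skip creating the KafkaStreams bean when no Stream is defined.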

I am aware of the kafka.streams.default.start-kafka-streams setting; however, it only controls whether Kafka Streams is started or not. Even with this setting set to false, at least one Stream bean must still be defined.

Is this intended, or is there a nice way to overcome this problem? Right now we have something like this to avoid the startup problem:

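@Factory
class MicronautKafkaStreamsDisabler {

    // Only created when START_KAFKA_STREAMS_CONFIG (our constant for the
    // kafka.streams.default.start-kafka-streams property) is set to "false".
    // Registers a placeholder source so that the topology is not empty.
    @Singleton
    @Requires(property = START_KAFKA_STREAMS_CONFIG, value = "false", defaultValue = "true")
    fun noOpStream(builder: ConfiguredStreamBuilder): KStream<String, ByteArray> =
        builder.stream("", Consumed.with(Serdes.String(), Serdes.ByteArray()))
}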

When kafka.streams.default.start-kafka-streams is set to false, this noOpStream bean is created.

Use case: We have a project with Kafka Consumers and Kafka Streams that are packaged in a single Docker image. On deployment, we want to run Consumers and Streams separately in different pods, so a pod that runs only consumers will not have any Stream bean enabled.