spring-projects / spring-kafka

Provides Familiar Spring Abstractions for Apache Kafka
https://projects.spring.io/spring-kafka
Apache License 2.0

Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. #2230

Closed: raulsalidosanchez closed this issue 2 years ago

raulsalidosanchez commented 2 years ago

After upgrading to spring-kafka 2.6.6 (I also upgraded kafka-clients and kafka-streams to 2.6.0) I am facing this problem with the topology. I am using Spring Boot version 2.3.12 and I checked the compatibility matrix.

I checked my application.yml and my KafkaStreams configuration class, and everything seems to be OK.

The logs I see are:

2022-04-12 13:39:58.865 INFO 4528 [traceId='']--- [ main] org.apache.kafka.streams.KafkaStreams : stream-client [6Z7-f6548b6d-5238-4d55-bb82-bfede74ca3c0] Kafka Streams version: 2.6.0
2022-04-12 13:39:58.865 INFO 4528 [traceId='']--- [ main] org.apache.kafka.streams.KafkaStreams : stream-client [6Z7-f6548b6d-5238-4d55-bb82-bfede74ca3c0] Kafka Streams commit ID: 62abe01bee039651
2022-04-12 13:39:58.870 INFO 4528 [traceId='']--- [ main] org.apache.kafka.streams.KafkaStreams : stream-client [6Z7-f6548b6d-5238-4d55-bb82-bfede74ca3c0] Overriding number of StreamThreads to zero for global-only topology
2022-04-12 13:39:58.871 ERROR 4528 [traceId='']--- [ main] org.apache.kafka.streams.KafkaStreams : stream-client [6Z7-f6548b6d-5238-4d55-bb82-bfede74ca3c0] Topology with no input topics will create no stream threads and no global thread.
2022-04-12 13:39:58.875 WARN 4528 [traceId='']--- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
2022-04-12 13:39:58.876 INFO 4528 [traceId='']--- [ main] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
2022-04-12 13:39:58.877 INFO 4528 [traceId='']--- [ main] o.s.i.endpoint.EventDrivenConsumer : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2022-04-12 13:39:58.877 INFO 4528 [traceId='']--- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 0 subscriber(s).
2022-04-12 13:39:58.877 INFO 4528 [traceId='']--- [ main] o.s.i.endpoint.EventDrivenConsumer : stopped bean '_org.springframework.integration.errorLogger'
2022-04-12 13:39:58.886 INFO 4528 [traceId='']--- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2022-04-12 13:39:58.905 INFO 4528 [traceId='']--- [ main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-04-12 13:39:58.906 TRACE 4528 [traceId='']--- [ main] o.h.type.spi.TypeConfiguration$Scope : Handling #sessionFactoryClosed from [org.hibernate.internal.SessionFactoryImpl@55a46bf3] for TypeConfiguration
2022-04-12 13:39:58.906 DEBUG 4528 [traceId='']--- [ main] o.h.type.spi.TypeConfiguration$Scope : Un-scoping TypeConfiguration [org.hibernate.type.spi.TypeConfiguration$Scope@51bcb07f] from SessionFactory [org.hibernate.internal.SessionFactoryImpl@55a46bf3]
2022-04-12 13:39:58.907 INFO 4528 [traceId='']--- [ main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2022-04-12 13:39:58.907 TRACE 4528 [traceId='']--- [ main] o.h.type.spi.TypeConfiguration$Scope : Handling #sessionFactoryClosed from [org.hibernate.internal.SessionFactoryImpl@7f69c4c0] for TypeConfiguration
2022-04-12 13:39:58.907 DEBUG 4528 [traceId='']--- [ main] o.h.type.spi.TypeConfiguration$Scope : Un-scoping TypeConfiguration [org.hibernate.type.spi.TypeConfiguration$Scope@679b617f] from SessionFactory [org.hibernate.internal.SessionFactoryImpl@7f69c4c0]
2022-04-12 13:39:58.910 INFO 4528 [traceId='']--- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2022-04-12 13:39:58.922 INFO 4528 [traceId='']--- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
2022-04-12 13:39:58.923 INFO 4528 [traceId='']--- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-2 - Shutdown initiated...
2022-04-12 13:39:59.336 INFO 4528 [traceId='']--- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-2 - Shutdown completed.
2022-04-12 13:39:59.341 INFO 4528 [traceId='']--- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2022-04-12 13:39:59.388 INFO 4528 [traceId='']--- [ main] ConditionEvaluationReportLoggingListener :

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2022-04-12 13:39:59.470 ERROR 4528 [traceId='']--- [ main] o.s.boot.SpringApplication : Application run failed

org.springframework.context.ApplicationContextException: Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:755)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:402)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1247)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1236)
    at com.soprasteria.eci.gateway.documentation.back.Application.main(Application.java:12)
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:333)
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
    ... 14 common frames omitted
Caused by: org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:729)
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:583)
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316)
    ... 15 common frames omitted

My kafkaStreams configuration class is:

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs(KafkaProperties kafkaProperties) {
        Map<String, Object> config = new HashMap<>();
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());   
        return new KafkaStreamsConfiguration(config);
    }

    /**
     * Boot will autowire this into the container factory.
     */
    @Bean
    public LoggingErrorHandler errorHandler() {
      return new LoggingErrorHandler();
    }
}

My application.yml kafka configuration has:

spring:
    kafka:
        bootstrap-servers: XX
        client-id: XX
        ssl:
            trust-store-location: classpath:kafkaCerts.jks

        properties:
            # Cloud broker connection parameters
            ssl.endpoint.identification.algorithm:
            sasl.mechanism: SCRAM-SHA-512
            security.protocol: SASL_SSL
            sasl.jaas.config: XX

            # Cloud discover
            interceptor-classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
            confluent.monitoring.interceptor.sasl.mechanism: SCRAM-SHA-512
            confluent.monitoring.interceptor.security.protocol: SASL_SSL
            confluent.monitoring.interceptor.sasl.jaas.config: XX
            confluent.monitoring.interceptor.ssl.truststore.location: classpath:kafkaCerts.jks

            # Cloud Schema Registry connection parameters
            schema.registry.url: XX
            basic.auth.credentials.source: USER_INFO
            schema.registry.basic.auth.user.info: client:client
            auto.register.schemas: false
            # Delegate deserializers
            spring.deserializer.key.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer

        consumer:
            client-id: XX
            group-id: XX
            auto-offset-reset: earliest
            # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            enable-auto-commit: true
            auto-commit-interval: 1000
            properties:
                specific.avro.reader: true

topic:
    name: XX

I am consuming the topic in another class with the annotation @KafkaListener(topics = "${topic.name}").

Can someone help me?

raulsalidosanchez commented 2 years ago

Sorry for the format of my application.yml.

spring:
    kafka:
        bootstrap-servers: XX
        client-id: XX
        ssl:
            trust-store-location: classpath:kafkaCerts.jks

        properties:
            # Cloud broker connection parameters
            ssl.endpoint.identification.algorithm:
            sasl.mechanism: SCRAM-SHA-512
            security.protocol: SASL_SSL
            sasl.jaas.config: XX

            # Cloud discover
            interceptor-classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
            confluent.monitoring.interceptor.sasl.mechanism: SCRAM-SHA-512 
            confluent.monitoring.interceptor.security.protocol: SASL_SSL
            confluent.monitoring.interceptor.sasl.jaas.config: XX
            confluent.monitoring.interceptor.ssl.truststore.location: classpath:kafkaCerts.jks

            # Cloud Schema Registry connection parameters
            schema.registry.url: XX
            basic.auth.credentials.source: USER_INFO   
            schema.registry.basic.auth.user.info: client:client
            auto.register.schemas: false
            # Delegate deserializers
            spring.deserializer.key.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
            spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer

        consumer:
            client-id: XX
            group-id: XX
            auto-offset-reset: earliest
            # Configures the Spring Kafka ErrorHandlingDeserializer that delegates to the 'real' deserializers
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            enable-auto-commit: true
            auto-commit-interval: 1000
            properties:
                specific.avro.reader: true

topic:
    name: XX
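As an aside, and only as a hedged sketch (this alternative is not used in the thread): when relying on Spring Boot's Kafka Streams auto-configuration instead of building a KafkaStreamsConfiguration by hand, the streams application id is normally supplied under spring.kafka.streams, with the XX placeholders carried over from the config above:

```yaml
spring:
    kafka:
        streams:
            # Maps to StreamsConfig.APPLICATION_ID_CONFIG; the manual
            # KafkaStreamsConfiguration in this thread derives it from client-id instead.
            application-id: XX
            properties:
                # Streams-specific overrides can also go here
                num.stream.threads: 1
```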

artembilan commented 2 years ago

Your problem means that you don't have KStream beans defined in your project: https://stackoverflow.com/questions/67841137/topology-with-no-input-topics-will-create-no-stream-threads-and-no-global-thread

raulsalidosanchez commented 2 years ago

Hello @artembilan, I do not understand exactly what you mean. I only use the default bean of KafkaStreams defined in my configuration class "KafkaStreamConfig".

The issue that you gave me talks about how to include several application ids.

The same code that I have now works fine with version 2.3.1 of spring-kafka and Spring version 2.0.2.

artembilan commented 2 years ago

Right; as long as I don't have a KStream bean in my tests, I get the same error. If it worked before, that doesn't mean it was correct: there is no point in @EnableKafkaStreams if you don't declare a KStream to process data. For example, in our tests we have something like this:

        @Bean
        public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
            KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
            stream.foreach((k, v) -> { });
            return stream;
        }
garyrussell commented 2 years ago

Specifically, see the fourth comment on the answer on Stack Overflow; there was a change made to the kafka-streams client code.

...they changed the Kafka Streams client since that answer to disallow a Topology with no input topics...
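That change can be pictured with a small self-contained sketch. This is hypothetical code, not the actual Kafka Streams implementation: it only illustrates the rule that a topology registering no source topics and no global tables can start neither stream threads nor a global thread, so the 2.6+ client rejects it at construction time.

```java
// Hypothetical sketch of the check described above; NOT real Kafka Streams code.
public class TopologySketch {

    private final int sourceTopics;  // sources added via stream()/table()
    private final int globalTables;  // sources added via globalTable()

    public TopologySketch(int sourceTopics, int globalTables) {
        this.sourceTopics = sourceTopics;
        this.globalTables = globalTables;
    }

    // No source topics means no stream threads; no global tables means no
    // global thread; with neither, there is nothing to run, so fail fast.
    public void validate() {
        if (sourceTopics == 0 && globalTables == 0) {
            throw new IllegalStateException(
                    "Invalid topology: Topology has no stream threads and no global threads, "
                    + "must subscribe to at least one source topic or global table.");
        }
    }

    public static void main(String[] args) {
        new TopologySketch(1, 0).validate(); // one source topic: accepted
        try {
            new TopologySketch(0, 0).validate(); // empty topology, as in this issue
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the reporter's application the only consumption happens through @KafkaListener, which runs in a plain consumer container outside Kafka Streams, so the auto-configured StreamsBuilder produces exactly this empty topology.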

raulsalidosanchez commented 2 years ago

Ok, I will check the configuration and implement a KStream bean. Thank you very much for the support!

artembilan commented 2 years ago

Closed as a mistake in the user code.