spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0

Inconsistent behavior when configuring multiple functional stream bindings with kafka streams #2954

Closed jbrelje closed 1 month ago

jbrelje commented 1 month ago

When running an application with multiple functional stream bindings, the behavior on application start is inconsistent. Sometimes both bindings are created (as expected), but more often only one or the other is created. Occasionally neither binding is created at all.

When one of the bindings is removed and only a single one is left in place, that binding is always created.

Steps to reproduce:

Set the config below in application.yml

spring.cloud.stream.kafka.streams.binder:
  brokers: <broker list>
  configuration:
    security.protocol: SSL
    auto.offset.reset: latest
    ssl:
      truststore:
        location: <cert file>
        password: <cert pass>
        type: JKS
      endpoint.identification.algorithm:
    replication.factor: 3
  functions:
    stream1.applicationId: stream1
    stream2.applicationId: stream2

spring.cloud.function.definition: stream1;stream2

spring.cloud.stream.bindings:
  stream1-in-0:
    destination: inbound-data-1
    content-type: application/json
    consumer.header-mode: none
  stream2-in-0:
    destination: inbound-data-2
    content-type: application/json
    consumer.header-mode: none

create Java class with multiple stream Consumer beans

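import java.util.function.Consumer;

import lombok.extern.log4j.Log4j2;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Log4j2
@Configuration
public class MultiStream {

    // Bound to stream1-in-0 (destination: inbound-data-1)
    @Bean
    public Consumer<KStream<String, String>> stream1() {
        return stream -> stream.peek((k, v) -> log.info("received event on stream 1"));
    }

    // Bound to stream2-in-0 (destination: inbound-data-2)
    @Bean
    public Consumer<KStream<String, String>> stream2() {
        return stream -> stream.peek((k, v) -> log.info("received event on stream 2"));
    }
}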

Run the application

Observe in the logs that sometimes only one stream consumer is working rather than both, with no indication of why. There are no WARN or ERROR logs. Simply shutting down the application and starting it again produces different behavior.

...
2024-05-24 20:55:40.097 INFO  [main] KafkaStreamsVersionAgnosticTop: Using method org.apache.kafka.streams.processor.internals.TopologyMetadata.sourceTopicsForStore(java.lang.String,java.lang.String) || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsVersionAgnosticTopologyInfoFacade
2024-05-24 20:55:40.443 INFO  [main] KafkaStreamsFunctionProcessor : Key Serde used for stream1-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor
2024-05-24 20:55:40.446 INFO  [main] KafkaStreamsFunctionProcessor : Value Serde used for stream1-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor
2024-05-24 20:55:40.448 INFO  [main] AbstractKafkaStreamsBinderProc: Native decoding is enabled for stream1-in-0. Inbound deserialization done at the broker. || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor
...

vs

...
2024-05-24 20:46:06.216 INFO  [main] KafkaStreamsVersionAgnosticTop: Using method org.apache.kafka.streams.processor.internals.TopologyMetadata.sourceTopicsForStore(java.lang.String,java.lang.String) || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsVersionAgnosticTopologyInfoFacade
2024-05-24 20:46:06.467 INFO  [main] KafkaStreamsFunctionProcessor : Key Serde used for stream1-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor
2024-05-24 20:46:06.471 INFO  [main] KafkaStreamsFunctionProcessor : Value Serde used for stream1-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor
2024-05-24 20:46:06.473 INFO  [main] AbstractKafkaStreamsBinderProc: Native decoding is enabled for stream1-in-0. Inbound deserialization done at the broker. || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor
2024-05-24 20:46:06.495 INFO  [main] KafkaStreamsFunctionProcessor : Key Serde used for stream2-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor
2024-05-24 20:46:06.497 INFO  [main] KafkaStreamsFunctionProcessor : Value Serde used for stream2-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor
2024-05-24 20:46:06.497 INFO  [main] AbstractKafkaStreamsBinderProc: Native decoding is enabled for stream2-in-0. Inbound deserialization done at the broker. || CLASS_NAME=org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor
...

Versions:
Java version 17
org.springframework.boot:spring-boot-starter-parent:3.2.1
org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:4.0.3

Expected behavior: Both stream bindings should start and function consistently, or the logs should provide more information about why only one is created.

sobychacko commented 1 month ago

Could you put this together in a runnable app? Please remove all the security configurations. If there is an underlying issue, it should be reproducible without any security config. Also, can you try not using the function/binding names as application IDs? I don't think that matters, but it's worth a try. Thanks!

jbrelje commented 1 month ago

Thank you for the response!

I have pushed a demo here: https://github.com/jbrelje/multiple-kstreams-demo

While sanitizing the demo app, I believe I found the root of the issue. It seems that if the @Configuration class contains two methods with the same name, the behavior on startup is inconsistent. If the functional bindings are uniquely named, I get the expected behavior. The demo I pushed showcases the identically named methods that seem to be causing the issue.

I was able to resolve the issue locally by simply changing the method names, but figured I would share anyway.

sobychacko commented 1 month ago

@jbrelje, I am curious how you ended up having the same method names in the same configuration class. Did you have two separate inner classes in the same configuration?

jbrelje commented 1 month ago

No, it was just an oversight.

I assumed since only one was annotated with @Bean (and they had different signatures) it would be okay.
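
For illustration, here is a minimal sketch of the pattern described above and one way to spot it. The class and method names (DuplicateMethodNameCheck, SampleConfig, duplicatedNames) are hypothetical and are not taken from the demo repository. The check uses plain reflection only and makes no assumption about how Spring itself resolves overloaded methods; the Spring annotations are left out so the example runs without any dependencies.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class DuplicateMethodNameCheck {

    // Hypothetical stand-in for a configuration class: stream1 is declared
    // twice with different signatures, as in the oversight described above.
    public static class SampleConfig {
        public Consumer<String> stream1() {          // would carry @Bean
            return value -> { };
        }

        public void stream1(String unused) {         // same name, no @Bean
        }

        public Consumer<String> stream2() {          // would carry @Bean
            return value -> { };
        }
    }

    // Returns the names that are declared more than once in the given class.
    // Synthetic methods (for example, compiler-generated lambda bodies) are skipped.
    public static Set<String> duplicatedNames(Class<?> type) {
        Map<String, Long> counts = Arrays.stream(type.getDeclaredMethods())
                .filter(method -> !method.isSynthetic())
                .collect(Collectors.groupingBy(Method::getName, Collectors.counting()));
        return counts.entrySet().stream()
                .filter(entry -> entry.getValue() > 1)
                .map(Map.Entry::getKey)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        System.out.println(duplicatedNames(SampleConfig.class)); // prints [stream1]
    }
}
```

A check like this could run in a unit test against each configuration class, so a duplicated name fails the build instead of surfacing as inconsistent startup behavior.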

sobychacko commented 1 month ago

I tend to think that the workaround is an actual solution. It might be better to name bean methods differently in the same class. Or do you have any use cases to address it any further?

jbrelje commented 1 month ago

Nope, I agree. That solution is good for me. Thanks for the discussion!