spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Method composition not work #1012

Closed benkeil closed 3 years ago

benkeil commented 3 years ago

My functions:

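import java.util.function.Function;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration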
public class StringStream {

    @Bean
    public Function<KStream<Bytes, String>, KStream<Bytes, String>> reverse() {
        return strings -> strings.mapValues(StringUtils::reverse);
    }

    @Bean
    public Function<KStream<Bytes, String>, KStream<Bytes, String>> uppercase() {
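        // The cast selects the mapValues(ValueMapper) overload; String::toUpperCase alone is
        // ambiguous between the ValueMapper and ValueMapperWithKey overloads.
        return strings -> strings.mapValues((ValueMapper<String, String>) String::toUpperCase);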
    }

}

Tried configurations:

1.

spring.cloud.stream:
  function:
    definition: reverse|uppercase
  bindings:
    reverse|uppercase-in-0:
      destination: strings
    reverse|uppercase-out-0:
      destination: strings-processed

2.

spring.cloud.stream:
  function:
    definition: reverse|uppercase
  bindings:
    reverse-in-0:
      destination: strings
    reverse-out-0:
      destination: strings-reverse
    uppercase-in-0:
      destination: strings-reverse
    uppercase-out-0:
      destination: strings-processed

3.

spring.cloud.stream:
  function:
    definition: reverse;uppercase
  bindings:
    reverse-in-0:
      destination: strings
    reverse-out-0:
      destination: strings-reverse
    uppercase-in-0:
      destination: strings-reverse
    uppercase-out-0:
      destination: strings-processed

The third one seems to me the most logical way to build this "application". The first one is set up as documented here, but it logs warnings endlessly and nothing happens:

2021-01-08 21:06:54,812  WARN kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1 o.a.k.c.c.i.ConsumerCoordinator:376 - [Consumer clientId=kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1-consumer, groupId=kafka-streams-playground] We received an assignment [uppercase-in-0-0] that doesn't match our current subscription Subscribe(reverse-in-0); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
2021-01-08 21:06:54,812  INFO kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1 o.a.k.c.c.i.AbstractCoordinator:553 - [Consumer clientId=kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1-consumer, groupId=kafka-streams-playground] (Re-)joining group
2021-01-08 21:06:54,817  INFO kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1 o.a.k.c.c.i.AbstractCoordinator:504 - [Consumer clientId=kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1-consumer, groupId=kafka-streams-playground] Successfully joined group with generation 14
2021-01-08 21:06:54,818  INFO kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1 o.a.k.c.c.i.SubscriptionState:257 - [Consumer clientId=kafka-streams-playground-cfe0d942-27e7-4aa4-93ba-dae382cab716-StreamThread-1-consumer, groupId=kafka-streams-playground] Assigned partition uppercase-in-0-0 for non-subscribed topic; subscription is [reverse-in-0]
sobychacko commented 3 years ago

@benkeil Function composition is currently not supported in the Kafka Streams binder. You can use multiple processors instead (your 3rd case), in which case the functions communicate over Kafka topics rather than through composition. If there is a compelling use case for function composition with KStream types, we can consider adding support for it. What exactly is your use case that requires composition in the Kafka Streams binder?
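To make the distinction concrete, here is a minimal, dependency-free sketch. It assumes plain String values in place of KStream<Bytes, String> so that it runs without Kafka, and the class and method names are hypothetical. Composing the two steps with java.util.function.Function.andThen yields a single in-process function: the output of reverse goes straight into uppercase with no intermediate topic. This is ordinary Java composition, not the binder's `|` mechanism.

```java
import java.util.function.Function;

public class CompositionSketch {

    // Stand-in for the reverse() bean, operating on one String value instead of a KStream.
    static Function<String, String> reverse() {
        return s -> new StringBuilder(s).reverse().toString();
    }

    // Stand-in for the uppercase() bean.
    static Function<String, String> uppercase() {
        return String::toUpperCase;
    }

    // "reverse|uppercase" as one function: no topic sits between the two steps.
    static Function<String, String> reverseThenUppercase() {
        return reverse().andThen(uppercase());
    }

    public static void main(String[] args) {
        System.out.println(reverseThenUppercase().apply("hello")); // prints OLLEH
    }
}
```

With separate processors (case 3), the same two steps instead run as independent applications, and the only link between them is the topic that one writes and the other reads.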

benkeil commented 3 years ago

First of all, thank you for your answer; I have been struggling with this for two weeks... You are right, function composition makes no sense for Kafka. What I really want is configuration 3, but it does not work. There are no errors or warnings during startup.

I double-checked configuration 3 and found that (each time with the exact same application.yaml), if I remove one function from the Spring context, the remaining function works as expected (in both variants). The problem only occurs when I use both functions together.

This looks like a bug to me...

sobychacko commented 3 years ago

The following is your case 3 configuration and it should work. Are you seeing issues when using this config?

spring.cloud.stream:
  function:
    definition: reverse;uppercase
  bindings:
    reverse-in-0:
      destination: strings
    reverse-out-0:
      destination: strings-reverse
    uppercase-in-0:
      destination: strings-reverse
    uppercase-out-0:
      destination: strings-processed
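The data flow of this configuration can be sketched in plain Java. Topics are modelled as in-memory lists keyed by topic name (an assumption: no broker, partitions, or consumer groups are involved), and the names TopicChainSketch, Processor, and run are hypothetical. Each processor reads only its own input topic, so uppercase sees the output of reverse solely through strings-reverse.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TopicChainSketch {

    // One independent processor, bound to <function>-in-0 and <function>-out-0.
    static final class Processor {
        final String inTopic;
        final String outTopic;
        final Function<String, String> fn;

        Processor(String inTopic, String outTopic, Function<String, String> fn) {
            this.inTopic = inTopic;
            this.outTopic = outTopic;
            this.fn = fn;
        }

        // Reads every record from the input topic and appends the result to the output topic.
        void runOnce(Map<String, List<String>> topics) {
            List<String> out = topics.computeIfAbsent(outTopic, t -> new ArrayList<>());
            for (String value : topics.getOrDefault(inTopic, new ArrayList<>())) {
                out.add(fn.apply(value));
            }
        }
    }

    // Mirrors configuration 3: strings -> reverse -> strings-reverse -> uppercase -> strings-processed.
    static Map<String, List<String>> run(List<String> input) {
        // In-memory stand-in for Kafka topics: topic name -> records.
        Map<String, List<String>> topics = new LinkedHashMap<>();
        topics.put("strings", new ArrayList<>(input));

        Processor reverse = new Processor("strings", "strings-reverse",
                s -> new StringBuilder(s).reverse().toString());
        Processor uppercase = new Processor("strings-reverse", "strings-processed",
                String::toUpperCase);

        reverse.runOnce(topics);   // writes to strings-reverse
        uppercase.runOnce(topics); // reads strings-reverse, writes to strings-processed
        return topics;
    }

    public static void main(String[] args) {
        Map<String, List<String>> topics = run(List.of("hello", "kafka"));
        System.out.println(topics.get("strings-reverse"));   // [olleh, akfak]
        System.out.println(topics.get("strings-processed")); // [OLLEH, AKFAK]
    }
}
```

In a real deployment the two processors run concurrently, so the intermediate topic also decouples their pace; the sketch runs them one after the other only to keep the output deterministic.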
benkeil commented 3 years ago

I uploaded a simple test project: https://github.com/benkeil/kafka-streams-playground.

It prints these four lines endlessly:

2021-01-09 11:57:40,624  INFO kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1 o.a.k.c.c.i.AbstractCoordinator:553 - [Consumer clientId=kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1-consumer, groupId=kafka-streams-playground] (Re-)joining group
2021-01-09 11:57:40,633  INFO kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1 o.a.k.c.c.i.AbstractCoordinator:504 - [Consumer clientId=kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1-consumer, groupId=kafka-streams-playground] Successfully joined group with generation 1
2021-01-09 11:57:40,633  INFO kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1 o.a.k.c.c.i.SubscriptionState:257 - [Consumer clientId=kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1-consumer, groupId=kafka-streams-playground] Assigned partition strings-reverse-0 for non-subscribed topic; subscription is [strings]
2021-01-09 11:57:40,633  WARN kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1 o.a.k.c.c.i.ConsumerCoordinator:376 - [Consumer clientId=kafka-streams-playground-d91676aa-602e-495c-bed2-b97e0988b8b0-StreamThread-1-consumer, groupId=kafka-streams-playground] We received an assignment [strings-reverse-0] that doesn't match our current subscription Subscribe(strings); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
sobychacko commented 3 years ago

@benkeil Thanks for the sample, I found the issue. You are defining a spring.application.name in the project, and that is used as the common application.id for both Kafka Streams processors. Since you have multiple processors, you need to set a unique application.id for each of them; otherwise, you will get conflicts, as the WARN messages above indicate. You can either remove spring.application.name from the configuration (in which case the binder will auto-generate a static application.id for you) or define a custom application.id using the property spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId. See the docs here for more details.
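The per-function property pattern above can be illustrated with a small sketch that builds one applicationId entry per function and checks that no two processors share an id. Kafka Streams uses application.id as the consumer group id, which is why a shared id put both processors in the groupId=kafka-streams-playground group seen in the logs. The "<base>-<function>" naming scheme and all class and method names here are hypothetical choices for illustration, not binder defaults.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ApplicationIdSketch {

    // Property pattern from the reply above; <function-name> is substituted per processor.
    static final String PREFIX = "spring.cloud.stream.kafka.streams.binder.functions.";
    static final String SUFFIX = ".applicationId";

    // Builds one applicationId property per function using a hypothetical "<base>-<function>" scheme.
    static Map<String, String> applicationIdProperties(String base, List<String> functions) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String fn : functions) {
            props.put(PREFIX + fn + SUFFIX, base + "-" + fn);
        }
        return props;
    }

    // Returns false if two processors would share an application.id (and thus a consumer group).
    static boolean allUnique(Map<String, String> props) {
        return new HashSet<>(props.values()).size() == props.size();
    }

    public static void main(String[] args) {
        Map<String, String> props =
                applicationIdProperties("kafka-streams-playground", List.of("reverse", "uppercase"));
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        System.out.println("unique: " + allUnique(props));
    }
}
```

The printed key=value pairs correspond to entries that could go into application.properties (or the equivalent YAML); any naming scheme works as long as each processor ends up with its own id.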

benkeil commented 3 years ago

Thank you for your help!