spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

KStream function detection #1030

Closed pasmikm closed 3 years ago

pasmikm commented 3 years ago

Hello,

In general, we prefer to define functional listener beans as classes, for example:

@Component
public class AccountMapper implements Function<AccountMessage, EnhancedAccountMessage> {
  @Override
  public EnhancedAccountMessage apply(AccountMessage message) {
    // ...
  }
}

with spring.cloud.stream.function.definition=accountMapper or

@Component
public class ApplicationConsumer implements Function<KStream<String, AccountMessage>,
        Function<KStream<String, ApplicationMessage>, Consumer<KStream<String, ApplicationSettingsMessage>>>> {
  @Override
  public Function<KStream<String, ApplicationMessage>,
      Consumer<KStream<String, ApplicationSettingsMessage>>> apply(
      KStream<String, AccountMessage> accountStream) {
    // ...
  }
}

with spring.cloud.stream.function.definition=applicationConsumer
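As an aside, the way such a curried, class-based function is consumed can be sketched in plain Java (hypothetical stand-in types, no Kafka Streams): each `apply` supplies one more input, and the terminal `Consumer` means there is no output.

```java
import java.util.function.Consumer;
import java.util.function.Function;

public class CurriedClassDemo {
    // Hypothetical stand-ins for the message types above
    record Account(String id) {}
    record Application(String id) {}
    record Settings(String id) {}

    // Same curried shape as ApplicationConsumer: one input per level
    static class ApplicationConsumer
            implements Function<Account, Function<Application, Consumer<Settings>>> {
        @Override
        public Function<Application, Consumer<Settings>> apply(Account account) {
            return application -> settings ->
                System.out.println(account.id() + "/" + application.id() + "/" + settings.id());
        }
    }

    public static void main(String[] args) {
        new ApplicationConsumer()
            .apply(new Account("a1"))
            .apply(new Application("p1"))
            .accept(new Settings("s1"));  // prints a1/p1/s1
    }
}
```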

This works nicely for the first case, but unfortunately it doesn't work for functions with KStreams. There is logic in FunctionDetectorCondition that checks methods and ignores this kind of function unless we add the following workaround:

  /* FunctionDetectorCondition passes only when there is a method whose name matches the value of
   spring.cloud.stream.function.definition and whose return type matches the actual interface */
  public Function<KStream<String, AccountMessage>,
        Function<KStream<String, ApplicationMessage>, Consumer<KStream<String,
                ApplicationSettingsMessage>>>> applicationConsumer() {
      throw new UnsupportedOperationException("Workaround for KStream's FunctionDetectorCondition");
  }

Another case where this method checking breaks the binding is when the method and the bean have different names (although I admit this is an unusual example, the function still shouldn't be silently ignored):

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import lombok.Data;

@Component
public class IgnoredExtractor {

    @Data
    public static class SomeData {
        private final String data;
    }

    @Bean(name = "changed")
    public Function<KStream<String, SomeData>, KStream<String, String>> extract() {
        return someDataKStream -> someDataKStream.mapValues(SomeData::getData);
    }
}

and spring.cloud.stream.function.definition=changed

Could you please check why FunctionDetectorCondition works like this, or whether it could be updated to accept these rarer cases as well? It should be possible to check the apply/accept method of the Function/BiFunction/Consumer/BiConsumer bean directly. It would probably have to be split for Functions and Consumers, and the method parameters would have to be checked too, but it would be more robust :)
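A plain-Java sketch of that suggestion, using reflection to locate the concrete apply/accept method on a functional bean (the helper name and types are made up; this is not the binder's actual code):

```java
import java.lang.reflect.Method;
import java.util.function.Function;

public class ApplyMethodLookup {

    // Example bean implementing Function as a class, as in the report above
    static class UpperCaser implements Function<String, String> {
        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    }

    // Hypothetical helper: find the non-bridge apply/accept method on the bean class,
    // so its parameter and return types can be inspected directly
    static Method findFunctionalMethod(Class<?> beanClass) {
        for (Method m : beanClass.getMethods()) {
            if ((m.getName().equals("apply") || m.getName().equals("accept")) && !m.isBridge()) {
                return m;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Method m = findFunctionalMethod(UpperCaser.class);
        System.out.println(m.getName() + " returns " + m.getReturnType().getSimpleName());
        // prints: apply returns String
    }
}
```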

sobychacko commented 3 years ago

@pasmikm Thank you for this detailed report. The fact that Component beans declared in the applications are not scanned by the binder is a gap, which we can address. However, the bigger question is what you are trying to accomplish with the function signature above. Kafka Streams functional binding has certain limitations when it comes to the programming model. The main one is that the outbound type must be a KStream if you have an outbound binding. The kind of functional composition available with regular message channel binders is not available at the moment in Kafka Streams binder. I am noticing that your return signature is the following:

Function<KStream<String, ApplicationMessage>,
      Consumer<KStream<String, ApplicationSettingsMessage>>>

Basically, you have a single input binding, and the function then produces another Function of the form KStream -> java.util.function.Consumer. This may not work as you intend. Ultimately, looking at the signature, you don't have an output binding, since your terminal type is a Consumer; in other words, what you have there is a consumer function with a single input on which you are doing some transformations. Why can't you do the following instead?

Consumer<KStream<String, AccountMessage>> consumer() {
  return ks -> {
    // Do the transformation on this KStream to generate
    // ApplicationMessage and ApplicationSettingsMessage
  };
}
Obviously, I don't have all the context about your use case, but that is something that should work. On the other hand, if you have multiple inputs, then you should look at BiFunction, BiConsumer, or curried functions. See this section from the docs.
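For reference, a minimal plain-Java contrast of the two multi-input shapes mentioned (no Kafka types; the values are arbitrary):

```java
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class MultiInputShapes {
    public static void main(String[] args) {
        // Shape 1: BiConsumer, both inputs supplied together
        BiConsumer<String, Integer> pair = (a, b) -> System.out.println(a + "|" + b);
        pair.accept("x", 1);  // prints x|1

        // Shape 2: curried, one input per level; a terminal Consumer means no output binding
        Function<String, Function<Integer, Consumer<Boolean>>> curried =
            a -> b -> c -> System.out.println(a + "|" + b + "|" + c);
        curried.apply("x").apply(1).accept(true);  // prints x|1|true
    }
}
```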

In any case, we will try to address the @Component bean scanning gap in the binder.

pasmikm commented 3 years ago

Thanks for the reply. What we have is a consumer of 3 KStreams which need to be joined; the original method signature was:

@StreamListener
public void process(
        @Input(ACCOUNT_INPUT) KStream<String, AccountMessage> accountStream,
        @Input(APPLICATION_INPUT) KStream<String, ApplicationMessage> applicationStream,
        @Input(APPLICATION_SETTINGS_INPUT) KStream<String, ApplicationSettingsMessage> applicationSettingsStream) {
    // join the three streams ...
}

Since we found no example for this case, we tried the curried Function<Function<Consumer>>, which works as expected. We also tried Function<BiConsumer> currying, but it doesn't seem to work (not sure why; we didn't investigate since we already had a working solution).

sobychacko commented 3 years ago

@pasmikm Sorry, I misunderstood your function. Yes, the curried form of Function<Function<Consumer>> should work.

sobychacko commented 3 years ago

@pasmikm Here is the PR addressing the Component beans issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/pull/1033. The associated test in the PR covers a handful of scenarios.

sobychacko commented 3 years ago

@pasmikm The issue with bean name overriding in Kafka Streams functions is also addressed on the same PR.

sobychacko commented 3 years ago

@pasmikm This feature is now part of the 3.1.2 release of the binder. Please check it out and see if it meets your needs. Any feedback/suggestions are appreciated.

pasmikm commented 3 years ago

@sobychacko Thanks for the update. It looks good for cases without curried functions, but our 3-KStream consumer fails with:

Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 1 out of bounds for length 1
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsFunctionProcessor.setupFunctionInvokerForKafkaStreams(KafkaStreamsFunctionProcessor.java:301) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]

The problem is that only applicationConsumer-in-0 is detected instead of all 3 inputs. The issue starts in KafkaStreamsFunctionBeanPostProcessor#extractResolvableTypes for the apply/accept method:

final ResolvableType resolvableType = ResolvableType.forMethodParameter(method, 0);

but the apply method is quite ugly: only part of the signature is in the method parameters, while all the currying is in its return type:

public Function<KStream<String, ApplicationMessage>,
    Consumer<KStream<String, ApplicationSettingsMessage>>> apply(
    KStream<String, AccountMessage> accountStream)

This means only KStream<String, AccountMessage> is passed as a resolvable type, so only one input is detected; but later, in KafkaStreamsFunctionProcessor#setupFunctionInvokerForKafkaStreams, the full Function chain is checked and there aren't enough inputs. So in addition to the method parameter, it still needs to pick up applicationConsumer-in-1 (for ApplicationMessage) and applicationConsumer-in-2 (for ApplicationSettingsMessage) from the return type. But I understand it's quite complicated, and we found an unusual edge case :)
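For illustration, here is a plain-Java sketch (no Spring or Kafka types; all names are made up) of how the remaining inputs could be recovered by unwrapping each curried level of the return type:

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class CurriedInputWalker {
    interface AccountMsg {}
    interface AppMsg {}
    interface SettingsMsg {}

    // Stand-in with the same apply shape as the curried ApplicationConsumer above:
    // one input in the parameter list, the other two buried in the return type
    static class ApplicationConsumer
            implements Function<AccountMsg, Function<AppMsg, Consumer<SettingsMsg>>> {
        @Override
        public Function<AppMsg, Consumer<SettingsMsg>> apply(AccountMsg in) {
            return app -> settings -> { };
        }
    }

    // Hypothetical sketch: the first input comes from the method parameter; the rest
    // are collected by descending through each Function level until the chain ends
    // in a Consumer (assumes a Consumer-terminated curried chain)
    static List<Type> collectInputs(Method apply) {
        List<Type> inputs = new ArrayList<>();
        inputs.add(apply.getGenericParameterTypes()[0]);
        Type current = apply.getGenericReturnType();
        while (current instanceof ParameterizedType pt) {
            inputs.add(pt.getActualTypeArguments()[0]);  // this level's input
            current = pt.getRawType() == Function.class
                ? pt.getActualTypeArguments()[1]          // descend into the next level
                : null;                                    // Consumer: chain ends here
        }
        return inputs;
    }

    public static void main(String[] args) throws Exception {
        Method apply = ApplicationConsumer.class.getMethod("apply", AccountMsg.class);
        collectInputs(apply).forEach(t -> System.out.println(((Class<?>) t).getSimpleName()));
        // prints: AccountMsg, AppMsg, SettingsMsg (one per line)
    }
}
```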

sobychacko commented 3 years ago

@pasmikm Thanks for the feedback. We have a test case for curried consumers. https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/master/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsComponentBeansTests.java#L319

and the one below.

They both pass. How are they different from your function?

Thanks!

pasmikm commented 3 years ago

After downloading the test class into our project, it fails with the same error. We checked our dependencies and nothing seems suspicious, so we created a new empty project simply with:

spring-boot-dependencies:2.4.2
spring-cloud-dependencies:2020.0.2
spring-boot-starter
spring-cloud-stream-binder-kafka-streams
+test dependencies

After copying the test class there, it fails with:

Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:201)
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268)
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84)

In KafkaStreamsBinderSupportAutoConfiguration the parameter is @Nullable KafkaStreamsMicrometerListener listener, but StreamsBuilderFactoryManager doesn't check for null. That's a completely different issue, though :)

Anyway, the test still fails after adding Micrometer, so I'm attaching a simple project; just unpack and run mvn test: kstream-component-beans-test.zip

pasmikm commented 3 years ago

Also, you should probably update the test class to JUnit 5. The JUnit 4 runner is no longer included in spring-boot-test; that's why there is a junit-vintage dependency in the attached example, otherwise Maven would simply skip the class as not containing any tests.
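For reference, the vintage engine uses the usual coordinates; a minimal pom fragment (version managed by the Boot BOM) might look like:

```xml
<!-- Runs JUnit 4-style tests on the JUnit 5 platform -->
<dependency>
    <groupId>org.junit.vintage</groupId>
    <artifactId>junit-vintage-engine</artifactId>
    <scope>test</scope>
</dependency>
```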

sobychacko commented 3 years ago

@pasmikm Thank you very much for the feedback. I will take a look at these issues.

sobychacko commented 3 years ago

@pasmikm I have a new commit on master where this issue is fixed. I tested it with the project you shared and all the tests pass now. Please make sure that you use version 3.1.3-SNAPSHOT for spring-cloud-stream-binder-kafka-streams. You also don't need to add the micrometer dependency now as we are only adding it to StreamsBuilder if it is not null. Please let us know if it works on your end.

pasmikm commented 3 years ago

@sobychacko All tests pass now. It looks great, good job and thanks! Shall I close this issue, or do you usually wait until the release?

sobychacko commented 3 years ago

Good to know that it works for you. Yes, please go ahead and close it.

outminder commented 3 years ago

I cannot find 3.1.3-SNAPSHOT in the repo. How can I include it? I'm facing the listener null issue.

garyrussell commented 3 years ago

Add this repo to your pom/build.gradle.

https://repo.spring.io/snapshot

However, don't use snapshots in production.
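In pom.xml terms, adding that snapshot repository might look like this (the id is arbitrary):

```xml
<repositories>
    <repository>
        <id>spring-snapshots</id>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
```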