spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Use of @EmbeddedKafka with Kafka Streams binder #1192

Status: Closed (ghost closed this issue 2 years ago)

ghost commented 2 years ago

Good morning!

I'm having issues when I try to run a test with @EmbeddedKafka with KStreams, so I need your help to make it work.

This is my current test class:

@TestInstance(Lifecycle.PER_CLASS)
@ExtendWith(SpringExtension.class)
@DirtiesContext
@SpringBootTest
@EmbeddedKafka(bootstrapServersProperty = "spring.cloud.stream.kafka.streams.binder.brokers",
               topics = { "input-topic", "output-topic" },
               brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class StreamFunctionIT {

    @Value("${spring.cloud.stream.bindings.enrich-in-0.destination}")
    private String inputTopic;

    @Value("${spring.cloud.stream.bindings.enrich-out-0.destination}")
    private String outputTopic;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    private BlockingQueue<ConsumerRecord<UUID, ValueResponse>> records;
    private KafkaMessageListenerContainer<UUID, ValueResponse> container;
    private Producer<UUID, ValueRequest> producer;

    @BeforeAll
    void setUp() {
        // Consumer set-up
        final Map<String, Object> consumerConfig =
                new HashMap<>(KafkaTestUtils.consumerProps("enricher", "false", this.embeddedKafkaBroker));
        final DefaultKafkaConsumerFactory<UUID, ValueResponse> consumerFactory =
                new DefaultKafkaConsumerFactory<>(consumerConfig, new Serdes.UUIDSerde().deserializer(),
                        new JsonSerde<>(ValueResponse.class).deserializer());
        final ContainerProperties containerProperties = new ContainerProperties(this.outputTopic);
        this.container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        this.records = new LinkedBlockingQueue<>();
        this.container.setupMessageListener((MessageListener<UUID, ValueResponse>) this.records::add);
        this.container.start();
        ContainerTestUtils.waitForAssignment(this.container, this.embeddedKafkaBroker.getPartitionsPerTopic());
        // Producer set-up
        final Map<String, Object> producerConfig =
                new HashMap<>(KafkaTestUtils.producerProps(this.embeddedKafkaBroker));
        this.producer = new DefaultKafkaProducerFactory<>(producerConfig, new Serdes.UUIDSerde().serializer(),
                new JsonSerde<>(ValueRequest.class).serializer()).createProducer();
    }

    @AfterAll
    void tearDown() {
        this.container.stop();
    }

    @Test
    void testEnrich() throws InterruptedException {
        this.records.clear();
        // Create message key and value
        this.producer.send(new ProducerRecord<>(this.inputTopic, UUID.randomUUID(), value));
        final ConsumerRecord<UUID, ValueResponse> output = this.records.poll(10, TimeUnit.SECONDS);
        // Assertions
    }

}

This is my YAML configuration:

spring:
  cloud:
    stream:
      function:
        definition: enrich
      bindings:
        enrich-in-0:
          destination: input-topic
        enrich-out-0:
          destination: output-topic
      kafka:
        streams:
          binder:
            functions:
              enrich:
                applicationId: enricher
          bindings:
            enrich-in-0:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
                valueSerde: org.springframework.kafka.support.serializer.JsonSerde
            enrich-out-0:
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$UUIDSerde
                valueSerde: org.springframework.kafka.support.serializer.JsonSerde

This is the Spring Cloud Stream bean definition:

@Configuration
class StreamConfiguration {

    @Bean
    Function<KStream<UUID, ValueRequest>, KStream<UUID, ValueResponse>> enrich(
            final EnrichService<ValueRequest, ValueResponse> enrichService) {
        return input -> input.mapValues(enrichService::enrich);
    }

}

And this is my pom.xml (only relevant dependencies included):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
        <relativePath/>
    </parent>
    <groupId>com.example.kstream</groupId>
    <artifactId>message-enricher</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>17</java.version>
        <spring-cloud-stream.version>3.2.1</spring-cloud-stream.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>${spring-cloud-stream.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>${spring-cloud-stream.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-failsafe-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

When I run my tests, the request message is sent to the input topic, but the "enrich" function does not read it, so I'm guessing that I'm missing some of the configuration to make it work properly (I've made an equivalent test using Kafka binder and everything works as expected).

Thank you very much for your time and support!

sobychacko commented 2 years ago

@ecristobal I haven't run your test, but could you compare it with the following test setup in the samples repo?

https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-word-count/src/test/java/kafka/streams/word/count

This test in particular looks very similar to your setup.

Here is another one for reference:

https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-inventory-count/src/test/java/kafka/streams/inventory/count

If you are still having issues, could you create a small self-contained sample application? That way we can triage this further.

ghost commented 2 years ago

Hi @sobychacko,

I've tried to run the KafkaStreamsWordCountApplicationTests as-is and I get the following error:

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'com.decathlon.member.enrich.StreamFunction2IT': Unsatisfied dependency expressed through field 'streamsBuilderFactoryBean'; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.config.StreamsBuilderFactoryBean' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}

    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.resolveFieldValue(AutowiredAnnotationBeanPostProcessor.java:659)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.inject(AutowiredAnnotationBeanPostProcessor.java:639)
    at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:119)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor.postProcessProperties(AutowiredAnnotationBeanPostProcessor.java:399)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1431)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireBeanProperties(AbstractAutowireCapableBeanFactory.java:417)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:119)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:83)
    at org.springframework.boot.test.autoconfigure.SpringBootDependencyInjectionTestExecutionListener.prepareTestInstance(SpringBootDependencyInjectionTestExecutionListener.java:43)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:248)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.config.StreamsBuilderFactoryBean' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Autowired(required=true)}
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1799)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1355)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1309)
    at org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor$AutowiredFieldElement.resolveFieldValue(AutowiredAnnotationBeanPostProcessor.java:656)
    ... 36 more

Maybe the Kafka Streams binder components are not being loaded into Spring's application context?

It's a bit weird, but if I switch to the Apache Kafka binder (removing KStreams, replacing the binder in Maven and updating application.yml accordingly), my test works as expected. Maybe I'm missing something, but I can't find it.

ghost commented 2 years ago

@sobychacko just FYI, if I remove the "StreamsBuilderFactoryBean" bean from your test, it behaves the same as mine: the message is sent but the application does not read it.

Thanks!

sobychacko commented 2 years ago

The test for kafka-streams-word-count in the samples repository runs as expected on my end. Could you please create a small reproducible app with a sample test so that we can debug the issue further?

ghost commented 2 years ago

Of course! You can clone it from https://github.com/ecristobal/example-scs.git

There are 2 integration tests: one is my first version and the other is an adaptation of yours. I've just run both and neither works. If you prefer, focus on yours; once it works I'll move my testing logic into your test.

Thanks!

sobychacko commented 2 years ago

@ecristobal Sorry for the delay in getting back to you. I was able to look at your code sample. I think the problem is due to this line. Note that the bean is declared with default access. It needs to be a public method. Change that to public and you will see that the function is invoked. I was able to run the test you copied from the samples repo once I made that change. However, I ran into further issues, but I think you can fix them once you address this.
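
The thread does not say why method visibility matters to the Kafka Streams binder. As a general illustration only, and as an assumption about the kind of mechanism that could be involved rather than a description of the binder's code, the sketch below shows that Java reflection's Class.getMethods() returns only public methods, while Class.getDeclaredMethods() also returns package-private ones. The class SampleConfiguration and its method names are hypothetical stand-ins for a configuration class with a default-access and a public factory method.

```java
import java.util.Arrays;

final class VisibilityDemo {

    // Hypothetical stand-in for a configuration class (not the code from this issue).
    static class SampleConfiguration {

        // Default (package-private) access, like the bean method discussed above.
        String packagePrivateBean() {
            return "package-private";
        }

        // Public access, as suggested in the fix.
        public String publicBean() {
            return "public";
        }
    }

    // Looks the method up among public methods only (including inherited ones).
    static boolean foundViaGetMethods(Class<?> type, String name) {
        return Arrays.stream(type.getMethods())
                .anyMatch(method -> method.getName().equals(name));
    }

    // Looks the method up among all methods declared by the class, whatever their visibility.
    static boolean foundViaGetDeclaredMethods(Class<?> type, String name) {
        return Arrays.stream(type.getDeclaredMethods())
                .anyMatch(method -> method.getName().equals(name));
    }

    public static void main(String[] args) {
        Class<?> type = SampleConfiguration.class;
        System.out.println("getMethods sees packagePrivateBean: "
                + foundViaGetMethods(type, "packagePrivateBean"));
        System.out.println("getMethods sees publicBean: "
                + foundViaGetMethods(type, "publicBean"));
        System.out.println("getDeclaredMethods sees packagePrivateBean: "
                + foundViaGetDeclaredMethods(type, "packagePrivateBean"));
    }
}
```

If a framework located a bean's factory method through a public-only lookup like the first helper, a default-access method would be skipped silently, which would match the symptom reported here (no error, but the function is never invoked). Whether the binder actually works this way is not confirmed in this thread.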

ghost commented 2 years ago

@sobychacko thanks for your tip! With it and a few other minor changes, everything is working as expected. However, given that this same code worked fine with the default bean visibility and the Kafka binder, I don't know whether this should be clarified in the documentation or fixed in a future release.

Thank you very much for your support!

sobychacko commented 2 years ago

@ecristobal I will look into docs. Also, I will see why the visibility is causing the issue for Kafka Streams binder.