spring-cloud / spring-cloud-stream-samples

Samples for Spring Cloud Stream
Apache License 2.0
956 stars 676 forks source link

EmbeddedKafka unit testing a KStream Function with specific Avro #201

Closed ddhanak closed 2 years ago

ddhanak commented 3 years ago

I am trying to run a unit test of a KStream function that uses Avro to serialize/deserialize.

    @Bean
    public Function<KStream<String, THF13>, KStream<String, THF13>> process() {
        return input -> input
                .map((key, value) -> new KeyValue<>("v1", value))
                .peek((key, value) -> System.out.println("Key: " + key + " Value: " + value));
    }

Here is my application.yml for my unit test

spring.cloud.stream:
  function:
    definition: process
  bindings:
    process-in-0:
      destination: test1
    process-out-0:
      destination: test2
      producer:
        useNativeEncoding: true
  kafka:
    streams:
      binder:
        applicationId: "my-test"
        configuration:
          schema.registry:
            url: "mock://testurl"
          default:
            key.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

And the unit test:

    @Test
    public void failingTest() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
        producerProps.put("schema.registry.url", "mock://testurl");
        DefaultKafkaProducerFactory<String, THF13> pf = new DefaultKafkaProducerFactory<>(producerProps);
        KafkaTemplate kafkaTemplate = new KafkaTemplate<>(pf, true);
        kafkaTemplate.setDefaultTopic(INPUT_TOPIC);
        kafkaTemplate.send(INPUT_TOPIC, "v0", getThf13());

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(GROUP_NAME, "true", broker);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-1");
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProps.put("schema.registry.url", "mock://testurl");
        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer consumer = cf.createConsumer();
        broker.consumeFromEmbeddedTopics(consumer, OUTPUT_TOPIC);

        ConsumerRecords<String, THF13> records = KafkaTestUtils.getRecords(consumer, 10000);
        assertThat(records).hasSize(1);
    }

The function does get triggered if I put a breakpoint at the peek line; however, the consumer never catches the messages that should be redirected to test2 by the Function from my understanding.

From my understanding, the message should be sent to test1, processed by the function and then sent over to test2 but nothing seems to be happening.

I am also seeing strange errors in my console:

java.lang.ClassCastException: class com.sun.proxy.$Proxy94 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy94 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
    at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindProducer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:313) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:282) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:291) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindOutputs(AbstractBindableProxyFactory.java:136) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.doStartWithBindable(OutputBindingLifecycle.java:58) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:34) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.test.context.SpringBootContextLoader.loadContext(SpringBootContextLoader.java:123) ~[spring-boot-test-2.4.4.jar:2.4.4]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99) ~[spring-test-5.3.5.jar:5.3.5]
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:124) ~[spring-test-5.3.5.jar:5.3.5]
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:124) ~[spring-test-5.3.5.jar:5.3.5]
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:118) ~[spring-test-5.3.5.jar:5.3.5]
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:83) ~[spring-test-5.3.5.jar:5.3.5]
    at org.springframework.boot.test.autoconfigure.SpringBootDependencyInjectionTestExecutionListener.prepareTestInstance(SpringBootDependencyInjectionTestExecutionListener.java:43) ~[spring-boot-test-autoconfigure-2.4.4.jar:2.4.4]
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:244) ~[spring-test-5.3.5.jar:5.3.5]
    at org.springframework.test.context.junit.jupiter.SpringExtension.postProcessTestInstance(SpringExtension.java:138) ~[spring-test-5.3.5.jar:5.3.5]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$6(ClassBasedTestDescriptor.java:350) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.executeAndMaskThrowable(ClassBasedTestDescriptor.java:355) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeTestInstancePostProcessors$7(ClassBasedTestDescriptor.java:350) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) ~[na:na]
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[na:na]
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) ~[na:na]
    at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.forEachRemaining(StreamSpliterators.java:312) ~[na:na]
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:735) ~[na:na]
    at java.base/java.util.stream.Streams$ConcatSpliterator.forEachRemaining(Streams.java:734) ~[na:na]
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658) ~[na:na]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestInstancePostProcessors(ClassBasedTestDescriptor.java:349) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$instantiateAndPostProcessTestInstance$4(ClassBasedTestDescriptor.java:270) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:269) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$2(ClassBasedTestDescriptor.java:259) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at java.base/java.util.Optional.orElseGet(Optional.java:369) ~[na:na]
    at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$3(ClassBasedTestDescriptor.java:258) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:101) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:100) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:65) ~[junit-jupiter-engine-5.7.1.jar:5.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$1(NodeTestTask.java:111) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:111) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:79) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51) ~[junit-platform-engine-1.7.1.jar:1.7.1]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75) ~[junit-platform-launcher-1.7.1.jar:1.7.1]
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) ~[junit5-rt.jar:na]
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) ~[junit-rt.jar:na]
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221) ~[junit-rt.jar:na]
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) ~[junit-rt.jar:na]

And here is my pom.xml if needed:

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <!-- Avro stuff -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>${confluent.version}</version>
        </dependency>

        <!-- Serializer stuff -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Schema Registry stuff -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>
    </dependencies>

Is there any documentation on how to test a KStream function? TIA.