spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Batch mode is not working #1221

Closed avramakrishna closed 1 year ago

avramakrishna commented 1 year ago

The reference documentation at https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_consuming_batches states that batch mode is available starting with version 3.0. I am using Spring Cloud Stream 3.2.5. Although I was able to get this working with @StreamListener, I have had no success with the functional style. Here's my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>  
    <groupId>com.example</groupId>
    <artifactId>springcloudfunction</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springcloudfunction</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
        <avro.version>1.10.0</avro.version>
        <confluent.version>5.3.0</confluent.version>
    </properties>
    <dependencies>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- serdes package for serializing data in kafka streams -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>3.2.5</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.2.5</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.2.5</version>
            <scope>test</scope>
            <classifier>test-binder</classifier>
            <type>test-jar</type>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Here's my application.yml:

spring:
  application:
    name: kafka-spriing-function
  cloud:
    stream:
      kafka:

        streams:
          binder:
            application-id: ecp-azure-spriing-function-1
            auto-create-topics: false
            auto-add-partitions: false
            configuration:function:
    function:
      definition: processIndividualStream
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:<kafka-broker>}
    properties:
      security.protocol: ${SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL:SSL}
      ssl:
        endpoint:
          identification:
            algorithm:
    consumer:
      group-id: ${GROUP_ID:ecp-azure-ramtesting}
      key-deserializer: org.apache.kafka.common.serialization.Serdes$StringSerde
      value-deserializer: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
      enable-auto-commit: false
      max-poll-records: 500
      auto-offset-reset: ${OFFSET_RESET:earliest}
      properties:
        bootstrap-servers: "<kafka-broker-url>"
        enable.auto.commit: false
        max.poll.interval.ms: 2147483647
        max.poll.records: 500
        partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor
        request.timeout.ms: 2147483647

    ssl:
      key-password: ${KAFKA_KEYSTORE_PASSWORD:xxx}
      key-store-location: ${KAFKA_KEYSTORE_LOCATION:<keystorelocation>}
      key-store-password: ${KAFKA_KEYSTORE_PASSWORD:xxx}
      trust-store-location: ${KAFKA_TRUSTSTORE_LOCATION:<truststorelocation>}
      trust-store-password: ${KAFKA_TRUSTSTORE_PASSWORD:xxxxx}
      endpoint.identification.algorithm: ${ENDPOINT_ALGORITHM}

spring.cloud.stream.bindings.processIndividualStream-in-0.destination: ${INPUT_TOPIC:goldenrecord.v1}
spring.cloud.stream.kafka.streams.bindings.processIndividualStream-in-0.consumer.valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

spring.cloud.stream.kafka.streams.bindings.processIndividualStream-in-0.consumer.application-id: ${spring.cloud.stream.kafka.streams.binder.application-id}-individual

spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: ${KAFKA_SCHEMA_REGISTRY_URL:<kafka-schemaregistry-url>}

spring.cloud.stream.bindings.processIndividualStream-in-0.consumer.batch-mode: true

Here's my code:

@Bean
public Consumer<List<GenericRecord>> processIndividualStream() {

    return x -> {
        x.stream().forEach(System.out::println);
    };

}

When I run the application, I get the following exception:

org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.23.jar:5.3.23]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.23.jar:5.3.23]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:147) ~[spring-boot-2.7.4.jar:2.7.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:734) ~[spring-boot-2.7.4.jar:2.7.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:408) ~[spring-boot-2.7.4.jar:2.7.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:308) ~[spring-boot-2.7.4.jar:2.7.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1306) ~[spring-boot-2.7.4.jar:2.7.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1295) ~[spring-boot-2.7.4.jar:2.7.4]
    at com.example.springcloudfunction.SpringcloudfunctionApplication.main(SpringcloudfunctionApplication.java:10) ~[classes/:na]
Caused by: java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:210) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:151) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:402) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:106) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:118) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.23.jar:5.3.23]
    ... 14 common frames omitted

But when I use the following code, the application starts consuming messages and works just fine:

@Bean
public Consumer<KStream<String, GenericRecord>> processIndividualStream() {
    return x -> {
        x.foreach((k, v) -> {
            System.out.println(k);
        });
    };

}

That is the signature for processing the input as a stream. However, as described in https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_consuming_batches, when batch mode is enabled the function should accept a List instead of a KStream. I tried several versions of Spring Cloud Stream, but none of them helped.
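To make the difference between the two signatures concrete, here is a minimal plain-Java sketch of what "accept a List" means compared with per-record delivery. It uses no Spring or Kafka classes; `BatchDeliverySketch`, `deliverPerRecord`, and `deliverAsBatch` are hypothetical names invented for illustration and do not correspond to any binder API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchDeliverySketch {

    // Hypothetical helper: hands each polled record to the handler one at a time,
    // which is how a non-batch consumer sees the data. Returns the number of calls.
    static <T> int deliverPerRecord(List<T> polled, Consumer<T> handler) {
        int invocations = 0;
        for (T record : polled) {
            handler.accept(record);
            invocations++;
        }
        return invocations;
    }

    // Hypothetical helper: hands the whole polled list to the handler in a single call,
    // which is the shape a Consumer<List<T>> expects when batch mode is enabled.
    static <T> int deliverAsBatch(List<T> polled, Consumer<List<T>> handler) {
        handler.accept(List.copyOf(polled));
        return 1;
    }

    public static void main(String[] args) {
        List<String> polled = List.of("a", "b", "c");

        // Per-record: the handler is invoked once for every element.
        List<String> seen = new ArrayList<>();
        int perRecordCalls = deliverPerRecord(polled, seen::add);

        // Batch: the handler is invoked once and receives every element together.
        int batchCalls = deliverAsBatch(polled, batch -> batch.forEach(System.out::println));

        System.out.println(perRecordCalls + " per-record calls, " + batchCalls + " batch call");
    }
}
```

One assumption worth checking, offered as a possible reading rather than a confirmed diagnosis: the pom.xml above declares only spring-cloud-stream-binder-kafka-streams, while the exception refers to DirectWithAttributesChannel, which is a message channel type. A Consumer<List<...>> binding may therefore need the message-channel Kafka binder (spring-cloud-stream-binder-kafka) on the classpath, whereas the KStream signature is handled by the Kafka Streams binder.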

sobychacko commented 1 year ago

@avramakrishna This repository is no longer maintained. See the announcement here: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/README.adoc. Closing this issue. Please create a new issue in core Spring Cloud Stream. Thanks!