spring-cloud / spring-cloud-stream-binder-aws-kinesis

Spring Cloud Stream binder for AWS Kinesis
Apache License 2.0

Consumer is too slow #44

Closed stavalfi closed 6 years ago

stavalfi commented 6 years ago

Both producer and consumer running on Windows 7, Java 8.

My consumer reads 1 message every 1-5 seconds. What is wrong with my settings?

Producer:

@EnableBinding(Source.class)
@SpringBootApplication
public class SpringCloudStreamKinesisProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKinesisProducerApplication.class, args);
    }

    @InboundChannelAdapter(value = Source.OUTPUT)
    public String source() {
        String payload = new Date().toString();
        System.out.println("producer sending: " + payload + ".....");
        return "from producer: " + payload;
    }
}

Consumer:

@EnableBinding({Sink.class})
@SpringBootApplication
public class SpringCloudStreamKinesisConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamKinesisConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void logger(String payload) {
        System.out.println("consumer received: " + payload);
    }
}

producer application.yml:

originator: KinesisProducer
server:
 port: 64397
management:
 port: 8083
 context-path: /manage

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: test_stream
          content-type: application/json
          producer: 
            partitionKeyExpression: "1"
      kinesis:
        binder:
          checkpoint:
            write-capacity: 1000

cloud:  
  aws:  
    region:  
      static: us-east-2

logging: 
  level:
    com: 
      amazonaws: INFO
    org:
      apache: 
        http: INFO

consumer application.yml:

originator: KinesisConsumer
server:
 port: 64399
management:
 port: 8082
 context-path: /manage

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test_stream
          content-type: application/json
      kinesis:
        binder:
          checkpoint:
            read-capacity: 1000
            create-delay: 0

cloud:  
  aws:  
    region:  
      static: us-east-2

logging: 
  level:
    com: 
      amazonaws: INFO
    org:
      apache: 
        http: INFO

The producer and the consumer are separate modules in the same project, and both inherit from the following pom:

    <parent>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-build</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>2.0.0.BUILD-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kinesis</artifactId>
            <version>1.0.0.BUILD-SNAPSHOT</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-rest</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

artembilan commented 6 years ago

What I can say here is that you are using the default PollerMetadata on the producer side via that @InboundChannelAdapter(value = Source.OUTPUT), and its defaults look like this:

    /**
     * Fixed delay for default poller.
     */
    private long fixedDelay = 1000L;

    /**
     * Maximum messages per poll for the default poller.
     */
    private long maxMessagesPerPoll = 1L;

See ChannelBindingAutoConfiguration. So you really produce only one message per second.
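To make the arithmetic concrete, here is a minimal plain-Java sketch (no Spring involved) of the upper bound those two defaults put on the producer rate. The class and method names are hypothetical and exist only for illustration. It assumes that producing a message takes negligible time compared to the delay.

```java
// Hypothetical helper, not part of Spring Cloud Stream: it only illustrates
// the arithmetic behind the default poller settings quoted above.
class PollerThroughputSketch {

    // Upper bound on messages emitted per second by a fixed-delay poller,
    // assuming each poll itself takes negligible time.
    static double maxMessagesPerSecond(long fixedDelayMillis, long maxMessagesPerPoll) {
        if (fixedDelayMillis <= 0) {
            throw new IllegalArgumentException("fixedDelayMillis must be positive");
        }
        return maxMessagesPerPoll * 1000.0 / fixedDelayMillis;
    }

    public static void main(String[] args) {
        // Defaults quoted above: fixedDelay = 1000, maxMessagesPerPoll = 1
        System.out.println(maxMessagesPerSecond(1000L, 1L));  // prints 1.0
        // A hypothetical faster poller: 10 messages every 100 ms
        System.out.println(maxMessagesPerSecond(100L, 10L));  // prints 100.0
    }
}
```

With the defaults the producer cannot exceed one message per second, so the consumer has nothing more to read no matter how it is tuned. The @InboundChannelAdapter annotation has a poller attribute that takes a @Poller, which is one place to set a shorter fixedDelay or a larger maxMessagesPerPoll.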

Now on the consumer side.

There is an option like:

/**
 * The sleep interval in milliseconds used in the main loop between shards polling cycles.
 * Defaults to {@code 1000}; minimum {@code 250}.
 * @param idleBetweenPolls the interval to sleep between shards polling cycles.
 */
public void setIdleBetweenPolls(int idleBetweenPolls) {
    this.idleBetweenPolls = Math.max(250, idleBetweenPolls);
}

And this one-second delay between polls of a stream shard is one of the AWS requirements: https://docs.aws.amazon.com/streams/latest/dev/kinesis-low-latency.html

That explains your test results.
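As a rough illustration of what the setter above does, here is a self-contained Java sketch of the clamping and the polling rate it implies. The class and method names are hypothetical; only the Math.max(250, ...) rule comes from the code quoted above. The sketch also ignores the time each poll itself takes.

```java
// Hypothetical sketch, not the adapter's real class: it mirrors only the
// Math.max(250, idleBetweenPolls) rule from the setter quoted above.
class IdleBetweenPollsSketch {

    static final int MIN_IDLE_MILLIS = 250;

    // The value the adapter would actually use for a requested interval.
    static int effectiveIdleMillis(int requestedMillis) {
        return Math.max(MIN_IDLE_MILLIS, requestedMillis);
    }

    // Upper bound on shard polling cycles per second for a requested interval,
    // assuming the polling cycle itself takes negligible time.
    static double maxPollCyclesPerSecond(int requestedMillis) {
        return 1000.0 / effectiveIdleMillis(requestedMillis);
    }

    public static void main(String[] args) {
        System.out.println(effectiveIdleMillis(100));      // prints 250 (clamped)
        System.out.println(effectiveIdleMillis(1000));     // prints 1000 (the default)
        System.out.println(maxPollCyclesPerSecond(1000));  // prints 1.0
        System.out.println(maxPollCyclesPerSecond(250));   // prints 4.0
    }
}
```

So even at the minimum setting the adapter starts at most four polling cycles per second, and any requested value below 250 is silently raised to 250.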

Hope that's clear.

stavalfi commented 6 years ago

Thanks for the fast response.

I searched for a way to override these default properties but failed to do so.

I saw here: https://spring.io/blog/2017/03/09/spring-integration-extension-for-aws-1-1-0-m1-available

@Bean
public KinesisMessageDrivenChannelAdapter kinesisMessageDrivenChannelAdapter() {
    KinesisMessageDrivenChannelAdapter adapter =
            new KinesisMessageDrivenChannelAdapter(amazonKinesis(), STREAM1);
    adapter.setOutputChannel(kinesisChannel());
    adapter.setCheckpointStore(checkpointStore());
    adapter.setCheckpointMode(CheckpointMode.manual);
    adapter.setListenerMode(ListenerMode.batch);
    adapter.setStartTimeout(10000);
    adapter.setDescribeStreamRetries(1);
    adapter.setConcurrency(10);
    return adapter;
}

I would like to override those properties from the application.yml but there is no "idleBetweenPolls" property.

Update:

I saw that you support this option, but not in the current version. Can you tell me where to put the above code and how to fill in amazonKinesis() and kinesisChannel()?

artembilan commented 6 years ago

Well, there is one. It is per binding configuration:

spring:
  cloud:
    stream:
      kinesis:
        bindings:
           <channelName>:
              consumer: 
                idleBetweenPolls: 250

stavalfi commented 6 years ago

The binder ignores this.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test_stream
          content-type: application/json
      kinesis:
        bindings:
          input:
            consumer:
              idleBetweenPolls: 250

At runtime: 1

artembilan commented 6 years ago

Oh! My mistake. We don’t need that kinesis: in between. Everything should be a part of the binding.

stavalfi commented 6 years ago

Thanks again but it's still not working.

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test_stream
          content-type: application/json
          consumer:
            idleBetweenPolls: 250

I changed this parameter manually while debugging, but the consumer still receives only about 1-2 elements every 10 seconds. What is the problem?

artembilan commented 6 years ago

Closed in favor of StackOverflow question: https://stackoverflow.com/questions/49229500/spring-kinesis-consumer-is-too-slow