spring-cloud / spring-cloud-stream-binder-kafka

Spring Cloud Stream binders for Apache Kafka and Kafka Streams
Apache License 2.0

Fix Pause/Resume Documentation #1195

Closed xabe closed 2 years ago

xabe commented 2 years ago

Hi

I am trying to run the pause/resume example from the documentation, but it does not work with the latest Spring Cloud (2020.0.5).

I have an example on GitHub with a test showing that pausing and resuming the Kafka consumer works with Spring Boot 2.3.3 and Hoxton.SR3. Another branch uses the newer versions (Spring Boot 2.5.9 and 2020.0.5), and there it does not work.

In the logs of the working version you can see that the ListenerContainerIdleEvent arrives and the listener then resumes the partitions:

2022-02-05 08:08:51.291 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2022-02-05 08:08:51.315  INFO 22327 --- [container-0-C-1] com.example.springkafka.ConsumerInput    : Consumer event: message 
2022-02-05 08:08:51.317 ERROR 22327 --- [container-0-C-1] com.example.springkafka.ConsumerInput    : Error Consumer event: message 
2022-02-05 08:08:51.317 DEBUG 22327 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-1, groupId=group] Pausing partitions [topic-0]
2022-02-05 08:08:51.317  INFO 22327 --- [container-0-C-1] com.example.springkafka.ConsumerInput    : Paused Consumer event: message 
2022-02-05 08:08:51.318 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-0=OffsetAndMetadata{offset=21, leaderEpoch=null, metadata=''}}
2022-02-05 08:08:51.318 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-0=OffsetAndMetadata{offset=21, leaderEpoch=null, metadata=''}}
2022-02-05 08:08:56.326 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2022-02-05 08:08:56.326  INFO 22327 --- [container-0-C-1] c.e.springkafka.ConsumerResumeListener   : Received event ListenerContainerIdleEvent [idleTime=5.034s, listenerId=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, container=KafkaMessageListenerContainer [id=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, clientIndex=-0, topicPartitions=[topic-0]], paused=false, topicPartitions=[topic-0]]
2022-02-05 08:08:56.327  INFO 22327 --- [container-0-C-1] c.e.springkafka.ConsumerResumeListener   : Resume partitions ListenerContainerIdleEvent [idleTime=5.034s, listenerId=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, container=KafkaMessageListenerContainer [id=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, clientIndex=-0, topicPartitions=[topic-0]], paused=false, topicPartitions=[topic-0]]
2022-02-05 08:08:56.327 DEBUG 22327 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-1, groupId=group] Resuming partitions [topic-0]
2022-02-05 08:08:56.327 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2022-02-05 08:08:56.335 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2022-02-05 08:08:56.335  INFO 22327 --- [container-0-C-1] com.example.springkafka.ConsumerInput    : Consumer event: message 
2022-02-05 08:08:56.336 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-0=OffsetAndMetadata{offset=22, leaderEpoch=null, metadata=''}}
2022-02-05 08:08:56.336 DEBUG 22327 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-0=OffsetAndMetadata{offset=22, leaderEpoch=null, metadata=''}}

With the latest version of Spring Cloud Stream, the ListenerContainerIdleEvent never arrives while the partitions are still paused, because KafkaMessageListenerContainer resumes the consumer first in its resumePartitionsIfNecessary method:

2022-02-05 08:15:16.630 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2022-02-05 08:15:16.656  INFO 24325 --- [container-0-C-1] com.example.springkafka.ConsumerInput    : Consumer event: message 
2022-02-05 08:15:16.659 ERROR 24325 --- [container-0-C-1] com.example.springkafka.ConsumerInput    : Error Consumer event: message 
2022-02-05 08:15:16.659 DEBUG 24325 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-1, groupId=group] Pausing partitions [topic-0]
2022-02-05 08:15:16.659  INFO 24325 --- [container-0-C-1] com.example.springkafka.ConsumerInput    : Paused Consumer event: message 
2022-02-05 08:15:16.660 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {topic-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
2022-02-05 08:15:16.660 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {topic-0=OffsetAndMetadata{offset=23, leaderEpoch=null, metadata=''}}
2022-02-05 08:15:21.668 DEBUG 24325 --- [container-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-group-1, groupId=group] **Resuming partitions [topic-0]**
2022-02-05 08:15:21.669 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Resumed consumption from [topic-0]
2022-02-05 08:15:21.669 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2022-02-05 08:15:21.670  INFO 24325 --- [container-0-C-1] c.e.springkafka.ConsumerResumeListener   : Received event ListenerContainerIdleEvent [idleTime=5.039s, listenerId=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, container=KafkaMessageListenerContainer [id=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, clientIndex=-0, topicPartitions=[topic-0]], paused=false, topicPartitions=[topic-0]]
2022-02-05 08:15:21.670 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2022-02-05 08:15:26.671 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2022-02-05 08:15:26.671  INFO 24325 --- [container-0-C-1] c.e.springkafka.ConsumerResumeListener   : Received event ListenerContainerIdleEvent [idleTime=10.041s, listenerId=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, container=KafkaMessageListenerContainer [id=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, clientIndex=-0, topicPartitions=[topic-0]], paused=false, topicPartitions=[topic-0]]
2022-02-05 08:15:26.671 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2022-02-05 08:15:31.672 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2022-02-05 08:15:31.672  INFO 24325 --- [container-0-C-1] c.e.springkafka.ConsumerResumeListener   : Received event ListenerContainerIdleEvent [idleTime=15.042s, listenerId=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, container=KafkaMessageListenerContainer [id=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, clientIndex=-0, topicPartitions=[topic-0]], paused=false, topicPartitions=[topic-0]]
2022-02-05 08:15:31.672 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2022-02-05 08:15:36.673 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2022-02-05 08:15:36.674  INFO 24325 --- [container-0-C-1] c.e.springkafka.ConsumerResumeListener   : Received event ListenerContainerIdleEvent [idleTime=20.043s, listenerId=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, container=KafkaMessageListenerContainer [id=KafkaConsumerDestination{consumerDestinationName='topic', partitions=0, dlqName='null'}.container-0, clientIndex=-0, topicPartitions=[topic-0]], paused=false, topicPartitions=[topic-0]]
2022-02-05 08:15:36.674 DEBUG 24325 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}

Regards

garyrussell commented 2 years ago

That just means there are no more records to consume (offsets greater than 22) after the container was resumed.

xabe commented 2 years ago

Hi @garyrussell

The problem is not the offset. The question is why the KafkaMessageListenerContainer resumes the Kafka consumer, and when Spring Kafka is supposed to send a ListenerContainerIdleEvent while the partitions are paused.

The pause/resume example in the documentation does not work with 2.7.x and 2.8.x.

My proposed solution is as follows:

        private void pausePartitionsIfNecessary() {
            Long idleEventInterval = this.containerProperties.getIdleEventInterval();
            if (idleEventInterval == null) {
                Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
                Collection<TopicPartition> partitions = getAssignedPartitions();
                if (partitions != null) {
                    List<TopicPartition> partitionsToPause = partitions
                            .stream()
                            .filter(tp -> isPartitionPauseRequested(tp)
                                    && !pausedConsumerPartitions.contains(tp))
                            .collect(Collectors.toList());
                    if (partitionsToPause.size() > 0) {
                        this.consumer.pause(partitionsToPause);
                        this.pausedPartitions.addAll(partitionsToPause);
                        this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
                        partitionsToPause.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionPausedEvent);
                    }
                }
            }
        }

        private void resumePartitionsIfNecessary() {
            Long idleEventInterval = this.containerProperties.getIdleEventInterval();
            if (idleEventInterval == null) {
                Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
                List<TopicPartition> partitionsToResume = this
                        .assignedPartitions
                        .stream()
                        .filter(tp -> !isPartitionPauseRequested(tp)
                                && pausedConsumerPartitions.contains(tp))
                        .collect(Collectors.toList());
                if (partitionsToResume.size() > 0) {
                    this.consumer.resume(partitionsToResume);
                    this.pausedPartitions.removeAll(partitionsToResume);
                    this.logger.debug(() -> "Resumed consumption from " + partitionsToResume);
                    partitionsToResume.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionResumedEvent);
                }
            }
        }

And a test:

    @Test
    public void dontResumePausedIdlePartition() throws Exception {
        ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
        Consumer<Integer, String> consumer = mock(Consumer.class);
        given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
        ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
        given(consumer.assignment()).willReturn(Set.of(new TopicPartition("foo", 0)));
        given(consumer.poll(any(Duration.class))).willAnswer(i -> {
            Thread.sleep(500);
            return emptyRecords;
        });
        ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
        containerProps.setGroupId("grp");
        containerProps.setAckMode(AckMode.RECORD);
        containerProps.setClientId("clientId");
        containerProps.setIdleEventInterval(2000L);
        containerProps.setMessageListener((MessageListener) rec -> { });
        containerProps.setMissingTopicsFatal(false);
        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProps);
        final CountDownLatch idleEventPublish = new CountDownLatch(1);
        container.setApplicationEventPublisher(event -> {
            if (event instanceof ListenerContainerIdleEvent) {
                idleEventPublish.countDown();
            }
        });
        container.start();
        idleEventPublish.await(10, TimeUnit.SECONDS);
        verify(consumer, never()).pause(any());
        verify(consumer, never()).resume(any());
        container.stop();
    }

garyrussell commented 2 years ago

Why did you write this issue against the binder if you believe the problem is in spring-kafka?

A similar issue was fixed some time ago: https://github.com/spring-projects/spring-kafka/commit/726c48517d6fc5ccf3a6c1b7e51df65ee093c66c

It was fixed in 2.7.4; are you sure you are using the latest?

xabe commented 2 years ago

It is true that the error comes from spring-kafka. However, the example uses spring-kafka 2.7.10; the latest version that works in my test is spring-kafka 2.6.9.

I will close this issue and open a new one in spring-kafka.

garyrussell commented 2 years ago

That example is wrong; you should NOT pause and resume the consumer directly; you should pause and resume the container.

We should leave this open as a documentation bug.

Your suggested "fix" will break a lot of framework functionality.
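
To illustrate why a pause applied directly to the consumer gets undone, here is a minimal sketch in plain Java. It is a toy model, not spring-kafka code: `ToyConsumer`, `ToyContainer`, `pollOnce` and the two helper methods are hypothetical names, and the reconcile step is only assumed to resemble the filter in the `resumePartitionsIfNecessary` snippet quoted earlier in this thread (resume whatever the consumer reports as paused but nobody asked the container to pause).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model only: not spring-kafka code. All names here are hypothetical. */
class PauseModel {

    /** Stand-in for the Kafka consumer: it only remembers which partitions are paused. */
    static class ToyConsumer {
        private final Set<String> paused = new HashSet<>();

        void pause(String partition) { paused.add(partition); }
        void resume(String partition) { paused.remove(partition); }
        Set<String> paused() { return new HashSet<>(paused); }
    }

    /** Stand-in for the listener container, which treats itself as owner of the pause state. */
    static class ToyContainer {
        private final ToyConsumer consumer;
        private final List<String> assigned;
        private final Set<String> pauseRequested = new HashSet<>();

        ToyContainer(ToyConsumer consumer, List<String> assigned) {
            this.consumer = consumer;
            this.assigned = assigned;
        }

        // Pause and resume requests made through the container are remembered.
        void pausePartition(String partition) { pauseRequested.add(partition); }
        void resumePartition(String partition) { pauseRequested.remove(partition); }

        /** One pass of the poll loop: bring the consumer in line with what was requested. */
        void pollOnce() {
            Set<String> pausedOnConsumer = consumer.paused();
            for (String partition : assigned) {
                boolean requested = pauseRequested.contains(partition);
                if (requested && !pausedOnConsumer.contains(partition)) {
                    consumer.pause(partition);
                } else if (!requested && pausedOnConsumer.contains(partition)) {
                    // Assumed behaviour: a pause the container did not request is reverted.
                    consumer.resume(partition);
                }
            }
        }
    }

    /** The listener pauses the consumer behind the container's back. */
    static boolean pausedAfterDirectConsumerPause() {
        ToyConsumer consumer = new ToyConsumer();
        ToyContainer container = new ToyContainer(consumer, List.of("topic-0"));
        consumer.pause("topic-0");
        container.pollOnce();
        return consumer.paused().contains("topic-0");
    }

    /** The pause is requested through the container instead. */
    static boolean pausedAfterContainerPause() {
        ToyConsumer consumer = new ToyConsumer();
        ToyContainer container = new ToyContainer(consumer, List.of("topic-0"));
        container.pausePartition("topic-0");
        container.pollOnce();
        container.pollOnce();
        return consumer.paused().contains("topic-0");
    }

    public static void main(String[] args) {
        System.out.println("direct consumer pause survives: " + pausedAfterDirectConsumerPause());
        System.out.println("container pause survives: " + pausedAfterContainerPause());
    }
}
```

In this model the direct pause is gone after a single poll pass, while the pause requested through the container persists across passes. In spring-kafka itself the container-level calls are `pause()` and `resume()` on `MessageListenerContainer`; how a binder application obtains the container or binding is not shown here.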

garyrussell commented 2 years ago

See here for the proper way to pause and resume a binding.