Closed xabe closed 2 years ago
That just means there are no more records to consume (offsets greater than 22) after the container was resumed.
Hi @garyrussell
The problem is not the offset, but why does the kafkaMessageListenerContainer resume the kafka consumer? When should Spring Kafka send an event ListenerContainerIdleEvent with partitions paused.
The documentation example pause and resume does not work from 2.7.X and 2.8.X
my proposed solution is as follows
private void pausePartitionsIfNecessary() {
Long idleEventInterval = this.containerProperties.getIdleEventInterval();
if (idleEventInterval == null) {
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
Collection<TopicPartition> partitions = getAssignedPartitions();
if (partitions != null) {
List<TopicPartition> partitionsToPause = partitions
.stream()
.filter(tp -> isPartitionPauseRequested(tp)
&& !pausedConsumerPartitions.contains(tp))
.collect(Collectors.toList());
if (partitionsToPause.size() > 0) {
this.consumer.pause(partitionsToPause);
this.pausedPartitions.addAll(partitionsToPause);
this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
partitionsToPause.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionPausedEvent);
}
}
}
}
private void resumePartitionsIfNecessary() {
Long idleEventInterval = this.containerProperties.getIdleEventInterval();
if (idleEventInterval == null) {
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
List<TopicPartition> partitionsToResume = this
.assignedPartitions
.stream()
.filter(tp -> !isPartitionPauseRequested(tp)
&& pausedConsumerPartitions.contains(tp))
.collect(Collectors.toList());
if (partitionsToResume.size() > 0) {
this.consumer.resume(partitionsToResume);
this.pausedPartitions.removeAll(partitionsToResume);
this.logger.debug(() -> "Resumed consumption from " + partitionsToResume);
partitionsToResume.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionResumedEvent);
}
}
}
And Test
@Test
public void dontResumePausedIdlePartition() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
given(consumer.assignment()).willReturn(Set.of(new TopicPartition("foo", 0)));
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
Thread.sleep(500);
return emptyRecords;
});
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
containerProps.setGroupId("grp");
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(2000L);
containerProps.setMessageListener((MessageListener) rec -> { });
containerProps.setMissingTopicsFatal(false);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final CountDownLatch idleEventPublish = new CountDownLatch(1);
container.setApplicationEventPublisher(event -> {
if(event instanceof ListenerContainerIdleEvent)
{
idleEventPublish.countDown();
}
});
container.start();
idleEventPublish.await(10,TimeUnit.SECONDS);
verify(consumer, never()).pause(any());
verify(consumer, never()).resume(any());
container.stop();
}
Why did you write this issue against the binder if you believe the problem is in spring-kafka?
A similar issue was fixed some time ago https://github.com/spring-projects/spring-kafka/commit/726c48517d6fc5ccf3a6c1b7e51df65ee093c66c
It was fixed in 2.7.4; are you sure you are using the latest?
It is true that the error is from spring-kafka, but the example uses spring kafka 2.7.10, the latest version that works in my test is 2.6.9 of spring kafka.
I close the issue I open issue in spring-kafka
That example is wrong; you should NOT pause and resume the consumer directly; you should pause and resume the container.
We should leave this open as a documentation bug.
Your suggested "fix" will break a lot of framework functionality.
See here for the proper way to pause and resume a binding.
Hi
I am trying to make the example pause and resume, but it doesn't work with the latest spring cloud (2020.0.5).
I have an example on github and test that paused and resume of consumer kafka it works version spring boot 2.3.3 and Hoxton.SR3 and other branch with new version that doesn't work spring boot 2.5.9 and 2020.0.5
In the logs you can see that the ListenerContainerIdleEvent event arrives
In last version spring clould stream, ListenerContainerIdleEvent never arrives because kafkaMessageListenerContainer resume consumer in method resumePartitionsIfNecessary
Regards