Closed Walliee closed 5 years ago
When creating a new consumer, KafkaMessageSource subscribes to partition assigned events and sets KafkaMessageSource.this.assigned to true.
KafkaMessageSource
KafkaMessageSource.this.assigned
true
https://github.com/spring-projects/spring-integration-kafka/blob/8f08abe265259da67ef3517ca107c6eb65033207/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java#L402-L416
However, KafkaMessageSource.this.assigned is not toggled back to false when partitions are revoked.
false
https://github.com/spring-projects/spring-integration-kafka/blob/8f08abe265259da67ef3517ca107c6eb65033207/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java#L387-L400
Since, the value of assigned is used to determine whether to use pollTimeout or assignTimeout, if the last event on a poll was partitions revoked then on the next poll assignTimeout should be used but it won't be used.
assigned
pollTimeout
assignTimeout
Sounds like a bug and looks like you are fully in the subject.
Feel free to raise a PR to fix the issue.
Thank you!
When creating a new consumer,
KafkaMessageSource
subscribes to partition assigned events and setsKafkaMessageSource.this.assigned
totrue
.https://github.com/spring-projects/spring-integration-kafka/blob/8f08abe265259da67ef3517ca107c6eb65033207/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java#L402-L416
However,
KafkaMessageSource.this.assigned
is not toggled back tofalse
when partitions are revoked.https://github.com/spring-projects/spring-integration-kafka/blob/8f08abe265259da67ef3517ca107c6eb65033207/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java#L387-L400
Since, the value of
assigned
is used to determine whether to usepollTimeout
orassignTimeout
, if the last event on a poll was partitions revoked then on the next pollassignTimeout
should be used but it won't be used.