Description
When the subscription changes on a consumer that uses the cooperative assignor, the consumer can resume fetching from a previous position. The same can happen when resuming a partition that wasn't paused.
The reason is that the fetch version is bumped immediately when the resume operation is enqueued. Even if the operation is then discarded because the partition isn't paused, the version has already changed. When offset validation completes in rd_kafka_toppar_fetch_decide_start_from_next_fetch_start, either there is no next_fetch_start to start from, or it is the next_fetch_start of a previous pause operation, so the fetch position can be reset to an offset that was already consumed.
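The sequence described above can be sketched with a small standalone model. This is not librdkafka code: the struct, the `model_*` functions, and `MODEL_OFFSET_NONE` are hypothetical names made up for illustration, and the model assumes that a pause saves the current position in `next_fetch_start` and that a later resume does not clear it. It only shows how bumping the version before the paused check can leave the restart logic with a stale or missing start offset.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical sentinel for "no saved position"; not a librdkafka constant. */
#define MODEL_OFFSET_NONE ((int64_t)-1)

/* Toy stand-in for a partition's fetch state; field names are illustrative. */
typedef struct {
        int64_t consumed_pos;     /* next offset the application would consume */
        int64_t next_fetch_start; /* position saved by the last pause, if any */
        int32_t fetch_version;    /* bumped to invalidate in-flight fetches */
        bool paused;
} model_toppar_t;

static void model_init(model_toppar_t *tp) {
        tp->consumed_pos     = 0;
        tp->next_fetch_start = MODEL_OFFSET_NONE;
        tp->fetch_version    = 0;
        tp->paused           = false;
}

static void model_consume(model_toppar_t *tp, int64_t count) {
        tp->consumed_pos += count;
}

/* Assumption: pausing records where fetching should restart from. */
static void model_pause(model_toppar_t *tp) {
        tp->fetch_version++;
        tp->paused           = true;
        tp->next_fetch_start = tp->consumed_pos;
}

/* Returns true if the resume was applied, false if it was discarded. */
static bool model_resume(model_toppar_t *tp) {
        tp->fetch_version++; /* bumped at enqueue time, before the check */
        if (!tp->paused)
                return false; /* discarded, but the version already changed */
        tp->paused = false;
        return true;
}

/* Offset the model restarts from once the version bump forces a restart. */
static int64_t model_decide_start(const model_toppar_t *tp) {
        return tp->next_fetch_start;
}

/* Pause at 100, resume, consume up to 250, then resume again needlessly. */
int64_t model_resume_after_earlier_pause(void) {
        model_toppar_t tp;
        model_init(&tp);
        model_consume(&tp, 100);
        model_pause(&tp);
        model_resume(&tp);
        model_consume(&tp, 150);
        bool applied = model_resume(&tp); /* partition is not paused */
        assert(!applied);
        return model_decide_start(&tp); /* stale position from the old pause */
}

/* Never paused: the unnecessary resume leaves nothing to start from. */
int64_t model_resume_never_paused(void) {
        model_toppar_t tp;
        model_init(&tp);
        model_consume(&tp, 250);
        bool applied = model_resume(&tp);
        assert(!applied);
        return model_decide_start(&tp);
}
```

In this model, `model_resume_after_earlier_pause()` returns 100 even though the application has already consumed up to offset 250, and `model_resume_never_paused()` returns `MODEL_OFFSET_NONE`, which corresponds to the case where there is no next_fetch_start to start from.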
How to reproduce
Execute test 0050/test_no_duplicate_messages("cooperative-sticky") or 0145/test_no_duplicate_messages_unnecessary_resume(*) in #4636.
Checklist
Please provide the following information:
[x] librdkafka version (1.x)
[ ] Apache Kafka version: <REPLACE with e.g., 0.10.2.3>
[ ] librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
[ ] Operating system: <REPLACE with e.g., Centos 5 (x64)>
[ ] Provide logs (with debug=.. as necessary) from librdkafka