confluentinc / kafka-rest

Confluent REST Proxy for Kafka
https://docs.confluent.io/current/kafka-rest/docs/index.html

Duplicate messages (same offset value) are returned by records API call #730

Open pramodgudipati opened 4 years ago

pramodgudipati commented 4 years ago

I am using REST Proxy version 5.3.2. When I enabled debug logging on REST Proxy, I saw the log statement below, which shows the range of offsets that got duplicated, i.e., 794686959 to 794686973.

2020-08-03 22:27:49 DEBUG Fetcher:670 [thread_id-pool-3-thread-9] - [Consumer clientId=consumer-10, groupId=testgrp] Ignoring fetched records for testtopic-0 at offset 794686974 since the current position is FetchPosition{offset=794686959, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 970 rack: BF3004), epoch=-1}}

In the records API response, I see the messages from 794686959 to 794686973 twice, followed by the records from 794686974 onward.
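To confirm which offsets repeat inside a single response, a small check like the following may help. It is only a sketch: it assumes the response has already been parsed into a list of dicts with the `topic`, `partition`, and `offset` fields that the v2 records API returns, and the helper names and sample data are hypothetical.

```python
from collections import Counter


def find_duplicate_offsets(records):
    """Return sorted (topic, partition, offset) keys that appear more than once."""
    counts = Counter((r["topic"], r["partition"], r["offset"]) for r in records)
    return sorted(key for key, n in counts.items() if n > 1)


def drop_duplicates(records):
    """Keep the first occurrence of each (topic, partition, offset), preserving order."""
    seen = set()
    unique = []
    for r in records:
        key = (r["topic"], r["partition"], r["offset"])
        if key not in seen:
            seen.add(key)
            unique.append(r)
    return unique


# Hypothetical sample shaped like the response described above:
# two offsets delivered twice, then the next offset once.
sample = [
    {"topic": "testtopic", "partition": 0, "offset": o, "key": None, "value": "v"}
    for o in [794686959, 794686960, 794686959, 794686960, 794686974]
]

print(find_duplicate_offsets(sample))
print(len(drop_duplicates(sample)))
```

Dropping duplicates on the client only hides the symptom; it does not explain why the proxy returned the range twice.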

Based on the debug log statement, I looked at the Fetcher class in Kafka 2.3.1-rc2 and found the following: when the condition "if(partitionRecords.nextFetchOffset == position.offset)" is false, execution falls through to the else branch, which logs the debug statement above.
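The branch described above can be modelled in a few lines. This is a simplified illustration in Python, not the actual Kafka Fetcher code: the function name, arguments, and return shape are assumptions made for the sketch, and only the equality check and the wording of the log message come from the issue.

```python
def fetch_records(next_fetch_offset, position, batch):
    """Simplified model of the check: a fetched batch is handed out only
    when it starts exactly at the consumer's current position.

    Returns (records, new_position, log_message).
    """
    if next_fetch_offset == position:
        # Batch lines up with the position: return it and advance.
        return batch, position + len(batch), None
    # Batch does not line up: discard it and leave the position unchanged.
    message = (
        f"Ignoring fetched records at offset {next_fetch_offset} "
        f"since the current position is {position}"
    )
    return [], position, message


# Values taken from the debug log: the fetched data starts at 794686974,
# but the consumer's position is 794686959.
records, new_position, log = fetch_records(794686974, 794686959, ["r1", "r2"])
print(records, new_position)
print(log)

# Gap between the two offsets, in records.
print(794686974 - 794686959)
```

The gap is 15 offsets, which matches the duplicated range 794686959 to 794686973. That would be consistent with the position having moved back by 15 records between the time the fetch was issued and the time its result was processed, though the log alone does not show what moved it.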

The issue happens intermittently. I need help understanding in what scenario this could happen and how to avoid it.

This particular consumer fetches records from a high-volume topic, processes each message to perform some business logic, and publishes the result back to Kafka on a different topic.

MarkC0x commented 4 years ago

Hi @pramodgudipati,

Could you share a little more information about this? For example, how exactly are you committing offsets?

Also, in which scenarios does this duplication happen? Does the consumer instance time out or get deleted, and do you then get these duplicate messages on the next instantiation?

What settings do you have in your kafka-rest.properties file? All would be good, but specifically for:

consumer.auto.commit.interval.ms
consumer.request.timeout.ms

Cheers, Mark

pramodgudipati commented 4 years ago

@MarkC0x We use the POST offsets API call in REST Proxy to commit offsets: POST /consumers/(string:group_name)/instances/(string:instance)/offsets
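For reference, the request body for that endpoint can be assembled from a fetched batch roughly as follows. This is a sketch under stated assumptions: the helper names are hypothetical, no HTTP call is made, and whether the committed value should be the last processed offset or the next one to read is worth checking against the REST Proxy documentation for the version in use.

```python
import json


def build_commit_payload(records):
    """Build an {"offsets": [...]} body holding the highest offset seen
    for each (topic, partition) in the batch."""
    highest = {}
    for r in records:
        key = (r["topic"], r["partition"])
        highest[key] = max(highest.get(key, -1), r["offset"])
    return {
        "offsets": [
            {"topic": topic, "partition": partition, "offset": offset}
            for (topic, partition), offset in sorted(highest.items())
        ]
    }


def commit_path(group_name, instance):
    """Path of the offsets endpoint for a given consumer instance."""
    return f"/consumers/{group_name}/instances/{instance}/offsets"


# Hypothetical batch and instance name, for illustration only.
batch = [
    {"topic": "testtopic", "partition": 0, "offset": 794686972},
    {"topic": "testtopic", "partition": 0, "offset": 794686973},
]
payload = build_commit_payload(batch)
headers = {"Content-Type": "application/vnd.kafka.v2+json"}

print(commit_path("testgrp", "my_instance"))
print(json.dumps(payload))
```

Committing only after a batch has been fully processed, and logging the committed value alongside the offsets returned by the next records call, makes it easier to see whether a duplicated range starts at or before the last commit.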

We only see the issue intermittently and have not been able to work out which scenario triggers it. One observation is that it tends to occur when the topic has a high volume of data.

No, the consumer instance looks fine. I don't have those properties in kafka-rest.properties, so we are using the default values.

consumer.request.timeout.ms = 1000
consumer.auto.commit.interval.ms = 5000