boyanitawang opened this issue 1 year ago
Hi, what is your Kafka version, and what is your brokers' offsets.retention.minutes configuration?
Before v2.x it was 1 day by default, and the broker would expire the group's committed offsets after that period of inactivity.
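If that retention kicked in, the group simply has no committed offsets left and falls back to auto.offset.reset on the next rebalance. A minimal sketch of how to check this from the client side (assuming an already-created RdKafka::KafkaConsumer* named consumer with an active assignment; the names here are illustrative):

    // Sketch: list the group's committed offsets so you can tell whether the
    // broker expired them (offsets.retention.minutes) or the client rewound.
    // Assumes an already-created and assigned RdKafka::KafkaConsumer *consumer.
    #include <iostream>
    #include <vector>
    #include <librdkafka/rdkafkacpp.h>

    void dump_committed_offsets(RdKafka::KafkaConsumer *consumer) {
      std::vector<RdKafka::TopicPartition *> partitions;
      if (consumer->assignment(partitions) != RdKafka::ERR_NO_ERROR)
        return;

      // committed() asks the group coordinator for the committed offset of
      // each assigned partition.
      RdKafka::ErrorCode err = consumer->committed(partitions, 5000 /*timeout ms*/);
      if (err != RdKafka::ERR_NO_ERROR) {
        std::cerr << "committed() failed: " << RdKafka::err2str(err) << std::endl;
      } else {
        for (RdKafka::TopicPartition *tp : partitions) {
          if (tp->offset() == RdKafka::Topic::OFFSET_INVALID)
            std::cout << tp->topic() << " [" << tp->partition()
                      << "]: no committed offset (possibly expired)" << std::endl;
          else
            std::cout << tp->topic() << " [" << tp->partition()
                      << "]: committed " << tp->offset() << std::endl;
        }
      }
      RdKafka::TopicPartition::destroy(partitions);
    }

Partitions that report OFFSET_INVALID here have no committed offset on the broker, which is consistent with retention-based expiry rather than a client-side rewind.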
Hi.
We are seeing a similar problem, and only with librdkafka. We are also unable to reproduce this problem under controlled conditions.
In our infrastructure we run periodic exercises in which we shut down part of the network to check the stability of the whole system. During such an exercise, some brokers in the disconnected segment keep running, and some services there also remain physically up but have no connection to the other nodes. At that point all consumers rebalance, both those that stayed connected and those in the disconnected segment.
Because the disconnected segment cannot see the rest of the Kafka cluster, no useful work is actually done there.
Everything works well until the disconnected network segment is switched back on.
After that, in some services, after rebalancing, the offsets of individual partitions are reset to the earliest available offset.
All clients with this problem:
@kalduzov what version(s) of Kafka brokers do you see this issue with?
We've experienced the same issue twice after similar network-partitioning testing between Kafka brokers. We're trying to reproduce it in an isolated environment to reduce the negative impact on production, but with limited luck so far. Our observations:
The aforementioned observations mostly point at the server. However, since you mention that you're experiencing this only with librdkafka, the issue might be lurking on the client side. This code looks interesting: https://github.com/confluentinc/librdkafka/blob/49f180a36c247100dc246379ace84833b91f4038/src/rdkafka_fetcher.c#L283-L289
Perhaps during the network partition the offset metadata is inaccessible and/or the consumer fetches an offset that is out of range, and rktp->rktp_offsets.fetch_pos.offset gets reset to either 0 or RD_KAFKA_OFFSET_INVALID, or follows auto.offset.reset?
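If that is what happens, one way to make it visible instead of silent is to let out-of-range fetches surface as errors in the consume loop rather than rewinding. A sketch only: the broker address, group id and topic are placeholders, and the ERR__AUTO_OFFSET_RESET code assumes a reasonably recent librdkafka.

    // Sketch: surface an out-of-range offset as an error instead of a silent
    // rewind, by setting auto.offset.reset=error.
    #include <iostream>
    #include <memory>
    #include <string>
    #include <librdkafka/rdkafkacpp.h>

    int main() {
      std::string errstr;
      std::unique_ptr<RdKafka::Conf> conf(
          RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
      conf->set("bootstrap.servers", "localhost:9092", errstr);
      conf->set("group.id", "offset-reset-debug", errstr);
      conf->set("auto.offset.reset", "error", errstr);  // fail loudly, do not rewind

      std::unique_ptr<RdKafka::KafkaConsumer> consumer(
          RdKafka::KafkaConsumer::create(conf.get(), errstr));
      if (!consumer) {
        std::cerr << "consumer create failed: " << errstr << std::endl;
        return 1;
      }
      consumer->subscribe({"topicA"});

      while (true) {
        std::unique_ptr<RdKafka::Message> msg(consumer->consume(1000));
        if (msg->err() == RdKafka::ERR__AUTO_OFFSET_RESET ||
            msg->err() == RdKafka::ERR_OFFSET_OUT_OF_RANGE) {
          // Decide explicitly what to do instead of re-reading days of history.
          std::cerr << "offset reset requested: " << msg->errstr() << std::endl;
        } else if (msg->err() == RdKafka::ERR_NO_ERROR) {
          // process msg ...
        }
      }
    }

With this setting, a reset attempt shows up in the application log instead of quietly replaying old data, which would at least pinpoint when and on which partitions it happens.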
@ljank hi
To catch this problem, we set up a separate cluster under heavy load and actively and randomly restarted the brokers.
I want to know whether this issue has been fixed. Do you have any new findings?
Hello.
The frequency of this problem has decreased greatly since the update containing KIP-320.
We are still seeing offset resets at times when several brokers in the cluster suddenly become unavailable.
The nature of the problem has changed: previously the reset happened only after the old brokers returned to service, whereas now it occurs immediately after a new partition leader is elected.
@kalduzov Would you like to consider upgrading to the latest version of librdkafka, which is 2.3.0?
Description
Hi, we have recently encountered a problem in our business, and we have not been able to reproduce it so far. The phenomenon is described below.
1: We have a topic A with 80 partitions and 110 consumers running.
2: For various reasons, the producers stopped producing after a period of time, while the consumers kept their subscriptions open. Two days later we restarted the service; after the rebalance was triggered, the offsets of some partitions came back abnormally (pointing two days into the past), and the consumers began to consume historical data.
Key information:
1: Using the C++ librdkafka client.
2: The producers no longer produce new data, so at this point max_offset == current offset.
3: The client does not store offsets itself; configuration: enable.auto.commit=true, auto.commit.interval.ms=5000, enable.auto.offset.store=false (sketched below).
4: Code segment:

    std::unique_ptr<RdKafka::TopicPartition> tp;
    tp.reset(RdKafka::TopicPartition::create(msg->topic_name(), msg->partition(), msg->offset() + 1));
    // ... business logic ...
    std::vector<RdKafka::TopicPartition*> offsets = {tp.get()};
    offsets_store(offsets);  // store the next offset so the periodic auto-commit picks it up

5: Some partitions' offsets rolled back: the latest offset of the partition was around 7,000,000 (700W), and after the rollback consumption restarted from around 4,000,000 (400W).
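For completeness, the consumer configuration described in point 3 would be set up roughly like this. A sketch only: the broker address, group id and factory function are illustrative, not our exact code.

    // Sketch of the consumer configuration described above: offsets are stored
    // manually after processing, and the stored offsets are auto-committed in
    // the background every 5 seconds.
    #include <memory>
    #include <string>
    #include <librdkafka/rdkafkacpp.h>

    std::unique_ptr<RdKafka::KafkaConsumer> make_consumer() {
      std::string errstr;
      std::unique_ptr<RdKafka::Conf> conf(
          RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
      conf->set("bootstrap.servers", "localhost:9092", errstr);
      conf->set("group.id", "groupA", errstr);
      conf->set("enable.auto.commit", "true", errstr);        // commit in background
      conf->set("auto.commit.interval.ms", "5000", errstr);   // every 5 seconds
      conf->set("enable.auto.offset.store", "false", errstr); // app calls offsets_store()
      return std::unique_ptr<RdKafka::KafkaConsumer>(
          RdKafka::KafkaConsumer::create(conf.get(), errstr));
    }

With enable.auto.offset.store=false, only offsets explicitly passed to offsets_store(), as in the code segment above, become eligible for the 5-second background auto-commit.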
How to reproduce
Cannot be reproduced.
Please provide the following information:
- librdkafka version (release number or git tag): <REPLACE with e.g., v0.10.5 or a git sha. NOT "latest" or "current">
- Apache Kafka version: <REPLACE with e.g., 0.10.2.3>
- librdkafka client configuration: <REPLACE with e.g., message.timeout.ms=123, auto.reset.offset=earliest, ..>
- Operating system: <REPLACE with e.g., Centos 5 (x64)>
- Provide logs (with debug=.. as necessary) from librdkafka