Closed - georgievaja closed this issue 2 years ago
Hi
What is the auto commit interval in your settings? Also what is the ingestion rate of events?
AutoCommitIntervalMs is not explicitly set, so it is the default value of 5000 ms. Incoming messages (around the time in question) for the specified topic: 1.1 messages/sec. Overall: 47.6 messages/sec.
Thanks.
How often do you store the offsets, i.e. how often do you call the StoreOffset API?
Without looking at the code, from your description it sounds very plausible that upon rebalance you will have up to 5 seconds' worth of messages that were not committed. In the case above that is about 700+.
If you don't call StoreOffset for each message, then there is a chance you leave more than 5 seconds' (one commit interval) worth of data uncommitted.
Also, a snippet of your code would help, but based on the description this seems to be normal behavior. You can commit more often by reducing the interval, and also make sure you call StoreOffset more often (at least as frequently as the commit interval, otherwise auto commit will just commit the same offset or be a no-op).
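For readers following along, here is a minimal sketch of the pattern being suggested (store the offset for every processed message and shorten the auto-commit interval). This is not the reporter's code; the server, group and topic names are placeholders.

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "example-group",             // placeholder
    EnableAutoCommit = true,               // offsets are committed on a background interval
    AutoCommitIntervalMs = 1000,           // commit more often than the 5000 ms default
    EnableAutoOffsetStore = false          // only offsets passed to StoreOffset are eligible for commit
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");       // placeholder

while (true)
{
    var cr = consumer.Consume(TimeSpan.FromSeconds(1));
    if (cr == null) continue;

    // ... process the message ...

    // Mark the message as processed; the next auto-commit will pick it up.
    consumer.StoreOffset(cr);
}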
Well, it is not plausible because as we can see in the __consumer_offsets
topic, the last offset was stored correctly.
Also, 5 seconds at 1.1 messages/sec on that topic gives 5+ messages, not 700+.
Can you please share your code? And answer my question about calling StoreOffset.
Yes, we call StoreOffset with each message. This discussion about storing the offset is beside the point. Once again, we do not have a problem with storing it; however, something incorrect is happening when it is fetched for the first time.
FYI, the auto commit interval is the minimum delay between commits. Commits could be less frequent, so the calculation I described above was mistaken. My bad.
Auto commit occurs at poll. So if the last time you called poll (aka Consume) you were 700+ offsets behind, then you will run into what you describe.
Again without seeing the relevant code, all we can do is speculate.
We are having a very similar issue with the latest NuGet package.
The testing cluster has 3 servers and the topic has 6 partitions with replication factor of 3.
Two consumers connected to the cluster with an empty topic. I then had 18 messages dumped into the topic via the console producer. The keys (1-6) allow the messages to be evenly distributed among all the partitions.
The code below basically simulates that each message takes some time to process, and we want to commit the message only after the physical processing is done. In other words, we never want to drop/skip a message, so we commit once processing has completed. Once the messages start processing on each consumer, we abruptly kill one consumer, which causes a rebalance.
We have noticed different behaviors depending on if the partition assigned handler is set with the consumer.
When the partition assigned handler is set, even if it is an empty function/delegate, the system behaves as expected. If one consumer is killed before commit, the uncommitted message is picked up again and there are no duplicates.
However, when the consumer was built without setting a partition assigned handler, upon rebalancing the remaining consumer goes back and re-consumes messages it had already consumed and committed. I pulled a network traffic dump and fetched the committed offset immediately after the Consume(), and it shows that the committed offset is correct for each partition (except that it fetches an offset it had already processed/committed).
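For reference, this is roughly how the broker-side committed offset can be cross-checked from code right after a consume, using the consumer's Committed() call (a sketch, not the reporter's code; the 10-second timeout is arbitrary):

using System;
using System.Linq;
using Confluent.Kafka;

static void LogCommittedOffset<TKey, TValue>(IConsumer<TKey, TValue> consumer, ConsumeResult<TKey, TValue> cr)
{
    // Ask the group coordinator what is currently committed for this partition.
    var committed = consumer.Committed(new[] { cr.TopicPartition }, TimeSpan.FromSeconds(10));
    var offset = committed.Single().Offset;
    Console.WriteLine($"Consumed {cr.TopicPartitionOffset}; broker-committed offset is {offset}");
}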
By the way, we did not implement consumer.close() as the intention is to test a hard crash via control-c. I am new to Kafka and there might be things I have not grasped or understood. Appreciate any help!
private void RunConsumer()
{
    var consumerConfig = new ConsumerConfig
    {
        BootstrapServers = KafkaBootstrapServers,
        GroupId = KafkaConsumerGroup,
        ClientId = KafkaConsumerId,
        EnableAutoCommit = false,
        SessionTimeoutMs = 15000, // 15 seconds session timeout
        StatisticsIntervalMs = 5000,
        AutoOffsetReset = AutoOffsetReset.Earliest,
        EnablePartitionEof = true,
        PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky
    };

    using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
        .SetErrorHandler((_, e) => _logger.LogError($"Error: {e.Reason}"))
        .SetStatisticsHandler(StatisticsHandler)
        .SetPartitionsAssignedHandler(PartitionAssignedHandler)
        .SetPartitionsRevokedHandler(PartitionsRevokedHandler)
        .SetPartitionsLostHandler(PartitionsLostHandler)
        .Build();

    consumer.Subscribe(KafkaTopic);

    try
    {
        while (true)
        {
            try
            {
                var consumeResult = consumer.Consume(TimeSpan.FromMilliseconds(1000));
                if (consumeResult == null) continue;

                if (consumeResult.IsPartitionEOF)
                {
                    _logger.LogDebug(
                        $"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                    continue;
                }

                _logger.LogInformation(
                    $"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");

                Thread.Sleep(KafkaConsumerSleepMs);

                try
                {
                    consumer.Commit(consumeResult);
                    _logger.LogInformation(
                        $"Committed message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                }
                catch (KafkaException ke)
                {
                    _logger.LogError($"Commit error: {ke.Error.Reason}");
                }
            }
            catch (ConsumeException e)
            {
                _logger.LogDebug($"Consume error: {e.Error.Reason}");
            }
        }
    }
    catch (OperationCanceledException)
    {
        _logger.LogDebug("Cancellation requested!");
    }
}
private void PartitionAssignedHandler<T1, T2>(IConsumer<T1, T2> c, List<TopicPartition> p)
{
    _logger.LogDebug("Partition assigned: [" +
                     string.Join(',', p.Select(tp => tp.Partition.Value)) +
                     "]");
}

private void PartitionsRevokedHandler<T1, T2>(IConsumer<T1, T2> c, List<TopicPartitionOffset> p)
{
    var remaining = c.Assignment.Where(atp => p.Count(x => x.TopicPartition == atp) == 0);
    _logger.LogDebug("Partition revoked: [" +
                     string.Join(',', p.Select(tp => tp.Partition.Value)) +
                     "], remaining: [" +
                     string.Join(',', remaining.Select(tp => tp.Partition.Value)) +
                     "]");
}

private void PartitionsLostHandler<T1, T2>(IConsumer<T1, T2> c, List<TopicPartitionOffset> p)
{
    _logger.LogDebug($"Partition lost: [{string.Join(",", p)}]");
}

private void StatisticsHandler<T1, T2>(IConsumer<T1, T2> c, string s)
{
    if (KafkaStatisticEnable)
        _logger.LogDebug($"Statistics: {s}");
}
}
there were significant changes to the consumer rebalance code between 1.5.3 and 1.7.0 (to implement incremental rebalancing), so this is a bit concerning.
thanks for the detail @zhenxuzj - that is useful, there are quite different code paths when the handler is set and when it isn't.
librdkafka debug logs from the consumer would be very helpful in knowing what is going on (set the Debug config property to "all", or "consumer" may be enough if that is too verbose). These will give good visibility into when offsets were committed, the result of those commits, and how that relates to the rebalance workflow, etc.
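As an illustration of the above, enabling librdkafka debug output from the .NET client and routing it into application logging can look roughly like this (a sketch; the categories and handler shown are just one possible choice, and the server/group names are placeholders):

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",   // placeholder
    GroupId = "example-group",             // placeholder
    Debug = "consumer"                     // or "all" for maximum verbosity
};

using var consumer = new ConsumerBuilder<Ignore, string>(config)
    // librdkafka log lines (including debug output) are delivered to this handler.
    .SetLogHandler((_, log) => Console.WriteLine($"{log.Level} {log.Facility}: {log.Message}"))
    .Build();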
cc: @edenhill. Are there any related fixes in v1.8.0?
Auto commit occurs at poll.
This is correct for the Java client, but in librdkafka the timing is independent.
We've identified some corner cases with regard to auto.offset.reset that are fixed in the upcoming v1.8.0 release.
Would you suggest rolling back to 1.6.x?
Could we get a reproduce debug log first?
We've identified some corner cases with regard to auto.offset.reset that are fixed in the upcoming v1.8.0 release.
@edenhill could I ask for more detail about the corner case - how is it reproducible, and in which situations does it manifest? We turned on all consumer logging but it has not happened since, so I have nothing to share right now.
auto.offset.reset could be triggered by temporary errors, such as connection losses (ERR__TRANSPORT), timeouts, etc.
In v1.8.0 we've made changes to only reset the offset on permanent errors, and to clearly log (regardless of debug level) the reason for the offset reset.
We have the same problem with the same configs. Does 1.8.0 fix that issue?
Attached is the log with librdkafka debug turned on.
9 messages keyed into 3 partitions equally. 2 consumers.
Consumer 1 log
[14:49:53 INF] Application starting...
[14:49:53 DBG] Bootstrap servers = dev-kafka1a:9092
[14:49:53 DBG] Topic = test.zxu
[14:49:53 DBG] Consumer group = ConsumerGroup-Test
[14:49:53 DBG] Consumer id = consumer1
[14:49:53 INF] Received message at test.zxu [[1]] @22: 4a
[14:49:59 ERR] Commit error: Broker: Specified group generation id is not valid
[14:50:00 INF] Received message at test.zxu [[0]] @22: 1a
[14:50:05 INF] Committed message at test.zxu [[0]] @22: 1a
[14:50:05 INF] Received message at test.zxu [[0]] @23: 1b
[14:50:19 INF] Application starting...
[14:50:19 DBG] Bootstrap servers = dev-kafka1a:9092
[14:50:19 DBG] Topic = test.zxu
[14:50:19 DBG] Consumer group = ConsumerGroup-Test
[14:50:19 DBG] Consumer id = consumer1
[14:50:21 INF] Received message at test.zxu [[2]] @32: 2a
[14:50:26 INF] Committed message at test.zxu [[2]] @32: 2a
[14:50:26 INF] Received message at test.zxu [[2]] @33: 2b
[14:50:31 INF] Committed message at test.zxu [[2]] @33: 2b
[14:50:31 INF] Received message at test.zxu [[2]] @34: 2c
[14:50:36 INF] Committed message at test.zxu [[2]] @34: 2c
[14:50:36 INF] Received message at test.zxu [[0]] @23: 1b
[14:50:41 INF] Committed message at test.zxu [[0]] @23: 1b
[14:50:41 INF] Received message at test.zxu [[0]] @24: 1c
[14:50:46 INF] Committed message at test.zxu [[0]] @24: 1c
[14:50:46 DBG] Reached end of topic test.zxu, partition [2], offset 35.
[14:50:46 DBG] Reached end of topic test.zxu, partition [0], offset 25.
Consumer2 log
[14:49:56 INF] Application starting...
[14:49:56 DBG] Bootstrap servers = dev-kafka1a:9092
[14:49:56 DBG] Topic = test.zxu
[14:49:56 DBG] Consumer group = ConsumerGroup-Test
[14:49:56 DBG] Consumer id = consumer2
[14:50:00 INF] Received message at test.zxu [[1]] @22: 4a
[14:50:05 INF] Committed message at test.zxu [[1]] @22: 4a
[14:50:05 INF] Received message at test.zxu [[1]] @23: 4b
[14:50:10 INF] Committed message at test.zxu [[1]] @23: 4b
[14:50:10 INF] Received message at test.zxu [[1]] @24: 4c
[14:50:15 INF] Committed message at test.zxu [[1]] @24: 4c
[14:50:15 DBG] Reached end of topic test.zxu, partition [1], offset 25.
[14:50:21 INF] Received message at test.zxu [[1]] @22: 4a
[14:50:26 INF] Committed message at test.zxu [[1]] @22: 4a
[14:50:26 INF] Received message at test.zxu [[1]] @23: 4b
[14:50:31 INF] Committed message at test.zxu [[1]] @23: 4b
[14:50:31 INF] Received message at test.zxu [[1]] @24: 4c
[14:50:36 INF] Committed message at test.zxu [[1]] @24: 4c
[14:50:36 DBG] Reached end of topic test.zxu, partition [1], offset 25.
Consumer 1 is forced to exit via Ctrl-C and then restarted. Consumer 2 is left running all the time.
By the way, I have already upgraded the NuGet package to 1.8.1. The consumer was built without any handlers set.
As you can see, after the rebalance @ 14:50:15, messages already consumed (4a/4b/4c) were re-processed on consumer 2.
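To recap the earlier observation: the difference reported here hinges on whether a partitions-assigned handler is registered at all. A sketch of the workaround (an effectively empty handler), assuming the consumer configuration shown earlier, looks like this:

using var consumer = new ConsumerBuilder<string, string>(consumerConfig)
    // Even a no-op handler changes which rebalance code path the client takes.
    .SetPartitionsAssignedHandler((c, partitions) => { /* intentionally empty */ })
    .Build();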
Version 1.8.1 didn't fix that issue :( Any suggestions?
We have the same problem.
NuGet Version: 1.7.0 and PartitionAssignmentStrategy.RoundRobin
After adding some chaos to our deployment, we introduced frequent rebalancing and can clearly see that we re-process old messages.
Offset 435723353 at 05:13:12.08: my-topic [3] offset 435723353
Offset jumps back to 435680920 at 05:13:15.237: my-topic [3] offset 435680920
Nov 15, 2021 @ 05:13:05.971 [thrd:main]: Topic my-topic [3]: stored offset -1001, committed offset 435723353: not including in commit
Nov 15, 2021 @ 05:13:05.971 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:07.797 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Fetch topic my-topic [3] at offset 435723353 (v58)
Nov 15, 2021 @ 05:13:09.797 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Fetch topic my-topic [3] at offset 435723353 (v58)
Nov 15, 2021 @ 05:13:10.973 [thrd:main]: Topic my-topic [3]: stored offset -1001, committed offset 435723353: not including in commit
Nov 15, 2021 @ 05:13:10.973 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:11.833 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Fetch topic my-topic [3] at offset 435723353 (v58)
Nov 15, 2021 @ 05:13:12.013 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v59
Nov 15, 2021 @ 05:13:12.013 [thrd:main]: Pause my-topic [3] (v59)
Nov 15, 2021 @ 05:13:12.014 [thrd:main]: my-topic [3] received op PAUSE (v59) in fetch-state active (opv58)
Nov 15, 2021 @ 05:13:12.014 [thrd:main]: Pause my-topic [3]: at offset 435723353 (state active, v59)
Nov 15, 2021 @ 05:13:12.016 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:12.018 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_fetch_stop:2375: new version barrier v60
Nov 15, 2021 @ 05:13:12.018 [thrd:main]: Stop consuming my-topic [3] (v60)
Nov 15, 2021 @ 05:13:12.018 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v61
Nov 15, 2021 @ 05:13:12.018 [thrd:main]: Resume my-topic [3] (v61)
Nov 15, 2021 @ 05:13:12.018 [thrd:main]: Removing (un)desired topic my-topic [3]
Nov 15, 2021 @ 05:13:12.018 [thrd:main]: Removing my-topic [3] from assignment (started=true, pending=false, queried=false, stored offset=INVALID)
Nov 15, 2021 @ 05:13:12.022 [thrd:main]: my-topic [3] received op FETCH_STOP (v60) in fetch-state active (opv59)
Nov 15, 2021 @ 05:13:12.022 [thrd:main]: Stopping fetch for my-topic [3] in state active (v60)
Nov 15, 2021 @ 05:13:12.022 [thrd:main]: Partition my-topic [3] changed fetch state active -> stopping
Nov 15, 2021 @ 05:13:12.022 [thrd:main]: my-topic [3]: offset store terminating
Nov 15, 2021 @ 05:13:12.022 [thrd:main]: Partition my-topic [3] changed fetch state stopping -> stopped
Nov 15, 2021 @ 05:13:12.022 [thrd:main]: my-topic [3] received op PAUSE (v61) in fetch-state stopped (opv60)
Nov 15, 2021 @ 05:13:12.022 [thrd:main]: Not resuming stopped my-topic [3]: at offset 435723353 (state stopped, v61)
Nov 15, 2021 @ 05:13:12.026 [thrd:main]: Group "test" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for my-topic [3]
Nov 15, 2021 @ 05:13:12.026 [thrd:main]: Group "test": delete my-topic [3]
Nov 15, 2021 @ 05:13:12.057 [thrd:main]: my-topic [3]
Nov 15, 2021 @ 05:13:12.074 [thrd:main]: my-topic [3] offset INVALID
Nov 15, 2021 @ 05:13:12.074 [thrd:main]: my-topic [3] offset INVALID
Nov 15, 2021 @ 05:13:12.076 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v38
Nov 15, 2021 @ 05:13:12.076 [thrd:main]: Resume my-topic [3] (v38)
Nov 15, 2021 @ 05:13:12.076 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:12.076 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:12.077 [thrd:main]: Querying committed offset for pending assigned partition my-topic [3]
Nov 15, 2021 @ 05:13:12.077 [thrd:main]: my-topic [3] received op PAUSE (v38) in fetch-state stopped (opv37)
Nov 15, 2021 @ 05:13:12.077 [thrd:main]: Not resuming my-topic [3]: partition is not paused by library
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: GroupCoordinator/0: OffsetFetchResponse: my-topic [3] offset 435723353, metadata 0 byte(s): NO_ERROR
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: Adding my-topic [3] back to pending list with offset 435723353
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: my-topic [3] offset 435723353
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: Starting pending assigned partition my-topic [3] at offset 435723353
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v39
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: Resume my-topic [3] (v39)
Nov 15, 2021 @ 05:13:12.086 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_fetch_start:2348: new version barrier v40
Nov 15, 2021 @ 05:13:12.087 [thrd:main]: Start consuming my-topic [3] at offset 435723353 (v40)
Nov 15, 2021 @ 05:13:12.087 [thrd:main]: my-topic [3] received op PAUSE (v39) in fetch-state stopped (opv38)
Nov 15, 2021 @ 05:13:12.087 [thrd:main]: Not resuming my-topic [3]: partition is not paused by library
Nov 15, 2021 @ 05:13:12.087 [thrd:main]: my-topic [3] received op FETCH_START (v40) in fetch-state stopped (opv39)
Nov 15, 2021 @ 05:13:12.087 [thrd:main]: Start fetch for my-topic [3] in state stopped at offset 435723353 (v40)
Nov 15, 2021 @ 05:13:12.087 [thrd:main]: Partition my-topic [3] changed fetch state stopped -> active
Nov 15, 2021 @ 05:13:12.087 [thrd:main]: Partition my-topic [3] start fetching at offset 435723353
Nov 15, 2021 @ 05:13:12.089 [thrd:main]: Group "test" received op PARTITION_JOIN in state up (join-state steady) for my-topic [3]
Nov 15, 2021 @ 05:13:12.089 [thrd:main]: Group "test": add my-topic [3]
Nov 15, 2021 @ 05:13:12.090 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Topic my-topic [3] in state stopped at offset 435723353 (1/100000 msgs, 0/65536 kb queued, opv 58) is not fetchable: not in active fetch state
Nov 15, 2021 @ 05:13:12.090 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Removed my-topic [3] from fetch list (12 entries, opv 58): not in active fetch state
Nov 15, 2021 @ 05:13:12.091 [thrd:sasl_ssl://my-brokern]: Topic my-topic [3]: fetch decide: updating to version 40 (was 34) at offset 435723353 (was 435681193)
Nov 15, 2021 @ 05:13:12.091 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Topic my-topic [3] in state active at offset 435723353 (0/100000 msgs, 0/65536 kb queued, opv 40) is fetchable
Nov 15, 2021 @ 05:13:12.091 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Added my-topic [3] to fetch list (4 entries, opv 40, 0 messages queued): fetchable
Nov 15, 2021 @ 05:13:12.091 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Fetch topic my-topic [3] at offset 435723353 (v40)
Nov 15, 2021 @ 05:13:12.107 [thrd:main]: NEW my-topic [3] 0x7f8eef0d0be0 refcnt 0x7f8eef0d0c70 (at rd_kafka_topic_partition_cnt_update:798)
Nov 15, 2021 @ 05:13:12.108 [thrd:main]: my-topic [3]: delegate to broker sasl_ssl://my-broker:9093/0 (rktp 0x7f8eef0d0be0, term 0, ref 2)
Nov 15, 2021 @ 05:13:12.109 [thrd:main]: my-topic [3]: delegating to broker sasl_ssl://my-broker:9093/0 for partition with 0 messages (0 bytes) queued
Nov 15, 2021 @ 05:13:12.109 [thrd:main]: Migrating topic my-topic [3] 0x7f8eef0d0be0 from (none) to sasl_ssl://my-broker:9093/0 (sending PARTITION_JOIN to sasl_ssl://my-broker:9093/0)
Nov 15, 2021 @ 05:13:12.117 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Topic my-topic [3]: joining broker (rktp 0x7f8eef0d0be0, 0 message(s) queued)
Nov 15, 2021 @ 05:13:13.113 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Fetch topic my-topic [3] at offset 435723353 (v40)
Nov 15, 2021 @ 05:13:13.463 [thrd:main]: Topic my-topic [3]: stored offset 435680920, committed offset 435723353: not including in commit
Nov 15, 2021 @ 05:13:13.463 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:13.469 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v41
Nov 15, 2021 @ 05:13:13.469 [thrd:main]: Pause my-topic [3] (v41)
Nov 15, 2021 @ 05:13:13.470 [thrd:main]: my-topic [3] received op PAUSE (v41) in fetch-state active (opv40)
Nov 15, 2021 @ 05:13:13.470 [thrd:main]: Pause my-topic [3]: at offset 435723353 (state active, v41)
Nov 15, 2021 @ 05:13:13.472 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:13.473 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_fetch_stop:2375: new version barrier v42
Nov 15, 2021 @ 05:13:13.473 [thrd:main]: Stop consuming my-topic [3] (v42)
Nov 15, 2021 @ 05:13:13.473 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v43
Nov 15, 2021 @ 05:13:13.473 [thrd:main]: Resume my-topic [3] (v43)
Nov 15, 2021 @ 05:13:13.473 [thrd:main]: Removing (un)desired topic my-topic [3]
Nov 15, 2021 @ 05:13:13.473 [thrd:main]: Removing my-topic [3] from assignment (started=true, pending=false, queried=false, stored offset=435680920)
Nov 15, 2021 @ 05:13:13.475 [thrd:main]: my-topic [3] received op FETCH_STOP (v42) in fetch-state active (opv41)
Nov 15, 2021 @ 05:13:13.475 [thrd:main]: Stopping fetch for my-topic [3] in state active (v42)
Nov 15, 2021 @ 05:13:13.475 [thrd:main]: Partition my-topic [3] changed fetch state active -> stopping
Nov 15, 2021 @ 05:13:13.475 [thrd:main]: my-topic [3]: offset store terminating
Nov 15, 2021 @ 05:13:13.475 [thrd:main]: Partition my-topic [3] changed fetch state stopping -> stopped
Nov 15, 2021 @ 05:13:13.476 [thrd:main]: my-topic [3] received op PAUSE (v43) in fetch-state stopped (opv42)
Nov 15, 2021 @ 05:13:13.476 [thrd:main]: Not resuming stopped my-topic [3]: at offset 435723353 (state stopped, v43)
Nov 15, 2021 @ 05:13:13.478 [thrd:main]: Group "test" received op PARTITION_LEAVE in state up (join-state wait-unassign-to-complete) for my-topic [3]
Nov 15, 2021 @ 05:13:13.479 [thrd:main]: Group "test": delete my-topic [3]
Nov 15, 2021 @ 05:13:14.098 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Topic my-topic [3] in state stopped at offset 435723353 (1/100000 msgs, 0/65536 kb queued, opv 40) is not fetchable: not in active fetch state
Nov 15, 2021 @ 05:13:14.098 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Removed my-topic [3] from fetch list (6 entries, opv 40): not in active fetch state
Nov 15, 2021 @ 05:13:15.183 [thrd:main]: my-topic [3]
Nov 15, 2021 @ 05:13:15.200 [thrd:main]: my-topic [3] offset INVALID
Nov 15, 2021 @ 05:13:15.201 [thrd:main]: my-topic [3] offset INVALID
Nov 15, 2021 @ 05:13:15.225 [thrd:main]: NEW my-topic [3] 0x7f9b81c1e020 refcnt 0x7f9b81c1e0b0 (at rd_kafka_topic_partition_cnt_update:798)
Nov 15, 2021 @ 05:13:15.225 [thrd:main]: my-topic [3]: delegate to broker sasl_ssl://my-broker:9093/0 (rktp 0x7f9b81c1e020, term 0, ref 2)
Nov 15, 2021 @ 05:13:15.225 [thrd:main]: my-topic [3]: delegating to broker sasl_ssl://my-broker:9093/0 for partition with 0 messages (0 bytes) queued
Nov 15, 2021 @ 05:13:15.225 [thrd:main]: Migrating topic my-topic [3] 0x7f9b81c1e020 from (none) to sasl_ssl://my-broker:9093/0 (sending PARTITION_JOIN to sasl_ssl://my-broker:9093/0)
Nov 15, 2021 @ 05:13:15.225 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Topic my-topic [3]: joining broker (rktp 0x7f9b81c1e020, 0 message(s) queued)
Nov 15, 2021 @ 05:13:15.225 [thrd:main]: my-topic [3]: marking as DESIRED
Nov 15, 2021 @ 05:13:15.228 [thrd:main]: NEW my-topic [3] 0x7f394bfae010 refcnt 0x7f394bfae0a0 (at rd_kafka_topic_partition_cnt_update:798)
Nov 15, 2021 @ 05:13:15.228 [thrd:main]: my-topic [3]: delegate to broker sasl_ssl://my-broker:9093/0 (rktp 0x7f394bfae010, term 0, ref 2)
Nov 15, 2021 @ 05:13:15.228 [thrd:main]: my-topic [3]: delegating to broker sasl_ssl://my-broker:9093/0 for partition with 0 messages (0 bytes) queued
Nov 15, 2021 @ 05:13:15.228 [thrd:main]: Migrating topic my-topic [3] 0x7f394bfae010 from (none) to sasl_ssl://my-broker:9093/0 (sending PARTITION_JOIN to sasl_ssl://my-broker:9093/0)
Nov 15, 2021 @ 05:13:15.228 [thrd:sasl_ssl://my-brokern]: sasl_ssl://my-broker:9093/0: Topic my-topic [3]: joining broker (rktp 0x7f394bfae010, 0 message(s) queued)
Nov 15, 2021 @ 05:13:15.231 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v2
Nov 15, 2021 @ 05:13:15.231 [thrd:main]: Resume my-topic [3] (v2)
Nov 15, 2021 @ 05:13:15.231 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:15.231 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:15.231 [thrd:main]: Not resuming my-topic [3]: partition is not paused by library
Nov 15, 2021 @ 05:13:15.231 [thrd:main]: Querying committed offset for pending assigned partition my-topic [3]
Nov 15, 2021 @ 05:13:15.231 [thrd:main]: my-topic [3] received op PAUSE (v2) in fetch-state none (opv1)
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: GroupCoordinator/0: OffsetFetchResponse: my-topic [3] offset 435680920, metadata 0 byte(s): NO_ERROR
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: Adding my-topic [3] back to pending list with offset 435680920
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: my-topic [3] offset STORED
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: my-topic [3] offset 435680920
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: Starting pending assigned partition my-topic [3] at offset 435680920
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_pause_resume:2434: new version barrier v3
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: Resume my-topic [3] (v3)
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: my-topic [3]: rd_kafka_toppar_op_fetch_start:2348: new version barrier v4
Nov 15, 2021 @ 05:13:15.237 [thrd:main]: Start consuming my-topic [3] at offset 435680920 (v4)
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: my-topic [3] received op PAUSE (v3) in fetch-state none (opv2)
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: Not resuming my-topic [3]: partition is not paused by library
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: my-topic [3] received op FETCH_START (v4) in fetch-state none (opv3)
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: Start fetch for my-topic [3] in state none at offset 435680920 (v4)
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: Partition my-topic [3] changed fetch state none -> active
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: Partition my-topic [3] start fetching at offset 435680920
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: Group "test" received op PARTITION_JOIN in state up (join-state steady) for my-topic [3]
Nov 15, 2021 @ 05:13:15.238 [thrd:main]: Group "test": add my-topic [3]
The issue is still there in 1.8.2.
@ErenArslan Can you confirm this?
Hi, adding to this thread as we have exactly the same problem using both Kafka nuget v1.6.3 and v1.8.2. We were initially using a consumer with auto-committing every 5000ms (in fact the same config settings as @georgievaja) and experiencing the same problem every time we added or removed another consumer for the given topics.
In an effort to try and diagnose this issue we tried a lot of different combinations of consumer config options, and eventually also wrote a consumer that manually commits after every message is processed, and this still has the same issue of changing the offset after a rebalance.
Our setup that exhibits the issue is:
Consumers: 1 (initially)
Brokers: 2
Kafka nuget versions: 1.6.3 and 1.8.2
Kafka version: 2.7.0
This is a simplified version of our consumer:
void Consume(KafkaOptions options)
{
    var config = new ConsumerConfig()
    {
        BootstrapServers = options.BootstrapServers,
        GroupId = options.GroupId,
        PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
        EnableAutoCommit = false,
        EnableAutoOffsetStore = false,
        AutoOffsetReset = AutoOffsetReset.Latest,
        AllowAutoCreateTopics = false,
    };

    var consumer = new ConsumerBuilder<string, string>(config).Build();
    consumer.Subscribe(topicList);

    var token = new CancellationToken();
    while (!token.IsCancellationRequested)
    {
        var cr = consumer.Consume(token);

        // In this case ProcessMessage is simply creating a different kind of message and publishing it again
        ProcessMessage(cr);

        consumer.StoreOffset(cr);
        consumer.Commit(cr);
        Logger.LogInformation(
            "Stored and committed Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", cr.Topic,
            cr.Partition, cr.Offset);
    }

    consumer.Close();
    consumer.Dispose();
}
And attached are logs for the time periods relevant to this issue. MSK-logs.csv
Happy to provide any extra details as needed to help diagnose this.
At 09:28:12 the OffsetFetch for 'topic 2 name' gives -1 (nothing committed) for some of the partitions. Is that expected?
2021-12-23 09:28:12.485,<instance 2>,"%7|1640251692.484|OFFSETFETCH|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/2: OffsetFetchResponse: <topic 2 name> [2] offset 2124, metadata 0 byte(s): NO_ERROR"
2021-12-23 09:28:12.485,<instance 2>,"%7|1640251692.484|OFFSETFETCH|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/2: OffsetFetchResponse: <topic 2 name> [1] offset -1, metadata 0 byte(s): NO_ERROR"
2021-12-23 09:28:12.485,<instance 2>,"%7|1640251692.484|OFFSETFETCH|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/2: OffsetFetchResponse: <topic 2 name> [4] offset -1, metadata 0 byte(s): NO_ERROR"
2021-12-23 09:28:12.485,<instance 2>,"%7|1640251692.484|OFFSETFETCH|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/2: OffsetFetchResponse: <topic 2 name> [0] offset -1, metadata 0 byte(s): NO_ERROR"
2021-12-23 09:28:12.485,<instance 2>,"%7|1640251692.484|OFFSETFETCH|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/2: OffsetFetchResponse: <topic 2 name> [3] offset -1, metadata 0 byte(s): NO_ERROR"
It may be wise to set AutoOffsetReset to Error to fail fast in the case of offset fetch errors (though that is not happening in this case from a quick look, and the bug which caused the offset to reset on temporary errors was resolved in v1.8.0).
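A sketch of what that looks like in the .NET config; with AutoOffsetReset.Error, a missing or invalid committed offset is surfaced to the application instead of being silently reset (server/group/topic names are placeholders):

using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",      // placeholder
    GroupId = "example-group",                // placeholder
    AutoOffsetReset = AutoOffsetReset.Error   // fail fast instead of jumping to earliest/latest
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");          // placeholder

try
{
    var cr = consumer.Consume(TimeSpan.FromSeconds(1));
}
catch (ConsumeException e)
{
    // An offset reset condition now shows up here as an error instead of a silent reset.
    Console.WriteLine($"Consume error: {e.Error.Reason}");
}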
Thanks for the reply @mhowlett. This is expected AFAIK as I was forcing committing after processing a single message.
I'll take a look into whether AutoOffsetReset (set to Error) would allow us to at least spot the multiple reprocessing and prevent generating duplicate messages on the output.
Side note: pausing and resuming consumption is quite costly, since all pre-fetched messages are discarded on pause and there's some overhead in fetching the committed offset on resume, etc.
@barndawe In your MSK-logs.csv, what topic and partition is being rewound?
I don't see any "fetching topic .. at
Also, what librdkafka/client version are the logs from?
It is still happening in version 1.8.2.
Happened to us again today after a couple of months. It only happened with 1 partition (of 20) - partition 19. It jumped from 198539 back to 197375 as you can see.
New facts that I have are that it fetched the offset correctly at 2022-01-29 18:18:50:
[thrd:main]: Partition xxx [19] start fetching at offset 198539
Also, an interesting fact is that 197375 is the number it fetched during the previous rebalance (1 day back, at 2022-01-28 15:05:16):
[thrd:main]: Partition xxx [19] start fetching at offset 197375
looking into this - thanks for the detailed info @georgievaja , @lhaussknecht , @barndawe helps a lot.
I've encountered this problem too, using v1.8.2 with auto commit + manual offset storage. In my case, my revocation handler shows the correct offsets, but the commit handler that runs immediately afterwards shows the wrong offsets:
I was wondering if this has to do with the lack of an explicit call to Commit() in the consumer's RebalanceCallback. The documentation for librdkafka suggests applications should do so before unassigning. Though the .NET library doesn't appear to do this, my commit handler does appear to get invoked consistently immediately after every rebalance. I'm not able to find what code is triggering this sometimes-erroneous commit.
Also potentially related, both times I've observed this, the rebalance happened very soon after startup / receiving initial assignment.
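For context, the kind of explicit commit described above (flushing whatever offsets have been stored before partitions are given up) would look roughly like the following sketch; whether it is appropriate depends on the application's delivery semantics, and config is assumed to be defined elsewhere:

using var consumer = new ConsumerBuilder<string, string>(config)
    .SetPartitionsRevokedHandler((c, revoked) =>
    {
        try
        {
            // Commit offsets recorded so far (via StoreOffset / auto offset store)
            // before the revoked partitions can be reassigned elsewhere.
            c.Commit();
        }
        catch (KafkaException e)
        {
            Console.WriteLine($"Commit during revoke failed: {e.Error.Reason}");
        }
    })
    .Build();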
has anyone managed to write a program that can be used to relatively reliably demonstrate this issue without manual intervention?
I've been trying to set up a test case to reproduce, but haven't managed to do so yet.
Hi @mhowlett
When I read about the linked solution, I understand it as an issue related to auto commit when formerly owned partitions are re-assigned.
The initial issue has auto commit enabled; however, some of the cases mentioned here also have auto commit disabled.
Would the fix also solve the issue when auto commit is disabled and commits are made manually?
Thanks
@3schwartz This issue applies to any type of offset-less commit, be it auto-commit or an explicit commit() call without offsets set (so that it uses the stored offsets).
@3schwartz This issue applies to any type of offset-less commit, be it auto-commit or an explicit commit() call without offsets set (so that it uses the stored offsets).
Thanks for the reply.
So in the case where you pass the explicit consume result when committing, like consumer.Commit(cr), the stored offsets wouldn't be used since this is a commit with offsets - right?
That's right, the bug here only revolves around the stored offsets (offsets_store() and enable.auto.offset.store=true)
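To make the distinction concrete, a minimal sketch of the two commit variants being discussed (consumer and consumeResult are assumed to be in scope; the bug above only affects the second, offset-less form):

// 1) Offset-carrying commit: the offset comes from the consume result itself,
//    so the stored-offset bookkeeping is not involved.
consumer.Commit(consumeResult);

// 2) Offset-less commit: commits whatever was previously recorded via
//    StoreOffset() / enable.auto.offset.store - the same path auto-commit uses.
consumer.StoreOffset(consumeResult);
consumer.Commit();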
Thanks :)
That's right, the bug here only revolves around the stored offsets (offsets_store() and enable.auto.offset.store=true)
Still getting the same issue
@3schwartz This issue applies to any type of offset-less commit, be it auto-commit or an explicit commit() call without offsets set (so that it uses the stored offsets).
The issue also appears with offset-aware commits; the only solution is to enable auto-commit. I tried everything with librdkafka v1.9.2 and node-rdkafka 2.1.13, and after rebalancing the active consumer consumes messages belonging to the consumer that left.
What PartitionAssignmentStrategy are you using? In my case, with CooperativeSticky I have the issue; without it, it is working OK so far.
Hi @alfhv, I am using range and round-robin. As far as I know, it is impossible to use CooperativeSticky in Node.js.
closing this as the reported issue is resolved.
please feel free to open another github issue for any related issue and include code / logs etc. we are currently not aware of any problems.
@alfhv @tiko-star and anyone coming along later: I too was having problems with "offset rewinds" for consumers using the CooperativeSticky assignor when we were using version 1.9.3 of this lib. We just recently upgraded to 2.2.0 and the problem seems to have gone away. I am unsure what particular change fixed things, but for our case at least I don't think it was the one linked above, since that should have been present from 1.9.0 forward.
@alfhv @tiko-star and anyone coming along later: I too was having problems with "offset rewinds" for consumers using the CooperativeSticky assignor when we were using version 1.9.3 of this lib. We just recently upgraded to 2.2.0 and the problem seems to have gone away. I am unsure what particular change fixed things, but for our case at least I don't think it was the one linked above, since that should have been present from 1.9.0 forward.
Because the issue was very critical in my case, I never tried CooperativeSticky again. I see several fixes have been delivered recently; I could do some checks on non-prod envs. Thanks a lot for your feedback.
Description
About a month ago we updated the Confluent.Kafka library from 1.5.3 to the latest 1.7.0, and we started to experience offset issues after rebalancing. Those issues happen randomly on different consumers, topics and partitions.
During application deployment (application start), after a rebalance takes place, it happens that the affected partition is assigned to the consumer, but the consumer starts consuming at an offset thousands lower than the last committed offset - which causes many duplicates to be processed. For example, when we check the __consumer_offsets messages after the incident, we can see the following for one partition [xyz.eshop,xyz.eshop.async-requests,17]:
Last committed offset: Wednesday, August 4, 2021 6:10:01.506 PM - 134384
First committed offset after reassignment: Wednesday, August 4, 2021 6:10:27.430 PM - 133632
How to reproduce
We have not yet found a way to reproduce it locally; the problem is new and occurs only occasionally during/after a rebalance, probably during reassignment. We are not even sure it is connected to the library update itself, but we would appreciate any hints.
Checklist
Please provide the following information:
Number of brokers: 3
Number of partitions: 21
Number of consumers in the consumer group: 4
Consumer configuration:
assignment should happen during poll as a side effect
[x] Operating system - debian 9.13, 4.9.0-14-amd64
[x] Broker log (affected is consumer group xyz.eshop) controller.log kafka-authorizer.log server.log
[x] __consumer_offsets log (affected is [xyz.eshop,xyz.eshop.async-requests,17]) outputoffsets_23.txt
[x] Critical issue