apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Infinite automatic Kafka offset resetting #11658

Open FrankChen021 opened 3 years ago

FrankChen021 commented 3 years ago

Affected Version

Since 0.16

Description

There's a configuration resetOffsetAutomatically in KafkaIndexTaskTuningConfig that allows the Kafka offset to be reset automatically once the consumer's offset is out of range. An out-of-range offset typically occurs when messages in Kafka expire before the Druid ingestion task has read them.
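For context, the intended behavior of such an automatic reset is roughly: when the consumer's position falls outside the range the broker still retains, seek the affected partitions to the earliest offset that is still available instead of retrying the bad offset. Below is a minimal sketch against the plain Kafka consumer API, purely for illustration (it is not Druid's actual code, and it assumes the consumer is configured so that an out-of-range position surfaces as an OffsetOutOfRangeException instead of being reset by the client itself):

    import java.time.Duration;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
    import org.apache.kafka.common.TopicPartition;

    public class AutoResetSketch
    {
      // Poll once; if the current position is no longer retained by the broker,
      // seek each affected partition to its earliest available offset and poll again.
      static ConsumerRecords<byte[], byte[]> pollWithAutoReset(Consumer<byte[], byte[]> consumer)
      {
        try {
          return consumer.poll(Duration.ofSeconds(5));
        }
        catch (OffsetOutOfRangeException e) {
          // beginningOffsets() asks the broker for the least available offset of each partition
          Map<TopicPartition, Long> earliest = consumer.beginningOffsets(e.partitions());
          earliest.forEach(consumer::seek);
          return consumer.poll(Duration.ofSeconds(5));
        }
      }
    }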

But the current automatic-reset implementation uses the wrong offset when resetting. As a result the reset does not take effect, another out-of-range error follows, and the automatic reset is triggered again. The ingestion task falls into an infinite loop.

Problem Analysis

https://github.com/apache/druid/blob/59d257816b85dbeeca336b8e25d341d67bbc5697/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java#L134-L155

From the code above (Line 148, Line 154) we can see that the variable nextOffset is used for the automatic reset. But this variable holds the offset we are currently reading from Kafka, which is exactly the offset that caused the out-of-range exception (Line 134).

In other words, the automatic reset seeks back to the very offset that was out of range. Naturally, this doesn't help and causes another out-of-range exception in the next round of polling messages from Kafka.

How to fix

To fix this problem, the leastAvailableOffset variable should be used to reset the offset. Since there's a check (Line 152) that guarantees leastAvailableOffset is greater than the current reading offset, the automatic reset also won't cause data duplication. The fix looks as follows:

        if (leastAvailableOffset > nextOffset) {
          doReset = true;
          resetPartitions.put(topicPartition, leastAvailableOffset);

          recordSupplier.seek(streamPartition, leastAvailableOffset);
        }

I will open a PR to fix this.

samarthjain commented 2 years ago

@FrankChen021 - I believe the code is doing the right thing. It may be doing an unnecessary seek, but it shouldn't cause an infinite retry loop. In line 140, when recordSupplier.getEarliestSequenceNumber(streamPartition) is called, it does the following:

 Long currPos = getPosition(partition);
 seekToEarliest(Collections.singleton(partition)); 
 Long nextPos = getPosition(partition);
 seek(partition, currPos);

So it gets the current position for the partition, seeks to the earliest offset in that partition, reads that position, and then seeks back to restore the position it was at. The earliest offset is returned and stored in the leastAvailableOffset variable.
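In terms of the raw Kafka consumer API, that save/seek/restore dance amounts to roughly the following (an illustrative sketch, not the actual KafkaRecordSupplier source):

    import java.util.Collections;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    public class EarliestOffsetProbe
    {
      // Returns the earliest offset still available for the partition, while leaving
      // the consumer's position exactly where it was before the call.
      static long earliestAvailableOffset(Consumer<byte[], byte[]> consumer, TopicPartition partition)
      {
        long currPos = consumer.position(partition);                 // remember where we are
        consumer.seekToBeginning(Collections.singleton(partition));  // jump to the earliest offset
        long earliest = consumer.position(partition);                // resolve and read it back
        consumer.seek(partition, currPos);                           // restore the original position
        return earliest;
      }
    }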

Then, at line 148, recordSupplier.seek(streamPartition, nextOffset); assumes that the call to getEarliestSequenceNumber may have left the current position different from the position stored at the start in nextOffset, so it does an extra seek to restore the state as it was before. The reason I said it may be unnecessary is that I saw log lines like this:

getEarliestSequenceNumber() logs:
Seeking to EARLIEST offset of partition logs_metrics-14
Resetting offset for partition insight_logs_metrics-14 to offset 2944387224.
Seeking to offset 22400192044 for partition logs_metrics-14

and then
// reset the seek 
recordSupplier.seek(streamPartition, nextOffset); 
Seeking to offset 22400192044 for partition logs_metrics-14

As you can see, two seeks were performed to seek to offset 22400192044.

Having said that, I recently ran into an issue where, even though I have resetOffsetAutomatically configured, Druid didn't call reset. This resulted in a stuck Kafka consumer that just kept spewing OffsetOutOfRangeException messages. Looking closely at the code, it turned out that the offset reported in outOfRangePartition (22400192044) was higher than the leastAvailableOffset (2944387224) reported by Kafka. As a result, the condition leastAvailableOffset > nextOffset was never met and the consumer just kept sleeping for 30 seconds before retrying.

I wonder if you also have run into something similar?

FrankChen021 commented 2 years ago

@samarthjain You have run into the exact problem I described in this issue.

Since the earliestOffset is 2944387224 while the nextOffset is 22400192044, the code goes into this block:

        if (leastAvailableOffset > nextOffset) {
          doReset = true;
          resetPartitions.put(topicPartition, nextOffset);
        }

Notice that here nextOffset holds the position the offset will be reset to, but obviously the valid offset is 2944387224, so the reset won't succeed. In the next poll period, the consumer tries to read the topic from the wrong offset, which results in outOfRangePartition again.

So the Kafka ingestion resets again and again; this is what I mean by an 'infinite loop'.

This problem usually happens when messages in the Kafka partition expire before Druid reads them.

samarthjain commented 2 years ago

@FrankChen021 - I don't think the code is entering that block, since the earliest offset (2944387224) is less than the current offset (22400192044). There is an extra digit in the current offset. I am not exactly sure how the current offset can be higher than the least offset, but that seemed to be the case here.

FrankChen021 commented 2 years ago

@samarthjain Sorry, those two numbers are so long that I mistook the earliest for the larger one. In your case, I bet the latest offset is also less than your current offset. I have never encountered such a problem on the Kafka side when nothing has changed on the topic.

There is one scenario I can come up with that would lead to such a problem: after the topic has been consumed for a while, it is deleted and then re-created. The log offsets on the Kafka side then start again from zero, so both the earliest and latest offsets are less than the offset held on the consumer side.

So, for such a case, I think we could check whether the latest offset is also less than the current offset; if it is, maybe we should reset the offset to the latest.
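A rough sketch of that extra check, written against the block quoted above (illustrative only; it assumes the record supplier can report the latest available offset in the same way getEarliestSequenceNumber reports the earliest one):

        // Illustrative extension of the reset block, not an actual patch.
        // Assumption: getLatestSequenceNumber mirrors getEarliestSequenceNumber
        // but returns the end offset of the partition.
        final Long latestAvailableOffset = recordSupplier.getLatestSequenceNumber(streamPartition);
        if (leastAvailableOffset > nextOffset) {
          // messages expired: jump forward to the least available offset
          doReset = true;
          resetPartitions.put(topicPartition, leastAvailableOffset);
          recordSupplier.seek(streamPartition, leastAvailableOffset);
        } else if (latestAvailableOffset != null && latestAvailableOffset < nextOffset) {
          // topic was re-created and offsets restarted: fall back to the latest offset
          doReset = true;
          resetPartitions.put(topicPartition, latestAvailableOffset);
          recordSupplier.seek(streamPartition, latestAvailableOffset);
        }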

samarthjain commented 2 years ago

So I now know how we ended up with an offset that was significantly higher than the earliest offset, and the reason is similar to what you mentioned, @FrankChen021.

The first few versions of the supervisor were talking to Kafka Cluster A. Then the stream was recreated on Kafka Cluster B and the spec was updated accordingly. As a result, the offsets Druid had stored for this stream/datasource were no longer valid. We should have stopped the supervisor, waited for all handoffs to complete, and then cleared all offsets in the DB before resubmitting the new spec with the updated Kafka cluster endpoint. In general, updating streams, clusters, or the number of partitions is problematic with Druid's Kafka ingestion and invariably involves clearing the offsets stored in the DB.
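For reference, clearing the stored offsets is essentially what the supervisor's hard-reset endpoint does (POST /druid/indexer/v1/supervisor/<supervisorId>/reset on the Overlord). A minimal sketch of calling it from Java, with the host and supervisor id as placeholders for your deployment; note this is the operation that can lead to data loss or duplication:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class SupervisorHardReset
    {
      public static void main(String[] args) throws Exception
      {
        String overlord = "http://OVERLORD_HOST:8090";   // placeholder Overlord endpoint
        String supervisorId = "my-kafka-datasource";     // placeholder supervisor id

        // Hard reset: clears the offsets the supervisor has stored in the metadata store,
        // so new tasks start from the offsets dictated by useEarliestOffset in the spec.
        HttpRequest reset = HttpRequest.newBuilder()
            .uri(URI.create(overlord + "/druid/indexer/v1/supervisor/" + supervisorId + "/reset"))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(reset, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
      }
    }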

FrankChen021 commented 2 years ago

@samarthjain Right, updating Kafka clusters is problematic. I'm wondering if it's possible to clear the stored offsets when the supervisor spec is updated and the Kafka cluster/topic has changed in the new spec, so that the offset problem isn't left to manual or automatic resets.

saifat29 commented 2 years ago

My Druid Kafka cluster running on Docker Swarm went down due to a network failure. Even though the cluster came back up and running, Druid got stuck in the "resetting offset automatically" infinite loop. It happened because Druid was trying to read from a higher offset than was present in Kafka.

The resetOffsetAutomatically setting was already enabled, but it didn't help. I had to manually do a reset.

Anyone found a solution?

OliveBZH commented 2 years ago

Hello, I'm facing the same issue and it seems it is not yet solved. Is there any workaround to avoid it pending the resolution?

FarhadF commented 1 year ago

facing the same issue:

 2023-08-17T13:35:09,349 INFO [KafkaSupervisor-staging-campaign-install] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - [staging-campaign-install] supervisor is running.
 2023-08-17T13:35:24,567 INFO [KafkaSupervisor-staging-campaign-install-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-staging-campaign-install-supervisor-19, groupId=staging-campaign-install-supervisor] Seeking to latest offset of partition staging-campaign-install-2
 2023-08-17T13:35:24,567 INFO [KafkaSupervisor-staging-campaign-install-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-staging-campaign-install-supervisor-19, groupId=staging-campaign-install-supervisor] Seeking to latest offset of partition staging-campaign-install-0
 2023-08-17T13:35:24,567 INFO [KafkaSupervisor-staging-campaign-install-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-staging-campaign-install-supervisor-19, groupId=staging-campaign-install-supervisor] Seeking to latest offset of partition staging-campaign-install-1
 2023-08-17T13:35:24,568 INFO [KafkaSupervisor-staging-campaign-install-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-staging-campaign-install-supervisor-19, groupId=staging-campaign-install-supervisor] Resetting offset for partition staging-campaign-install-1 to position FetchPosition{offset=21, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[staging-kafka-2.staging-kafka-brokers.kafka.svc.cluster.local:9095 (id: 2 rack: null)], epoch=951}}.
 2023-08-17T13:35:24,568 INFO [KafkaSupervisor-staging-campaign-install-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-staging-campaign-install-supervisor-19, groupId=staging-campaign-install-supervisor] Resetting offset for partition staging-campaign-install-0 to position FetchPosition{offset=19, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[staging-kafka-0.staging-kafka-brokers.kafka.svc.cluster.local:9095 (id: 0 rack: null)], epoch=957}}.
 2023-08-17T13:35:24,569 INFO [KafkaSupervisor-staging-campaign-install-Reporting-0] org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-staging-campaign-install-supervisor-19, groupId=staging-campaign-install-supervisor] Resetting offset for partition staging-campaign-install-2 to position FetchPosition{offset=14, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[staging-kafka-1.staging-kafka-brokers.kafka.svc.cluster.local:9095 (id: 1 rack: null)], epoch=1043}}.

I can verify that the offsets reported in the logs are not ahead of the Kafka offsets for each partition. The workaround is to hard reset the supervisor, but of course that carries the risk of data loss/duplicate data.

sc-sityad commented 6 months ago


Is this resolved now? I'm facing the same issue, but I don't want to duplicate data or face data loss by hard resetting. Please let me know if this is fixed. I don't have any headroom to increase the task count, so is there any other solution, @FrankChen021?