We observed that the `KafkaSupervisor` won't enter idle mode after processing all messages, even though no new messages are produced within the `inactiveAfterMillis` duration.
We traced this problem down to cases where the last message was produced inside a Kafka transaction.
In this case, the offset lag reported in the supervisor status only drops to a minimum of 1 instead of 0 once all messages have been processed.
This is caused by `currentOffsets` being less than `latestOffsets` in the supervisor status.
Our assumption is that the offset lag is computed incorrectly because `latestOffsets` might point to Kafka transaction markers (more on that in section Mutual error cause).
Note: This issue only appears when the last message has been written as part of a Kafka transaction. `latestOffsets` will then point to the offset after the last written (possibly aborted) message, which corresponds to the transaction marker.
When a non-transactional message is produced after an arbitrary number of transactional messages, the lag is correctly calculated as 0 and idle mode is entered.
Minimal example
The following is tested against a single topic & single partition configuration.
See last section for a full reproduction of this behaviour.
Suppose we insert a single message into a Kafka topic as part of a Kafka transaction.
This message will have offset 0.
As the Kafka supervisor reports the next offset to be processed, we would expect the supervisor to report `currentOffsets == latestOffsets == 1`.
But the status is actually reported as:
`currentOffsets` (actually next to-be-processed offset): 1
`latestOffsets` (expected next to-be-processed offset): 2
This leads to a lag of 1, which should indicate that there is 1 message left to consume.
But as there are no more messages, this is unexpected behaviour.
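The reported lag is consistent with a plain per-partition subtraction of the two values. A minimal sketch, assuming a trimmed status payload that keeps only the `activeTasks[].currentOffsets` and `latestOffsets` keys with the values reported above (the helper name `partition_lag` is made up for illustration):

```python
def partition_lag(status: dict) -> dict:
    """Per-partition lag computed as latestOffsets - currentOffsets."""
    latest = status["latestOffsets"]
    lag = {}
    for task in status["activeTasks"]:
        for partition, current in task["currentOffsets"].items():
            lag[partition] = latest[partition] - current
    return lag


# Trimmed status after the single committed transactional message
status = {
    "activeTasks": [{"currentOffsets": {"0": 1}}],
    "latestOffsets": {"0": 2},
}
print(partition_lag(status))  # {'0': 1}
```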
We further found that `latestOffsets` can be incremented further by producing messages in a transaction that is then aborted.
`currentOffsets` is not changed by this, as it seems to correctly report the offset of the last processed message + 1.
But `latestOffsets` is now reported as 4, leading to a lag of 3, although there are still no new messages to process.
Updated status after producing another message and aborting transaction
We can rule out issues with the isolation.level because the latestOffsets only get updated once the transaction is either committed or aborted. So the process seems to adhere to isolation.level = read_committed.
By producing a new message without a transaction it is now possible to reduce the lag to 0.
This will result in currentOffsets == latestOffsets == 5 (after processing just 2 valid messages!).
Mutual error cause
Based on the sudden jumps in the offset we repeated the experiment consuming the topic in isolation.level = read_uncommitted mode in a separate process.
This results in the following offsets after the respective produce calls:
| Produce call | Offset |
| --- | --- |
| (1) first message (transaction committed) | 0 |
| (2) failed message (transaction aborted) | 2 |
| (3) non-transactional message (no transaction) | 4 |
This raises the question of what happened to offsets 1 and 3, which is where the Kafka transactions come in.
These offsets are used by Kafka for the transaction markers, which signal whether the transaction was committed or aborted.
These markers are not intended to be processed by the consumer like a regular message.
So actually the topic looks like this:
| Produce call | Offset | Relevant position for current/latestOffsets @ produce call |
| --- | --- | --- |
| (1) first message (transaction committed) | 0 | currentOffset (1, 2) |
| (1) [TRANSACTION COMMITTED] | 1 | latestOffset (1) |
| (2) failed message (transaction aborted) | 2 | |
| (2) [TRANSACTION ABORTED] | 3 | latestOffset (2) |
| (3) non-transactional message (no transaction) | 4 | currentOffset (3), latestOffset (3) |
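The offsets in the table can be replayed with a small in-memory model of the partition. This is only a sketch of the hypothesis, not Druid or Kafka code: `Record`, `produce_transactional`, `produce_plain`, `current_offset` and `latest_offset` are made-up names, `current_offset` assumes the task has already read everything it can, and `latest_offset` is assumed to be the log end offset including markers.

```python
from dataclasses import dataclass


@dataclass
class Record:
    offset: int
    consumable: bool  # False for aborted messages and transaction markers


def produce_transactional(log, committed):
    """Append one message plus the marker that ends its transaction."""
    log.append(Record(len(log), consumable=committed))
    log.append(Record(len(log), consumable=False))  # commit/abort marker


def produce_plain(log):
    """Append one non-transactional message (no marker is written)."""
    log.append(Record(len(log), consumable=True))


def current_offset(log):
    """Offset after the last message a read_committed task can process."""
    consumable = [r.offset for r in log if r.consumable]
    return consumable[-1] + 1 if consumable else 0


def latest_offset(log):
    """Log end offset, counting markers and aborted messages."""
    return len(log)


log, history = [], []
produce_transactional(log, committed=True)   # produce call (1)
history.append((current_offset(log), latest_offset(log)))
produce_transactional(log, committed=False)  # produce call (2)
history.append((current_offset(log), latest_offset(log)))
produce_plain(log)                           # produce call (3)
history.append((current_offset(log), latest_offset(log)))
print(history)  # [(1, 2), (1, 4), (5, 5)]
```

The three pairs match the `currentOffsets`/`latestOffsets` values observed in the supervisor status after each produce call.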
Unfortunately the KafkaSupervisor seems to use the offsets of the transaction markers to determine the latestOffsets.
The transaction marker offsets will never be processed by the ingestion task and can therefore never be considered for the currentOffsets.
This is why we are left with a partition lag > 0 even though there aren't any messages left to process.
This makes it impossible for the current implementation of the idle mode to detect that there are no more messages to process (if `latestOffsets` points to a transaction marker). The supervisor will just keep running and spawning tasks forever.
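As a simplified model of the behaviour described here (an assumption based on our observations, not Druid's actual implementation), idle detection can be thought of as requiring both zero lag and a quiet period of at least `inactiveAfterMillis`. The function name and parameters below are hypothetical.

```python
def would_enter_idle(lag: int, millis_without_new_data: int, inactive_after_millis: int) -> bool:
    """Simplified idle check: no lag left and no new data for long enough."""
    return lag == 0 and millis_without_new_data >= inactive_after_millis


# Lag stuck at 1 because of a trailing transaction marker: never idle
print(would_enter_idle(1, 3_600_000, 60_000))  # False
# Lag 0 after a non-transactional message: idle once the period has passed
print(would_enter_idle(0, 60_000, 60_000))     # True
```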
Possible solution
The method for determining latestOffsets needs to be updated so that it determines the latest available offset that can be consumed by the indexing task (especially ignoring transaction markers).
Unfortunately I was not able to identify the exact code sections that would need to be adapted.
But it will very likely have something to do with KafkaRecordSupplier.seekToLatest (src) or where it is called by the supervisor.
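To make the intended behaviour concrete, the following sketch contrasts the lag as currently reported with a lag that counts only records a task could actually read. It is purely illustrative: the list of flags stands in for knowledge about which offsets are markers or aborted messages, and how the supervisor could obtain that information is left open. `raw_lag` and `consumable_lag` are hypothetical helpers.

```python
def raw_lag(log_end_offset: int, position: int) -> int:
    """Lag as the plain difference between end offset and task position."""
    return log_end_offset - position


def consumable_lag(consumable_flags: list, position: int) -> int:
    """Count only records at or after `position` that a task could read."""
    return sum(1 for flag in consumable_flags[position:] if flag)


# State after produce call (2): offset 0 is committed data,
# offsets 1 and 3 are transaction markers, offset 2 is an aborted message.
flags = [True, False, False, False]
print(raw_lag(len(flags), 1), consumable_lag(flags, 1))  # 3 0
```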
Edit (2024-07-05)
According to the docs for KafkaConsumer.seekToEnd [docs] (which is used in KafkaRecordSupplier.seekToLatest):
> If isolation.level=read_committed, the end offset will be the Last Stable Offset, i.e., the offset of the first message with an open transaction.

What if - in the case of a committed transaction - the offset of the first message with an open transaction points to the next offset after the transaction marker?
Assuming the last consumable message of a transaction has offset X, the Druid supervisor stats would be:
currentOffsets: X + 1 (processed message + 1 = transaction marker).
latestOffsets: X + 2 -> last stable offset (next offset after the transaction marker).
This could explain the observed behaviour.
The question is whether the last stable offset can really point to a non-existent offset (if there is no open transaction yet).
Unfortunately, we are not able to debug this in an actual Druid environment to confirm or refute this assumption.
Is there somebody who could double-check this?
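One way to check this outside of Druid might be to read the topic with a `read_committed` consumer and compare its position with the watermark offsets the client reports. The sketch below uses the `confluent_kafka` client from the reproduction; the function name `inspect_offsets` and the group id are made up, connection and auth settings have to be supplied by the caller, and whether the reported high offset is the high watermark or the last stable offset under `read_committed` is exactly what would need to be verified, so no expected output is given.

```python
def inspect_offsets(conf: dict, topic: str, partition: int = 0, timeout: float = 10.0) -> dict:
    """Consume everything as read_committed, then report position and watermarks."""
    # Imported lazily so this module loads even without the client installed.
    from confluent_kafka import OFFSET_BEGINNING, Consumer, TopicPartition

    consumer = Consumer({
        **conf,                               # bootstrap.servers, auth, ...
        "group.id": "idletest-inspect",       # hypothetical group id
        "isolation.level": "read_committed",
        "enable.auto.commit": False,
    })
    consumer.assign([TopicPartition(topic, partition, OFFSET_BEGINNING)])
    last_consumed = None
    try:
        while True:
            msg = consumer.poll(timeout)
            if msg is None:                   # nothing arrived within the timeout
                break
            if msg.error():
                raise RuntimeError(msg.error())
            last_consumed = msg.offset()
        tp = TopicPartition(topic, partition)
        low, high = consumer.get_watermark_offsets(tp, timeout=timeout)
        position = consumer.position([tp])[0].offset
    finally:
        consumer.close()
    return {"last_consumed": last_consumed, "position": position, "low": low, "high": high}
```

Calling `inspect_offsets({"bootstrap.servers": "..."}, "test-idle-mode")` after each produce call of the reproduction should show whether `high` runs ahead of the last consumed offset by more than 1.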
Reproduction
Code examples (python) have authentication details redacted. Adapt the snippets according to your setup.
Create an empty kafka topic test-idle-mode
Insert single message in transaction -> it will have offset 0
[`Python`] Produce message in transaction
```python
# Produce one message in a transaction
import json

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "",
    # ... auth settings redacted
    "transactional.id": "idletest",
})
producer.init_transactions()
producer.begin_transaction()
producer.produce(
    topic="test-idle-mode",
    key="",
    value=json.dumps(
        {"__time": 1577833200000, "value": 1}
    ),
    on_delivery=print,
)
# Committing writes the transaction marker after the message
producer.commit_transaction()
producer.flush()
producer.poll(0)
```
Create a KafkaSupervisor based on the topic test-idle-mode
Affected Version
Reproduced with:
Description
We observed that the `KafkaSupervisor` won't enter idle mode after processing all messages, even though no new messages are produced within the `inactiveAfterMillis` duration. We traced this problem down to cases where the last message was produced inside a Kafka transaction.
In this case, the offset lag reported in the supervisor status only drops to a minimum of 1 instead of 0 once all messages have been processed. This is caused by `currentOffsets` being less than `latestOffsets` in the supervisor status. Our assumption is that the offset lag is computed incorrectly because `latestOffsets` might point to Kafka transaction markers (more on that in section Mutual error cause).
Note: This issue only appears when the last message has been written as part of a Kafka transaction. `latestOffsets` will then point to the offset after the last written (possibly aborted) message, which corresponds to the transaction marker. When a non-transactional message is produced after an arbitrary number of transactional messages, the lag is correctly calculated as 0 and idle mode is entered.
Minimal example
The following is tested against a single topic & single partition configuration. See last section for a full reproduction of this behaviour.
Suppose we insert a single message into a Kafka topic as part of a Kafka transaction. This message will have offset 0. As the Kafka supervisor reports the next offset to be processed, we would expect the supervisor to report `currentOffsets == latestOffsets == 1`.
But the status is actually reported as:
`currentOffsets` (actually next to-be-processed offset): 1
`latestOffsets` (expected next to-be-processed offset): 2
This leads to a lag of 1, which should indicate that there is 1 message left to consume. But as there are no more messages, this is unexpected behaviour.
Example status after single message with offset 0
```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": { "0": 0 },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 3568,
      "type": "ACTIVE",
      "currentOffsets": { "0": 1 },
      "lag": { "0": 1 }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": { "0": 2 },
  "minimumLag": { "0": 1 },
  "aggregateLag": 1,
  "offsetsLastUpdated": "2024-07-03T15:05:02.179Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
We further found that `latestOffsets` can be incremented further by producing messages in a transaction that is then aborted. `currentOffsets` is not changed by this, as it seems to correctly report the offset of the last processed message + 1. But `latestOffsets` is now reported as 4, leading to a lag of 3, although there are still no new messages to process.
Updated status after producing another message and aborting transaction
```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": { "0": 0 },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 1607,
      "type": "ACTIVE",
      "currentOffsets": { "0": 1 },
      "lag": { "0": 3 }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": { "0": 4 },
  "minimumLag": { "0": 3 },
  "aggregateLag": 3,
  "offsetsLastUpdated": "2024-07-03T15:37:32.514Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
We can rule out issues with the `isolation.level` because the `latestOffsets` only get updated once the transaction is either committed or aborted. So the process seems to adhere to `isolation.level = read_committed`.
By producing a new message without a transaction it is now possible to reduce the lag to 0. This will result in `currentOffsets == latestOffsets == 5` (after processing just 2 valid messages!).
Status after producing non-transactional message
```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": { "0": 0 },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 917,
      "type": "ACTIVE",
      "currentOffsets": { "0": 5 },
      "lag": { "0": 0 }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": { "0": 5 },
  "minimumLag": { "0": 0 },
  "aggregateLag": 0,
  "offsetsLastUpdated": "2024-07-03T15:49:02.588Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
Mutual error cause
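Based on the sudden jumps in the offset we repeated the experiment consuming the topic in `isolation.level = read_uncommitted` mode in a separate process.
This results in the following offsets after the respective produce calls:

| Produce call | Offset |
| --- | --- |
| (1) first message (transaction committed) | 0 |
| (2) failed message (transaction aborted) | 2 |
| (3) non-transactional message (no transaction) | 4 |

This raises the question of what happened to offsets 1 and 3, which is where the Kafka transactions come in. These offsets are used by Kafka for the transaction markers, which signal whether the transaction was committed or aborted. These markers are not intended to be processed by the consumer like a regular message.
So actually the topic looks like this:

| Produce call | Offset | Relevant position for current/latestOffsets @ produce call |
| --- | --- | --- |
| (1) first message (transaction committed) | 0 | currentOffset (1, 2) |
| (1) [TRANSACTION COMMITTED] | 1 | latestOffset (1) |
| (2) failed message (transaction aborted) | 2 | |
| (2) [TRANSACTION ABORTED] | 3 | latestOffset (2) |
| (3) non-transactional message (no transaction) | 4 | currentOffset (3), latestOffset (3) |
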
Unfortunately the `KafkaSupervisor` seems to use the offsets of the transaction markers to determine the `latestOffsets`. The transaction marker offsets will never be processed by the ingestion task and can therefore never be considered for the `currentOffsets`. This is why we are left with a partition lag > 0 even though there aren't any messages left to process.
This makes it impossible for the current implementation of the idle mode to detect that there are no more messages to process (if `latestOffsets` points to a transaction marker). The supervisor will just keep running and spawning tasks forever.
Possible solution
The method for determining `latestOffsets` needs to be updated so that it determines the latest available offset that can be consumed by the indexing task (especially ignoring transaction markers).
Unfortunately I was not able to identify the exact code sections that would need to be adapted. But it will very likely have something to do with `KafkaRecordSupplier.seekToLatest` (src) or where it is called by the supervisor.
Edit (2024-07-05)
According to the docs for `KafkaConsumer.seekToEnd` [docs] (which is used in `KafkaRecordSupplier.seekToLatest`):

> If isolation.level=read_committed, the end offset will be the Last Stable Offset, i.e., the offset of the first message with an open transaction.

What if - in the case of a committed transaction - the offset of the first message with an open transaction points to the next offset after the transaction marker? Assuming the last consumable message of a transaction has offset `X`, the Druid supervisor stats would be:
`currentOffsets`: `X + 1` (processed message + 1 = transaction marker).
`latestOffsets`: `X + 2` -> last stable offset (next offset after the transaction marker).
This could explain the observed behaviour. The question is whether the last stable offset can really point to a non-existent offset (if there is no open transaction yet). Unfortunately, we are not able to debug this in an actual Druid environment to confirm or refute this assumption. Is there somebody who could double-check this?
Reproduction
Code examples (python) have authentication details redacted. Adapt the snippets according to your setup.
Create an empty kafka topic `test-idle-mode`
Insert single message in transaction -> it will have offset 0
[`Python`] Produce message in transaction
```python
# Produce one message in a transaction
import json

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "",
    # ... auth settings redacted
    "transactional.id": "idletest",
})
producer.init_transactions()
producer.begin_transaction()
producer.produce(
    topic="test-idle-mode",
    key="",
    value=json.dumps(
        {"__time": 1577833200000, "value": 1}
    ),
    on_delivery=print,
)
# Committing writes the transaction marker after the message
producer.commit_transaction()
producer.flush()
producer.poll(0)
```
Create a KafkaSupervisor based on the topic `test-idle-mode`
Ingestion spec
```json
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "test-idle-mode",
"timestampSpec": {
"column": "__time",
"format": "millis",
"missingValue": null
},
"dimensionsSpec": {
"dimensions": [
{
"type": "long",
"name": "value",
"multiValueHandling": "SORTED_ARRAY",
"createBitmapIndex": false
}
],
"dimensionExclusions": [
"__time"
],
"includeAllDimensions": false,
"useSchemaDiscovery": false
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": {
"type": "none"
},
"rollup": false,
"intervals": []
},
"transformSpec": {
"filter": null,
"transforms": []
}
},
"ioConfig": {
"topic": "test-idle-mode",
"topicPattern": null,
"inputFormat": {
"type": "json",
"keepNullColumns": false,
"assumeNewlineDelimited": false,
"useJsonNodeReader": false
},
"replicas": 1,
"taskCount": 1,
"taskDuration": "PT3600S",
"consumerProperties": {
"bootstrap.servers":,
"druid.dynamic.config.provider": {
"type": "environment",
"variables": {
}
}
},
"autoScalerConfig": null,
"pollTimeout": 100,
"startDelay": "PT5S",
"period": "PT30S",
"useEarliestOffset": true,
"completionTimeout": "PT1800S",
"lateMessageRejectionPeriod": null,
"earlyMessageRejectionPeriod": null,
"lateMessageRejectionStartDateTime": null,
"configOverrides": null,
"idleConfig": {
"enabled": true,
"inactiveAfterMillis": 60000
},
"stopTaskCount": null,
"stream": "test-idle-mode",
"useEarliestSequenceNumber": true
},
"tuningConfig": {
"type": "kafka",
"appendableIndexSpec": {
"type": "onheap",
"preserveExistingMetrics": false
},
"maxRowsInMemory": 150000,
"maxBytesInMemory": 0,
"skipBytesInMemoryOverheadCheck": false,
"maxRowsPerSegment": 5000000,
"maxTotalRows": null,
"intermediatePersistPeriod": "PT10M",
"maxPendingPersists": 0,
"indexSpec": {
"bitmap": {
"type": "roaring"
},
"dimensionCompression": "lz4",
"stringDictionaryEncoding": {
"type": "utf8"
},
"metricCompression": "lz4",
"longEncoding": "longs"
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring"
},
"dimensionCompression": "lz4",
"stringDictionaryEncoding": {
"type": "utf8"
},
"metricCompression": "lz4",
"longEncoding": "longs"
},
"reportParseExceptions": false,
"handoffConditionTimeout": 900000,
"resetOffsetAutomatically": false,
"segmentWriteOutMediumFactory": null,
"workerThreads": null,
"chatRetries": 8,
"httpTimeout": "PT10S",
"shutdownTimeout": "PT80S",
"offsetFetchPeriod": "PT30S",
"intermediateHandoffPeriod": "P2147483647D",
"logParseExceptions": false,
"maxParseExceptions": 2147483647,
"maxSavedParseExceptions": 0,
"numPersistThreads": 1,
"skipSequenceNumberAvailabilityCheck": false,
"repartitionTransitionDuration": "PT120S"
}
},
"context": null,
"suspended": false
}
```
When checking the supervisor after the task started, the `currentOffsets` are 1 (last message offset 0 + 1 = 1) but `latestOffsets` is 2 instead of 1.
[`Status`] Example status after single message with offset 0
This is the same as in section "Example".
```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": { "0": 0 },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 3568,
      "type": "ACTIVE",
      "currentOffsets": { "0": 1 },
      "lag": { "0": 1 }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": { "0": 2 },
  "minimumLag": { "0": 1 },
  "aggregateLag": 1,
  "offsetsLastUpdated": "2024-07-03T15:05:02.179Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
Insert single message in transaction but abort it -> unavailable message will have offset 2
[`Python`] Produce message and abort transaction
```python
# Produce one message in a transaction, then abort the transaction
import json

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "",
    # ... auth settings redacted
    "transactional.id": "idletest",
})
producer.init_transactions()
producer.begin_transaction()
producer.produce(
    topic="test-idle-mode",
    key="",
    value=json.dumps(
        {"__time": 1577833201000, "value": 1}
    ),
    on_delivery=print,
)
# Make sure message is sent to broker before aborting the transaction
producer.flush()
producer.poll(0)
producer.abort_transaction()
producer.flush()
producer.poll(0)
```
The supervisor status still reports `currentOffsets == 1` but `latestOffsets` is now 4.
[`Status`] Updated status after producing another message and aborting transaction
```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": { "0": 0 },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 1607,
      "type": "ACTIVE",
      "currentOffsets": { "0": 1 },
      "lag": { "0": 3 }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": { "0": 4 },
  "minimumLag": { "0": 3 },
  "aggregateLag": 3,
  "offsetsLastUpdated": "2024-07-03T15:37:32.514Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```
Produce a non-transactional message (will have offset 4)
[`Python`] Produce message without transaction
```python
# Produce one message without transaction
import json

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "",
    # ... auth settings redacted
})
producer.produce(
    topic="test-idle-mode",
    key="",
    value=json.dumps(
        {"__time": 1577833202000, "value": 1}
    ),
    on_delivery=print,
)
producer.flush()
producer.poll(0)
```
New supervisor status with `currentOffsets == latestOffsets == 5` (now finally able to enter idle mode)
[`Status`] Status after producing non-transactional message
```json
{
  "dataSource": "test-idle-mode",
  "stream": "test-idle-mode",
  "partitions": 1,
  "replicas": 1,
  "durationSeconds": 3600,
  "activeTasks": [
    {
      "id": "index_kafka_test-idle-mode_f982fc91af103bc_nehpmfih",
      "startingOffsets": { "0": 0 },
      "startTime": "2024-07-03T15:04:34.022Z",
      "remainingSeconds": 917,
      "type": "ACTIVE",
      "currentOffsets": { "0": 5 },
      "lag": { "0": 0 }
    }
  ],
  "publishingTasks": [],
  "latestOffsets": { "0": 5 },
  "minimumLag": { "0": 0 },
  "aggregateLag": 0,
  "offsetsLastUpdated": "2024-07-03T15:49:02.588Z",
  "suspended": false,
  "healthy": true,
  "state": "RUNNING",
  "detailedState": "RUNNING",
  "recentErrors": []
}
```