Closed nbali closed 1 year ago
I just noticed that despite having 'org.apache.kafka:kafka-clients:3.2.0' in the POM, according to the generated jar and every dependency tool output, if I deploy it with 2.44 it still uses 'org.apache.kafka:kafka-clients:2.4.1', while 2.45 uses 3.2.0 as expected. I'm not really sure why. Anyway, as I have changed the 3.2.0 to 2.4.1, the issue stopped happening in newer versions as well.
It turns out these changes were introduced in 3.2.0 due to https://issues.apache.org/jira/browse/KAFKA-12980 - compare https://github.com/apache/kafka/blob/3.1.2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1238-L1251 vs https://github.com/apache/kafka/blob/3.2.0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1242-L1260
This is a breaking change for Beam. Either the implementation has to follow up on the changed consumer implementation (basically how I described before), or it should fail fast with kafka consumers with version >= 3.2.0.
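If the fail-fast route were taken, a minimal sketch could look something like this (purely illustrative, not existing Beam code; it assumes the version string reported by kafka-clients' AppInfoParser is a plain "major.minor.patch" value):

```java
import org.apache.kafka.common.utils.AppInfoParser;

// Illustrative fail-fast guard (not existing Beam code). Assumes the
// kafka-clients version string is a plain "major.minor.patch" value.
final class KafkaClientVersionGuard {
  static void rejectChangedPollSemantics() {
    String version = AppInfoParser.getVersion(); // e.g. "3.2.0"
    String[] parts = version.split("\\.");
    int major = Integer.parseInt(parts[0]);
    int minor = Integer.parseInt(parts[1]);
    if (major > 3 || (major == 3 && minor >= 2)) {
      throw new IllegalStateException(
          "kafka-clients " + version + " changed poll() semantics (KAFKA-12980) "
              + "and is not supported by this reader yet; use a pre-3.2.0 client.");
    }
  }
}
```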
Thanks so much for the detailed investigation.
Essentially what was fixed for the watermark for streaming jobs with #24205 also has to be fixed for the tracker/restriction/offset.
CC: @johnjcasey who did the fix for the watermark. A fix for the tracker would hopefully resolve this long-standing issue.
the generated jar and every dependency tool output if I deploy it with 2.44 it still uses 'org.apache.kafka:kafka-clients:2.4.1'
The kafka client is a provided dependency for Beam: https://github.com/apache/beam/blob/9c614557c51ad55230211f864e70f48ad0914326/sdks/java/io/kafka/build.gradle#L68
so you should be able to assign a different version. Not sure why it still picked the default 2.4.1.
Also, this reminds me that the kafka version also matters for KafkaIO issues. We saw incoming issues that were not caught by our tests. We may want to update the default kafka version as well.
I'm aware that it is provided, but I can't find any difference that causes the different behaviour between 2.44 and 2.45, and why 2.45 would use my custom version from my pom.xml while 2.44 would use 2.4.1 no matter what I do. Anyway, I quickly checked 2.43/2.44/2.45, and it seems that when I launch the flex template all three show "Kafka version: 3.2.0" in the Job logs, and 2.45 shows that in the Worker logs too, but both 2.43 and 2.44 show "Kafka version: 2.4.1" there.
Thank you for this. I agree this is a breaking change.
I'd rather not do a fail-fast. I think options 2 or 3 are the least intrusive, as we can then remove them when there is a fix in kafka. I prefer 2, but I'm also not sure about the guarantees provided by consumer.position().
Well, if we don't explicitly forbid using certain kafka versions with a fail-fast, that means we have to support even the currently "bugged" versions. That means "as we can then remove it when there is a fix in kafka" will never happen. So if the goal is to have an implementation that supports pre-3.2.0, the currently bugged versions, the possibly fixed versions, and pretty much anything that might be changed by kafka implementation details, then I would say none of the 5 proposals works perfectly.
I would say the most generic solution would be to call consumer.poll(KAFKA_POLL_TIMEOUT) in a loop as well, and break the loop only if we received a rawRecords that isn't empty OR the KAFKA_POLL_TIMEOUT is over. That essentially simulates the pre-3.2.0 behaviour.
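A minimal sketch of that idea (names like KAFKA_POLL_TIMEOUT and the wrapper class are illustrative, not the actual ReadFromKafkaDoFn code):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Illustrative sketch: keep polling until we either get records back or the
// overall KAFKA_POLL_TIMEOUT budget is used up, which mimics the pre-3.2.0
// behaviour of poll() not returning early on an empty-but-advanced fetch.
final class PollUntilRecordsOrTimeout {
  private static final Duration KAFKA_POLL_TIMEOUT = Duration.ofSeconds(2); // assumed value

  static ConsumerRecords<byte[], byte[]> poll(Consumer<byte[], byte[]> consumer) {
    long deadlineNanos = System.nanoTime() + KAFKA_POLL_TIMEOUT.toNanos();
    while (true) {
      Duration remaining = Duration.ofNanos(Math.max(0, deadlineNanos - System.nanoTime()));
      ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(remaining);
      if (!rawRecords.isEmpty() || remaining.isZero()) {
        return rawRecords; // records received, or the overall timeout is over
      }
    }
  }
}
```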
True. I hadn't thought through how my statements contradicted each other.
I think doing the pre-3.2.0 simulation is a good short-term fix, though I'm beginning to believe we will eventually want to stop supporting older Kafka versions.
@johnjcasey https://github.com/apache/beam/pull/26142
I encountered the same issue!
@gabrywu isn't it stuck in a different way?
It needs more investigation. If a Flink task manager OOMs, it gets stuck when the task restarts.
@nbali our Kafka reader got stuck after this error:
org.apache.kafka.clients.FetchSessionHandler [] - [Consumer clientId=xxxconsumer-1, groupId=xxxconsumer] Error sending fetch request (sessionId=1286546196, epoch=546094) to node 4: org.apache.kafka.common.errors.DisconnectException: null
Since the Kafka reader was stuck, it seems all the TMs got stuck, and all subsequent checkpoints expired and eventually failed. We had to restart the Flink job to resolve it.
@johnjcasey should I create a new issue if the problem is still there?
Yes, please create a new issue. This existing issue was fixed, even if the symptoms are similar
What happened?
Scenario: batch Kafka ingestion (.withStartReadTime(), .withStopReadTime(), experiments: beam_fn_api, unsafely_attempt_to_process_unbounded_data_in_batch_mode, shuffle_mode=appliance) reading a few billion / a few hundred GB of Kafka records from a few hundred KTable topic partitions. Some of these partitions have a tendency to get stuck.
The issue started happening with 2.45 (and still does with 2.46), so my original idea was that there must have been some change introduced in 2.45 that caused this... but after some investigation my guess is that previously it was just hidden/suppressed; the issue already existed and some change only revealed/emphasized it.
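For context, the read in question is roughly along these lines (a sketch only: bootstrap servers, topic and timestamps are placeholders, not the actual job; the experiment flags would be passed as pipeline options on the command line):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.joda.time.Instant;

// Placeholder bounded KafkaIO read, roughly matching the scenario above.
public class BatchKafkaIngestion {
  public static void main(String[] args) {
    // e.g. --experiments=beam_fn_api,unsafely_attempt_to_process_unbounded_data_in_batch_mode,shuffle_mode=appliance
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline.apply(
        KafkaIO.<byte[], byte[]>read()
            .withBootstrapServers("broker:9092") // placeholder
            .withTopic("some-compacted-ktable-topic") // placeholder
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withStartReadTime(Instant.parse("2023-01-01T00:00:00Z")) // placeholder
            .withStopReadTime(Instant.parse("2023-01-02T00:00:00Z")) // placeholder
            .withoutMetadata());

    pipeline.run();
  }
}
```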
So I think the problematic part is this: https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L383-L388
What causes the issue is that consumer.poll can return a ConsumerRecords that contains no KafkaRecord - so we assume we have reached the end - when there are still remaining records to be consumed. This happens because, if you check the kafka consumer implementation ( https://github.com/apache/kafka/blob/f79c2a6e04129832eacec60bbc180173ecc37549/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1259-L1276 and https://github.com/apache/kafka/blob/f79c2a6e04129832eacec60bbc180173ecc37549/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java#L102-L104 ), you can see that poll() will consider a Fetch "successful" if the "position has advanced", even if numRecords is 0.
I'm not completely sure this is the reason, but it seems totally plausible to me: let's assume we have KTable kafka topics with key compaction turned on. Let's assume they have existed for a long time and have a very high compaction rate (only a few percent of the offsets contain a record). It is not hard to imagine there are huge ranges of offsets that contain no records. Well, this is the case with our topics.
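To make the "empty but advanced" case concrete, here is a small illustrative probe (a made-up helper, not Beam code) showing how a zero-record poll() can still move the consumer's position forward across such a gap:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Illustrative probe: with kafka-clients >= 3.2.0 this can return true on
// heavily compacted topics, i.e. poll() handed back zero records even though
// the consumer's position jumped over a range of empty offsets.
final class CompactionGapProbe {
  static boolean pollSkippedOverGap(
      Consumer<byte[], byte[]> consumer, TopicPartition partition, Duration timeout) {
    long positionBefore = consumer.position(partition);
    ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
    long positionAfter = consumer.position(partition);
    return records.isEmpty() && positionAfter > positionBefore;
  }
}
```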
So what I see is that it tries to poll, gets back an "empty" ConsumerRecords (0 records, but positionAdvanced is true), and returns with a ProcessContinuation.resume(). This keeps repeating forever.
The loop happens because we stop the current consumption with a return, and the next processElement() will seek() the consumer to the startOffset (aka tracker.currentRestriction().getFrom(), https://github.com/apache/beam/blob/ddae966f3346fbe247486324cbf8a8a532895316/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L376 ), but in the previous processElement() we didn't notify the tracker of the progress at all, so the startOffset stays the same.
Essentially what was fixed with the watermark for streaming jobs with https://github.com/apache/beam/pull/24205 also has to be fixed for the tracker/restriction/offset.
In order to solve this we have to know whether the "empty" ConsumerRecords is really at the end of the stream, or just at a "gap". So these are the possible solutions I came up with:
1. Get a change into Kafka so that ConsumerRecords contains some distinction between the two types of "empty" (essentially representing positionAdvanced, or possibly the consumer position). - No idea if they will accept it, and even if they do, it's going to take the longest to include in Beam.
2. Check consumer.position() before and after consumer.poll() to figure out if we have "advanced" or not (see the sketch after this list). - It can be used to detect the advancement, but I'm not sure whether the position of the consumer always correlates with the end of the returned range or might differ, so it's unclear whether it can be used to update the tracker.
3. Compare the returned ConsumerRecords to ConsumerRecords.EMPTY. - I would say this is ugly as it relies on an implementation detail of the Kafka library, but we can test with unit tests whether they change it.
4. Check if ((HasProgress) tracker).getProgress().getWorkRemaining() is positive. - I'm not sure how up-to-date that information is, but it can certainly indicate if we are in a huge gap in the middle of the processing.
5. Have batch reads .stop() when they reach the end of the range anyway. - This will fix the issue only when consuming Kafka in batch pipelines, but it could most likely happen with streaming pipelines too if they use the same SDF implementation.
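As a rough illustration of option 2, the gist would be something along these lines inside the SDF's poll handling (a simplified sketch with assumed surrounding state such as the tracker, topic partition and poll timeout; not the actual ReadFromKafkaDoFn code):

```java
import java.time.Duration;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Simplified sketch of option 2: when poll() returns no records but the
// consumer's position advanced, claim the skipped offsets on the tracker so
// the restriction's "from" moves forward instead of resuming at the same
// startOffset forever.
final class EmptyPollHandling {
  static ProcessContinuation handleEmptyPoll(
      Consumer<byte[], byte[]> consumer,
      TopicPartition partition,
      RestrictionTracker<OffsetRange, Long> tracker,
      Duration pollTimeout) {
    long positionBefore = consumer.position(partition);
    ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(pollTimeout);
    long positionAfter = consumer.position(partition);

    if (rawRecords.isEmpty() && positionAfter > positionBefore) {
      // We are in a compacted "gap": claim up to the new position so the next
      // processElement() seeks past it rather than back to the old from().
      if (!tracker.tryClaim(positionAfter - 1)) {
        return ProcessContinuation.stop();
      }
      return ProcessContinuation.resume();
    }
    if (rawRecords.isEmpty()) {
      // Position did not advance either: treat it as "no more data for now".
      return ProcessContinuation.resume();
    }
    // Otherwise the caller would process rawRecords as usual (not shown here).
    return ProcessContinuation.resume();
  }
}
```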
Once we know which case we encountered, we can either continue the while (true) loop and poll the next batch, or, if we have the position/offset, update the tracker and return with .resume().
FYI: increasing the batch/poll size by increasing ConsumerConfig.FETCH_MAX_BYTES_CONFIG, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, etc. alleviates the issue, but doesn't fix it completely.
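For reference, those settings would typically be passed through the consumer config that KafkaIO forwards to the client, e.g. via withConsumerConfigUpdates(...); the values below are placeholders, not recommendations:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Placeholder tuning values for KafkaIO's withConsumerConfigUpdates(...).
// Bigger fetches/polls only make the empty-but-position-advanced polls rarer;
// they do not remove the gaps themselves.
final class PollSizeTuning {
  static Map<String, Object> consumerConfigUpdates() {
    return Map.of(
        ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 100 * 1024 * 1024, // placeholder
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10_000); // placeholder
  }
}
```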
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components