nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0

[0.4.0] Regression in receiver #191

Open felixkrull-neuland opened 7 months ago

felixkrull-neuland commented 7 months ago

When using 0.4.0, the Kafka receiver doesn't correctly connect to the Kafka cluster in certain situations. I've seen this in a test using a Kafka test container: the first test to use a particular container instance always fails because it doesn't receive any messages within the expected time. However, any tests run after the first one (usually) succeed, regardless of order. I managed to create a minimal sample that reproduces the problem: https://github.com/felixkrull-neuland/kotlin-kafka-minimal-repro

With 0.3.1, the test succeeds. With 0.4.0, it fails. Judging by the logs, it looks like some part of the cluster (the consumer offsets topic?) isn't set up on the first request, but 0.4.0 doesn't retry in the way 0.3.1 did.

Excerpt when running with 0.3.1:

15:25:29.250 DEBUG - [LegacyKafkaConsumer.java:163] - [Consumer clientId=consumer-test-1, groupId=test] Initializing the Kafka consumer
15:25:29.254 INFO  - [KafkaMetricsCollector.java:297] - initializing Kafka metrics collector
15:25:29.275 INFO  - [AppInfoParser.java:124] - Kafka version: 3.7.0
15:25:29.275 INFO  - [AppInfoParser.java:125] - Kafka commitId: 2ae524ed625438c5
15:25:29.275 INFO  - [AppInfoParser.java:126] - Kafka startTimeMs: 1712669129275
15:25:29.276 DEBUG - [LegacyKafkaConsumer.java:256] - [Consumer clientId=consumer-test-1, groupId=test] Kafka consumer initialized
15:25:29.292 INFO  - [LegacyKafkaConsumer.java:475] - [Consumer clientId=consumer-test-1, groupId=test] Subscribed to topic(s): topic
15:25:29.293 DEBUG - [AbstractCoordinator.java:904] - [Consumer clientId=consumer-test-1, groupId=test] Sending FindCoordinator request to broker localhost:45653 (id: -1 rack: null)
15:25:29.303 INFO  - [Metadata.java:349] - [Consumer clientId=consumer-test-1, groupId=test] Cluster ID: XfzGHIQbQm-YtX_qZY_wAw
15:25:29.327 DEBUG - [AbstractCoordinator.java:917] - [Consumer clientId=consumer-test-1, groupId=test] Received FindCoordinator response ClientResponse(receivedTimeMs=1712669129326, latencyMs=32, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-test-1, correlationId=0, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='test', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')]))
15:25:29.327 DEBUG - [AbstractCoordinator.java:944] - [Consumer clientId=consumer-test-1, groupId=test] Group coordinator lookup failed: 
15:25:29.327 DEBUG - [AbstractCoordinator.java:300] - [Consumer clientId=consumer-test-1, groupId=test] Coordinator discovery failed, refreshing metadata
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
15:25:29.398 TRACE - [LegacyKafkaConsumer.java:683] - [Consumer clientId=consumer-test-1, groupId=test] Polling for fetches with timeout 0
15:25:29.399 DEBUG - [AbstractCoordinator.java:904] - [Consumer clientId=consumer-test-1, groupId=test] Sending FindCoordinator request to broker localhost:45653 (id: 1 rack: null)
15:25:29.405 DEBUG - [AbstractCoordinator.java:917] - [Consumer clientId=consumer-test-1, groupId=test] Received FindCoordinator response ClientResponse(receivedTimeMs=1712669129405, latencyMs=6, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-test-1, correlationId=4, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='test', nodeId=1, host='localhost', port=45653, errorCode=0, errorMessage='')]))
15:25:29.405 INFO  - [AbstractCoordinator.java:936] - [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:45653 (id: 2147483646 rack: null)
15:25:29.407 DEBUG - [ConsumerCoordinator.java:707] - [Consumer clientId=consumer-test-1, groupId=test] Executing onJoinPrepare with generation -1 and memberId 
15:25:29.407 DEBUG - [AbstractCoordinator.java:1471] - [Consumer clientId=consumer-test-1, groupId=test] Heartbeat thread started
15:25:29.407 INFO  - [AbstractCoordinator.java:604] - [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
15:25:29.407 DEBUG - [ConsumerCoordinator.java:256] - [Consumer clientId=consumer-test-1, groupId=test] Joining group with current subscription: [topic]
<...>

And with 0.4.0, it just gives up and then gets stopped due to the 5-second timeout:

15:12:00.018 DEBUG - [LegacyKafkaConsumer.java:163] - [Consumer clientId=consumer-test-1, groupId=test] Initializing the Kafka consumer
15:12:00.021 INFO  - [KafkaMetricsCollector.java:297] - initializing Kafka metrics collector
15:12:00.042 INFO  - [AppInfoParser.java:124] - Kafka version: 3.7.0
15:12:00.043 INFO  - [AppInfoParser.java:125] - Kafka commitId: 2ae524ed625438c5
15:12:00.043 INFO  - [AppInfoParser.java:126] - Kafka startTimeMs: 1712668320042
15:12:00.043 DEBUG - [LegacyKafkaConsumer.java:256] - [Consumer clientId=consumer-test-1, groupId=test] Kafka consumer initialized
15:12:00.050 INFO  - [LegacyKafkaConsumer.java:475] - [Consumer clientId=consumer-test-1, groupId=test] Subscribed to topic(s): topic
15:12:00.051 DEBUG - [AbstractCoordinator.java:904] - [Consumer clientId=consumer-test-1, groupId=test] Sending FindCoordinator request to broker localhost:42635 (id: -1 rack: null)
15:12:00.059 INFO  - [Metadata.java:349] - [Consumer clientId=consumer-test-1, groupId=test] Cluster ID: e-Y1nUBjTgOPtG4C8IaZvA
15:12:00.079 DEBUG - [AbstractCoordinator.java:917] - [Consumer clientId=consumer-test-1, groupId=test] Received FindCoordinator response ClientResponse(receivedTimeMs=1712668320078, latencyMs=26, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=4, clientId=consumer-test-1, correlationId=0, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='test', nodeId=-1, host='', port=-1, errorCode=15, errorMessage='')]))
15:12:00.079 DEBUG - [AbstractCoordinator.java:944] - [Consumer clientId=consumer-test-1, groupId=test] Group coordinator lookup failed: 
15:12:00.079 DEBUG - [AbstractCoordinator.java:300] - [Consumer clientId=consumer-test-1, groupId=test] Coordinator discovery failed, refreshing metadata
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
15:12:00.156 TRACE - [LegacyKafkaConsumer.java:683] - [Consumer clientId=consumer-test-1, groupId=test] Polling for fetches with timeout 0
15:12:04.987 DEBUG - [ConsumerNetworkClient.java:190] - [Consumer clientId=consumer-test-1, groupId=test] Received user wakeup
15:12:04.988 TRACE - [LegacyKafkaConsumer.java:1123] - [Consumer clientId=consumer-test-1, groupId=test] Closing the Kafka consumer
<...>
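
One possible reading of the two excerpts above is that both versions see the same initial `errorCode=15` (`COORDINATOR_NOT_AVAILABLE`) response, but only 0.3.1 goes on to issue a second `FindCoordinator` request. The following is a minimal sketch of that hypothesis in plain Java. It does not use the Kafka client or kotlin-kafka: `FakeBroker`, `FakeConsumer`, and `receive` are hypothetical stand-ins, and the assumption that each poll triggers at most one coordinator lookup is a simplification made for illustration only.

```java
import java.util.List;

public class CoordinatorRetrySketch {
    /** Kafka protocol error code 15, as seen in the FindCoordinator responses above. */
    static final int COORDINATOR_NOT_AVAILABLE = 15;
    static final int NONE = 0;

    /** Hypothetical freshly started broker: the first lookup fails, later ones succeed. */
    static final class FakeBroker {
        private int lookups = 0;

        int findCoordinator() {
            lookups++;
            return lookups == 1 ? COORDINATOR_NOT_AVAILABLE : NONE;
        }
    }

    /** Hypothetical consumer: each poll() makes at most one coordinator lookup (an assumption). */
    static final class FakeConsumer {
        private final FakeBroker broker;
        private boolean coordinatorKnown = false;

        FakeConsumer(FakeBroker broker) {
            this.broker = broker;
        }

        List<String> poll() {
            if (!coordinatorKnown) {
                coordinatorKnown = broker.findCoordinator() == NONE;
                if (!coordinatorKnown) {
                    return List.of(); // lookup failed, nothing can be fetched yet
                }
            }
            return List.of("record"); // one placeholder record per successful poll
        }
    }

    /** Polls a fresh consumer up to maxPolls times and returns how many records arrived. */
    static int receive(int maxPolls) {
        FakeConsumer consumer = new FakeConsumer(new FakeBroker());
        int received = 0;
        for (int i = 0; i < maxPolls; i++) {
            received += consumer.poll().size();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("single poll:    " + receive(1)); // prints 0
        System.out.println("repeated polls: " + receive(3)); // prints 2
    }
}
```

In this model, a caller that stops polling after the first failed lookup never receives anything, while a caller that keeps polling recovers on the second attempt. Whether 0.4.0 actually behaves like the single-poll case is not established by the logs alone.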
nomisRev commented 7 months ago

Hey @felixkrull-neuland,

Thank you for this report, it's super useful! I upgraded to Kafka 3.7.0 last week and haven't had a chance to work around this particular 3.7.0 release. I'm not sure whether the cause is that upgrade or kotlin-kafka itself, but I can easily check with your reproducer!

Thank you, will report back soon-ish ☺️

nomisRev commented 7 months ago

Very strange that wasn't covered by the tests... I ran the test suite 100x before releasing 🤔

nomisRev commented 7 months ago

I was looking into this issue, but I had a week of PTO, so it fell by the wayside. From my extremely brief investigation, I still suspect either a change in Kafka itself or a change in configuration properties (on the library side).

I am not entirely sure that setting an explicit version in Gradle proved the Kafka version change was not the culprit; I have had a lot of issues with Gradle explicit versions.

See also https://github.com/nomisRev/kotlin-kafka/issues/173.