nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0

Receiver polls only once at the beginning. #199

Open Morpheusse opened 1 month ago

Morpheusse commented 1 month ago

Hello!

First of all, thank you for this library!

I tried to upgrade from 0.3.1 to 0.4.0, but the consumer now gets stuck during initialization.
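
For context, this is roughly how I set up the consumer (a minimal sketch with placeholder broker/topic/group values, not my actual configuration):

import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.flow.collect
import org.apache.kafka.common.serialization.StringDeserializer

suspend fun main() {
  // Placeholder values; the real app reads these from configuration.
  val settings = ReceiverSettings(
    bootstrapServers = "broker:9092",
    keyDeserializer = StringDeserializer(),
    valueDeserializer = StringDeserializer(),
    groupId = "group-id",
  )
  KafkaReceiver(settings)
    .receive("my-topic")
    .collect { record ->
      println("${record.key()} -> ${record.value()}")
      // Acknowledge so the offset is committed on the configured commit interval.
      record.offset.acknowledge()
    }
}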

After some troubleshooting, it seems that this is related to this part:

https://github.com/nomisRev/kotlin-kafka/blob/9d7604f1a1d148b14578ae10979a02242a09ed64/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt#L273-L275

I added some logging for troubleshooting:

  /**
   * If we were retrying, schedule a poll and set isRetryingCommit to false
   * If we weren't retrying, do nothing.
   */
  @ConsumerThread
  private fun schedulePollAfterRetrying() {
    logger.info("TROUBLESHOOTING isRetryingCommit ${isRetryingCommit.get()}")
    if (isRetryingCommit.getAndSet(false)) poll()
  }

The schedulePollAfterRetrying function is called whenever a commit succeeds or fails. But during startup no commit has ever been retried, so isRetryingCommit is always false, getAndSet(false) returns false, and poll() is never invoked. As the logs below show, the consumer never even joins the group.

LOGS BEFORE:

2024-07-26 13:56:40.750 [reactor-kafka-sender-1563863156] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka version: 3.6.2
2024-07-26 13:56:40.750 [reactor-kafka-sender-1563863156] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka commitId: c4deed513057c94e
2024-07-26 13:56:40.750 [reactor-kafka-sender-1563863156] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka startTimeMs: 1721995000749
2024-07-26 13:56:40.767 [kotlin-kafka-consumer] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka version: 3.6.2
2024-07-26 13:56:40.767 [kotlin-kafka-consumer] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka commitId: c4deed513057c94e
2024-07-26 13:56:40.767 [kotlin-kafka-consumer] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka startTimeMs: 1721995000767
2024-07-26 13:56:40.774 [kotlin-kafka-consumer] INFO  o.a.k.clients.consumer.KafkaConsumer - null - [Consumer clientId=client-id-1, groupId=group-id] Subscribed to topic(s): my-topic
2024-07-26 13:56:41.030 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - null - [Producer clientId=producer-1] Cluster ID: ldHW1d7rRiuAAbu8MMH-ZA
2024-07-26 13:56:41.031 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.p.i.TransactionManager - null - [Producer clientId=producer-1] ProducerId set to 34 with epoch 0
2024-07-26 13:56:45.960 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 13:56:50.962 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 13:56:55.967 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 13:57:00.971 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 13:57:05.976 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 13:57:10.981 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 13:57:15.985 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false

I had to update this code so that the consumer polls unconditionally in commitSuccess, while commitFailure still goes through schedulePollAfterRetrying.

  /**
   * Commit was successful:
   *   - Set commitFailures to 0
   *   - Schedule poll if we previously were retrying to commit
   *   - Complete all the [Offset.commit] continuations
   */
  @ConsumerThread
  private fun commitSuccess(
    commitArgs: CommittableBatch.CommitArgs,
    offsets: Map<TopicPartition, OffsetAndMetadata>
  ) {
    checkConsumerThread("commitSuccess")
    if (offsets.isNotEmpty()) consecutiveCommitFailures.set(0)
-   schedulePollAfterRetrying()
+   poll()
    commitArgs.continuations?.forEach { cont ->
      cont.resume(Unit)
    }
  }

LOGS AFTER THE CHANGE:

2024-07-26 14:00:17.025 [reactor-kafka-sender-201551015] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka version: 3.6.2
2024-07-26 14:00:17.025 [reactor-kafka-sender-201551015] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka commitId: c4deed513057c94e
2024-07-26 14:00:17.025 [reactor-kafka-sender-201551015] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka startTimeMs: 1721995217024
2024-07-26 14:00:17.051 [kotlin-kafka-consumer] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka version: 3.6.2
2024-07-26 14:00:17.051 [kotlin-kafka-consumer] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka commitId: c4deed513057c94e
2024-07-26 14:00:17.051 [kotlin-kafka-consumer] INFO  o.a.kafka.common.utils.AppInfoParser - null - Kafka startTimeMs: 1721995217051
2024-07-26 14:00:17.060 [DefaultDispatcher-worker-5] INFO  IotCoreManager - null - Initializing IotCoreManager
2024-07-26 14:00:17.073 [kotlin-kafka-consumer] INFO  o.a.k.clients.consumer.KafkaConsumer - null - [Consumer clientId=client-id-1, groupId=group-id] Subscribed to topic(s): my-topic
2024-07-26 14:00:17.339 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - null - [Producer clientId=producer-1] Cluster ID: ldHW1d7rRiuAAbu8MMH-ZA
2024-07-26 14:00:17.340 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.p.i.TransactionManager - null - [Producer clientId=producer-1] ProducerId set to 50 with epoch 0
2024-07-26 14:00:22.241 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 14:00:22.253 [kotlin-kafka-consumer] INFO  org.apache.kafka.clients.Metadata - null - [Consumer clientId=client-id-1, groupId=group-id] Cluster ID: ldHW1d7rRiuAAbu8MMH-ZA
2024-07-26 14:00:22.253 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Discovered group coordinator broker:9092 (id: 2147483646 rack: null)
2024-07-26 14:00:22.254 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] (Re-)joining group
2024-07-26 14:00:22.274 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Request joining group due to: need to re-join with the given member-id: client-id-1-15acc7e0-7a9a-4d8f-b6c7-27a7e613736f
2024-07-26 14:00:22.275 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
2024-07-26 14:00:22.275 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] (Re-)joining group
2024-07-26 14:00:27.245 [kotlin-kafka-consumer] INFO  c.z.k.receiver.internals.EventLoop - null - TROUBLESHOOTING isRetryingCommit false
2024-07-26 14:00:27.247 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Successfully joined group with generation Generation{generationId=27, memberId='client-id-1-15acc7e0-7a9a-4d8f-b6c7-27a7e613736f', protocol='range'}
2024-07-26 14:00:27.251 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Finished assignment for group at generation 27: {client-id-1-15acc7e0-7a9a-4d8f-b6c7-27a7e613736f=Assignment(partitions=[my-topic-0, my-topic-1, my-topic-2])}
2024-07-26 14:00:27.259 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Successfully synced group in generation Generation{generationId=27, memberId='client-id-1-15acc7e0-7a9a-4d8f-b6c7-27a7e613736f', protocol='range'}
2024-07-26 14:00:27.260 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Notifying assignor about the new Assignment(partitions=[my-topic-0, my-topic-1, my-topic-2])
2024-07-26 14:00:27.261 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Adding newly assigned partitions: my-topic-0, my-topic-1, my-topic-2
2024-07-26 14:00:27.280 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Setting offset for partition my-topic-2 to the committed offset FetchPosition{offset=160, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker:9092 (id: 1 rack: null)], epoch=0}}
2024-07-26 14:00:27.280 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Setting offset for partition my-topic-0 to the committed offset FetchPosition{offset=124, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker:9092 (id: 1 rack: null)], epoch=0}}
2024-07-26 14:00:27.280 [kotlin-kafka-consumer] INFO  o.a.k.c.c.i.ConsumerCoordinator - null - [Consumer clientId=client-id-1, groupId=group-id] Setting offset for partition my-topic-1 to the committed offset FetchPosition{offset=120, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[broker:9092 (id: 1 rack: null)], epoch=0}}

I am not sure if my change is the right one, though.
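
If an unconditional poll() is too aggressive, an alternative I considered is to keep the flag bookkeeping and poll anyway. This is just a sketch based on the snippet above; I have not verified it against the rest of EventLoop:

  @ConsumerThread
  private fun commitSuccess(
    commitArgs: CommittableBatch.CommitArgs,
    offsets: Map<TopicPartition, OffsetAndMetadata>
  ) {
    checkConsumerThread("commitSuccess")
    if (offsets.isNotEmpty()) consecutiveCommitFailures.set(0)
    // Clear the retry flag so the failure path still sees a clean state,
    // then poll unconditionally so the loop can never be starved at startup.
    isRetryingCommit.set(false)
    poll()
    commitArgs.continuations?.forEach { cont ->
      cont.resume(Unit)
    }
  }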
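
In the meantime, downgrading works for me. In case anyone else hits this, pinning the previous version in Gradle is an easy workaround (the coordinates here are my assumption; double-check them against the project's installation docs):

dependencies {
  // Stay on 0.3.1 until the receiver polls correctly again in a 0.4.x release.
  implementation("io.github.nomisRev:kotlin-kafka:0.3.1")
}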