nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0
103 stars 10 forks source link

KafkaReceiver throws java.util.ConcurrentModificationException when running with IO dispatcher #195

Open winkey728 opened 3 months ago

winkey728 commented 3 months ago

I use this library in my Ktor project, and runs the KafkaReceiver with IO dispatcher, and sets commitStrategy = CommitStrategy.BySize(100), it throws java.util.ConcurrentModificationException. The full stacktrace is

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: kotlin-kafka-test-group-id-1 @coroutine#8, id: 31) otherThread(id: 33)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquire(LegacyKafkaConsumer.java:1226)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.acquireAndEnsureOpen(LegacyKafkaConsumer.java:1207)
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.commitAsync(LegacyKafkaConsumer.java:740)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1117)
    at io.github.nomisRev.kafka.receiver.internals.EventLoop.commitAsync(EventLoop.kt:320)
    at io.github.nomisRev.kafka.receiver.internals.EventLoop.commit(EventLoop.kt:299)
    at io.github.nomisRev.kafka.receiver.internals.EventLoop.access$commit(EventLoop.kt:63)
    at io.github.nomisRev.kafka.receiver.internals.EventLoop$scheduleCommitIfRequired$1.invokeSuspend(EventLoop.kt:287)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:104)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

I can also reproduce this exception with the following test code:

@Test
  fun `All acknowledged messages are committed on flow completion`() = withTopic {
    publishToKafka(topic, produced)
    withContext(Dispatchers.IO) { // Make it runs in IO dispatcher
      val receiver = KafkaReceiver(
        receiverSetting().copy(
          commitStrategy = CommitStrategy.BySize(count)
        )
      )
      receiver.receive(topic.name())
        .take(count)
        .collectIndexed { index, value ->
          if (index == lastIndex) {
            assertEquals(0, receiver.committedCount(topic.name()))
            value.offset.acknowledge()
          } else value.offset.acknowledge()
        }

      assertEquals(receiver.committedCount(topic.name()), count.toLong())
    }
  }