zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0

RejectedExecutionException: Unable to run zio.internal.FiberRuntime - During shutdown #1036

Closed: xkrillex closed this issue 10 months ago

xkrillex commented 1 year ago

zio-kafka 2.4.1, zio 2.0.13

Here is a small reproducer:

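// Imports assumed for zio-kafka 2.x
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.admin.{AdminClient, AdminClientSettings}
import zio.kafka.admin.AdminClient.NewTopic
import zio.kafka.consumer.{Consumer, ConsumerSettings}
import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
import zio.kafka.consumer.Subscription.Topics
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde

object Reproducer extends ZIOAppDefault {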

  val consumerSettings = ConsumerSettings(List("localhost:9092"))
    .withGroupId("test_12345")
    .withCloseTimeout(30000.milliseconds)
    .withPollTimeout(50.milliseconds)
    .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
    .withProperties(
      ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> "45000",
      ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG -> "30000",
      ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "3000",
      ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG -> "10000",
      ConsumerConfig.FETCH_MIN_BYTES_CONFIG -> "1",
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "500",
      ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG -> "300000",
    )
  override def run: ZIO[Environment with ZIOAppArgs with Scope, Nothing, Any] = {
    (for {
      _ <- ZIO.serviceWithZIO[AdminClient](_.createTopic(NewTopic("test_topic_2", 1, 1)))
      _ <- Producer.produce(new ProducerRecord("test_topic_2", "key_1", "value_1"), Serde.string, Serde.string)
      record <- Consumer
        .plainStream(Topics(Set("test_topic_2")), Serde.string, Serde.string)
        .mapZIO {
          record =>
            ZIO.logDebug(s"Processing record with value ${record.record.value()}") *>
              record.offset.commitOrRetry(Schedule.spaced(100.milliseconds) && Schedule.recurs(10)).as(record)
        }
        .runHead
      _ <- ZIO.logInfo(s"Record consumed: $record")
      _ <- ZIO.serviceWithZIO[AdminClient](_.deleteTopic("test_topic_2"))
    } yield ())
      .provide(
        AdminClient.live,
        ZLayer.succeed(AdminClientSettings(List("localhost:9092"))),
        ZLayer.scoped(Consumer.make(consumerSettings)),
        ZLayer.scoped(Producer.make(ProducerSettings(List("localhost:9092")))),
        LogLayers.layer // reporter's own logging layer, not shown in the issue
      )
      .exitCode
  }

}
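The commit step in the reproducer retries with `Schedule.spaced(100.milliseconds) && Schedule.recurs(10)`. `&&` intersects the two schedules, so retrying continues only while both allow it: at most 10 retries, spaced 100 ms apart. The sketch below shows that policy shape in isolation, with no Kafka involved; `RetryPolicyDemo` and the always-failing commit stand-in are hypothetical.

```scala
import zio._

object RetryPolicyDemo {
  // Same policy shape as in the reproducer: continue while BOTH schedules continue
  val policy = Schedule.spaced(100.milliseconds) && Schedule.recurs(10)

  // Hypothetical stand-in for a commit that always fails; counts how often it is attempted
  val countAttempts: UIO[Int] =
    for {
      attempts <- Ref.make(0)
      _ <- (attempts.update(_ + 1) *> ZIO.fail(new RuntimeException("commit failed")))
             .retry(policy)
             .ignore // give up silently once the policy is exhausted
      n <- attempts.get
    } yield n

  // Runs the effect on the default runtime and returns the attempt count
  def run(): Int =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(countAttempts).getOrThrowFiberFailure()
    }
}
```

`RetryPolicyDemo.run()` reports 11 attempts: the initial attempt plus 10 retries.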

Running the reproducer fails with the following error during shutdown:

Exception in thread "main" Exception in thread "zio-fiber-1" java.util.concurrent.RejectedExecutionException: Unable to run zio.internal.FiberRuntime@5c523d6f
    at zio.Executor.submitOrThrow(Executor.scala:79)
Caused by: java.util.concurrent.RejectedExecutionException: Unable to run zio.internal.FiberRuntime@5c523d6f
    at zio.Executor.submitOrThrow(Executor.scala:79)
    at zio.internal.FiberRuntime.drainQueueLaterOnExecutor(FiberRuntime.scala:249)
    at zio.internal.FiberRuntime.tell(FiberRuntime.scala:1404)
    at zio.internal.FiberRuntime.$anonfun$interruptAsFork$1(FiberRuntime.scala:130)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:904)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1174)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:381)
    at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:504)
    at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:220)
    at zio.internal.FiberRuntime.run(FiberRuntime.scala:139)
    at zio.internal.ZScheduler$$anon$4.run(ZScheduler.scala:476)
joelvim commented 10 months ago

Hello, we are encountering the same problem in a different context: we run multiple ZIO effects in parallel. To keep it simple, let's say we have the following effect:

kafkaConsumerOnCluster1 <&> apiServerWithKafkaProducer <&> kafkaConsumerOnCluster2
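`<&>` is ZIO's `zipPar`: it runs both sides concurrently, and as soon as one side fails it interrupts the other and waits for it to finish. In a composition like this one, a failing API server therefore interrupts both Kafka consumers, presumably the consumer shutdown path where the exception appears. The sketch below reproduces only that interruption behaviour, with no Kafka involved; `ZipParFailureDemo`, `longRunning`, `failingServer` and the names `consumer-1`/`consumer-2` are hypothetical stand-ins for the effects above.

```scala
import zio._

object ZipParFailureDemo {
  // Hypothetical stand-in for a consumer that runs forever; records its name when interrupted
  def longRunning(name: String, interrupted: Ref[List[String]]): UIO[Unit] =
    ZIO.never.onInterrupt(interrupted.update(name :: _)).unit

  // Hypothetical stand-in for the API server: fails after a short delay
  val failingServer: Task[Unit] =
    ZIO.sleep(100.milliseconds) *> ZIO.fail(new RuntimeException("API server failed"))

  // Returns the interrupted effects (sorted) and whether the whole composition failed
  val program: UIO[(List[String], Boolean)] =
    for {
      interrupted <- Ref.make(List.empty[String])
      exit <- (longRunning("consumer-1", interrupted) <&>
                failingServer <&>
                longRunning("consumer-2", interrupted)).exit
      names <- interrupted.get
    } yield (names.sorted, exit.isFailure)

  def run(): (List[String], Boolean) =
    Unsafe.unsafe { implicit unsafe =>
      Runtime.default.unsafe.run(program).getOrThrowFiberFailure()
    }
}
```

Both stand-in consumers are interrupted and the composition as a whole fails, which is why a failure anywhere in the chain ends up shutting down every consumer in it.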

If any of the three effects fails, the program hangs with a stack trace similar to the one in this issue:

Exception in thread "main" Exception in thread "zio-fiber-1" java.util.concurrent.RejectedExecutionException: Unable to run zio.internal.FiberRuntime@5eab1cd1
    at zio.Executor.submitOrThrow(Executor.scala:79)
Caused by: java.util.concurrent.RejectedExecutionException: Unable to run zio.internal.FiberRuntime@5eab1cd1
    at zio.Executor.submitOrThrow(Executor.scala:79)
    at zio.internal.FiberRuntime.drainQueueLaterOnExecutor(FiberRuntime.scala:249)
    at zio.internal.FiberRuntime.tell(FiberRuntime.scala:1404)
    at zio.internal.FiberRuntime.$anonfun$interruptAsFork$1(FiberRuntime.scala:130)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:904)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1174)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
    at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:381)
    at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:504)
    at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:220)
    at zio.internal.FiberRuntime.run(FiberRuntime.scala:139)
    at zio.internal.ZScheduler$$anon$4.run(ZScheduler.scala:478)

The problem is that, since the process never completes, the pod stays up and we have no way to restart or stop the Kafka consumer effect. When we removed the Kafka consumer effects, the problem disappeared, so the consumer appears to be responsible.

We encountered this with zio-kafka 2.3.4 and 2.6.0, and zio 2.0.18.

erikvanoosten commented 10 months ago

This bug should be fixed in zio-kafka 2.7.0 by #1109. Please re-open this issue if you still see the problem.
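For anyone still on an older release, picking up the fix means bumping the dependency. A minimal sketch, assuming an sbt build (the issue does not say which build tool is used):

```scala
// build.sbt (assumes sbt): move to the zio-kafka release referenced above
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.7.0"
```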