zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka
Apache License 2.0
334 stars 133 forks source link

Runaway stream when `max.poll.interval.ms` is exceeded #1262

Open myazinn opened 1 week ago

myazinn commented 1 week ago

Here's a self-contained test that reproduces the issue

package zio.kafka.consumer

import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.kafka.consumer.diagnostics.Diagnostics
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._
import zio.kafka.testkit.{ Kafka, KafkaRandom }
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.atomic.AtomicInteger

object ZombieConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
  override val kafkaPrefix: String = "zombieconsumespec"

  override def spec: Spec[TestEnvironment with Scope, Throwable] =
    suite("ZombieConsumerSpec")(
      test("should not leave runaway stream") {
        val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
        for {
          topic  <- randomTopic
          client <- randomClient
          group  <- randomGroup

          _ <- produceMany(topic, kvs)

          iteration = new AtomicInteger(0)
          result <- Consumer
                      .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
                      .mapZIO { r =>
                        ZIO.suspendSucceed {
                          val iter           = iteration.incrementAndGet()
                          val processingTime = if (iter == 1) 10.seconds else 1.second
                          ZIO.sleep(processingTime).as(r -> iter)
                        }
                      }
                      .mapZIO { case (r, iter) =>
                        ZIO.debug(s"[$iter] Committing offset ${r.offset.offset}") *> r.offset.commit
                      }
                      .take(99)
                      .runDrain
                      .debug("Test Result")
                      .retryN(1)
                      .exit
                      .provideSomeLayer[Kafka](consumer(client, Some(group)))
        } yield assertTrue(result.isSuccess)
      }
    )
      .provideSome[Kafka](producer)
      .provideShared(Kafka.embedded) @@ withLiveClock @@ timeout(1.minute)

  def consumer(
    clientId: String,
    groupId: Option[String] = None,
    maxPollInterval: Duration = 2.seconds,
    `max.poll.records`: Int = 5
  ): ZLayer[Kafka, Throwable, Consumer] =
    (ZLayer(
      consumerSettings(
        clientId = clientId,
        groupId = groupId,
        maxPollInterval = maxPollInterval,
        `max.poll.records` = `max.poll.records`
      )
    ) ++
      ZLayer.succeed(Diagnostics.NoOp)) >>> Consumer.live

}

It models a scenario when event handling takes more time than max.poll.interval.ms value (which happens for us from time to time :( ). The expected behaviour seems to be that once the poll interval is exceeded, stream is interrupted along with all child fibers. So each time we "retry" Kafka stream, it should behave like it starts from scratch and nothing happened. But that's not what we see here. Once max.poll.interval.ms is exceeded, zio-kafka "forgets" the stream and re-subscribes to Kafka. But the "forgotten" stream is not actually dead and messes with RunLoop state. Here's what you'll get if you run the test

+ ZombieConsumerSpec
[1] Committing offset 0
<FAIL> Test Result: Fail(org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.,Stack trace for thread "zio-fiber-":
)
[2] Committing offset 0
[3] Committing offset 1
[4] Committing offset 2
[5] Committing offset 3
<FAIL> Test Result: Fail(zio.kafka.consumer.Consumer$CommitTimeout$: Commit timeout,Stack trace for thread "zio-fiber-131":
    at zio.kafka.consumer.internal.Runloop.commit(Runloop.scala:263)
    at zio.kafka.consumer.ZombieConsumerSpec.spec(ZombieConsumerSpec.scala:38)
    at zio.kafka.consumer.ZombieConsumerSpec.spec(ZombieConsumerSpec.scala:42))

Note that 1) We commit a record with offset 0 twice, even though the first iteration should've never gone that far. 2) That commit from a first iteration eventually breaks the working stream with mysterious CommitTimeout exception. The fail itself could be ok, but the exact exception is extremely misleading.

And there's more. In a more "real-world" scenario it could lead to stream hanging indefinitely. Here's how (leaving only the important part)

        val kvs = (1 to 100).toList.map(i => (s"key$i", s"msg$i"))
        for {
          topic1 <- randomTopic
          topic2 <- randomTopic
          client <- randomClient
          group  <- randomGroup

          _ <- produceMany(topic1, kvs)
          _ <- produceMany(topic2, kvs)

          iteration = new AtomicInteger(0)
          result <- Consumer
                      .partitionedStream(Subscription.Topics(Set(topic1, topic2)), Serde.string, Serde.string)
                      .flatMapPar(Byte.MaxValue) { case (_, stream) =>
                        stream.mapZIO { r =>
                          ZIO.suspendSucceed {
                            val iter           = iteration.incrementAndGet()
                            val processingTime = if (iter == 1) 10.seconds else 1.second
                            ZIO.sleep(processingTime).as(r -> iter)
                          }
                        }.mapZIO { case (r, iter) =>
                          ZIO.debug(
                            s"[$iter] Committing offset ${r.offset.offset} from ${r.offset.topicPartition}"
                          ) *> r.offset.commit
                        }
                      }
                      .take(99)
                      .runDrain
                      .debug("Test Result")
                      .retryN(1)
                      .exit
                      .provideSomeLayer[Kafka](consumer(client, Some(group)))
        } yield assertTrue(result.isSuccess)

Here we have two topics, one of which is perfectly fine and the second one would fail on first iteration. Eventually that "broken" iteration will fail both streams, and test will never complete. Here's what you'll get when you run the test

+ ZombieConsumerSpec
[2] Committing offset 0 from zombieconsumespec-topic-3920c775-d472-4cd1-a669-ac232de392ef-0
[3] Committing offset 1 from zombieconsumespec-topic-3920c775-d472-4cd1-a669-ac232de392ef-0
[4] Committing offset 2 from zombieconsumespec-topic-3920c775-d472-4cd1-a669-ac232de392ef-0
[1] Committing offset 0 from zombieconsumespec-topic-e8e5c3c3-0464-42bc-8739-b5b0d5e28778-0
<FAIL> Test Result: Fail(zio.kafka.consumer.Consumer$CommitTimeout$: Commit timeout,Stack trace for thread "zio-fiber-117":
    at zio.kafka.consumer.internal.Runloop.commit(Runloop.scala:263)
    at zio.kafka.consumer.ZombieConsumerSpec.spec(ZombieConsumerSpec.scala:40)
    at zio.kafka.consumer.ZombieConsumerSpec.spec(ZombieConsumerSpec.scala:33))
21:12:21.080 [ZScheduler-Worker-2] [] WARN  zio-slf4j-logger - Test ZombieConsumerSpec - should not leave runaway stream has taken more than 1 m to execute. If this is not expected, consider using TestAspect.timeout to timeout runaway tests for faster diagnostics.
  - should not leave runaway stream
Timeout of 1 m exceeded.
0 tests passed. 1 tests failed. 0 tests ignored.

And that's what've actually encountered :( It seems that it is caused by a workaround for this issue, but I'm not sure. Let me know if you need anything else from my side

erikvanoosten commented 1 week ago

Hi @myazinn . Thanks for the extensive report! Very useful.

This is complicated stuff, I have to think about it more before I can give a good answer. For now: because zio-kafka doesn't run the streams (the user does) zio-kafka does not have full control over them. In particular we cannot interrupt the stream at any moment, only when it needs to fetch more records. In the example this happens every 5 records. This explains why offset 0 is committed even though processing took 10 seconds. Unfortunately, the commit does not complete, otherwise the retry would skip offset 0. I have to think a bit more on why the first consumer does not completely close and even tries to complete the commit (and eventually fails).

erikvanoosten commented 1 week ago

As a quick fix: it is perfectly fine to raise maxPollInterval. Set it to a couple of hours if you need to, a day, 2 days, all fine. The only downside is that the broker will take longer to detect a dead-locked consumer. But hopefully you have other guards against this already.

myazinn commented 1 week ago

Thanks @erikvanoosten! Yeah, that's what we've decided to do as well (increasing maxPollInterval, and also removing .retry on a stream just in case). I understand that's it's quite tricky to interrupt a Stream which you don't fully control. One solution that I could think of is to expose some Promise that the user can race against and interrupt his own code immediately, but it seems like an awkward API and it's unlikely that anyone would even know about it. So I just hope that eventually someone will find the solution which will work automatically