zio / zio-kafka

A Kafka client for ZIO and ZIO Streams
https://zio.dev/zio-kafka

Partition lost not recovering, possible issue with `RunLoop` #1250

Open josdirksen opened 4 weeks ago

josdirksen commented 4 weeks ago

This might be related to https://github.com/zio/zio-kafka/issues/1233, but over the last couple of weeks/months we have seen cases where, after a partition is lost, it is not recovered correctly. We've tried to analyze and debug this, but it occurs so infrequently that we haven't been able to isolate it.

By analyzing the code we may have identified the cause, but there is so much asynchronous work going on there that we may well be interpreting it wrongly.

What happens in our case is the following:

  1. Services are consuming from n partitions. Note that these are partitions for a topic that has intervals with little to no traffic.
  2. At a certain point, usually in the middle of the night, there is some network issue where kafka-client identifies the partitions as lost.
  3. We see these lost partitions in the logging, but the partitions are not recovered, and we see no errors in our own client code indicating that the streams have failed.

For partitions that are revoked everything seems to be working correctly though.
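
For context, the plain kafka-clients consumer reports these two cases through different rebalance-listener callbacks, which is why zio-kafka handles them differently. A minimal sketch of that listener shape (kafka-clients API, shown only to illustrate the revoked-vs-lost distinction):

    import java.util.{ Collection => JCollection }

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
    import org.apache.kafka.common.TopicPartition

    val listener: ConsumerRebalanceListener = new ConsumerRebalanceListener {
      // Cooperative hand-over: the consumer still owns these partitions and can, for
      // example, commit offsets before giving them up.
      override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = ()

      override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()

      // Ownership is already gone (e.g. after a session timeout or network issue);
      // committing offsets for these partitions is no longer possible.
      override def onPartitionsLost(partitions: JCollection[TopicPartition]): Unit = ()
    }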

We think we have found a possible cause. In the `Runloop`, this happens for lost partitions:

      onLost = lostTps =>
        for {
          _              <- ZIO.logDebug(s"${lostTps.size} partitions are lost")
          rebalanceEvent <- lastRebalanceEvent.get
          state          <- currentStateRef.get
          lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
          _ <- ZIO.foreachDiscard(lostStreams)(_.lost)
          _ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps))
          _ <- ZIO.logTrace(s"onLost done")
        } yield ()

This results in the following call in `PartitionStreamControl`:

  private[internal] def lost: UIO[Boolean] = {
    val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace
    interruptionPromise.fail(lostException)
  }

Looking at the way the `interruptionPromise` is handled, this doesn't seem to work correctly when there are no records to be processed. In `PartitionStreamControl` we have this repeating effect:

                 ZStream.repeatZIOChunk {
                   // First try to take all records that are available right now.
                   // When no data is available, request more data and await its arrival.
                   dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
                 }.flattenTake.chunksWith { s =>
                   s.tap(records => registerPull(queueInfo, records))
                      // Due to https://github.com/zio/zio/issues/8515 we cannot use ZStream.interruptWhen.
                     .mapZIO(chunk => interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk))
                 }

And here the `interruptionPromise` is checked to see if we need to interrupt this effect. But how would this work if there are no chunks to process? The `requestAndAwaitData` function:

      requestAndAwaitData =
        for {
          _     <- commandQueue.offer(RunloopCommand.Request(tp))
          _     <- diagnostics.emit(DiagnosticEvent.Request(tp))
          taken <- dataQueue.takeBetween(1, Int.MaxValue)
        } yield taken

This blocks the current fiber until at least one element is taken. So when the `lost` function fails the promise, that failure is never observed, since no records are coming in on the `dataQueue` (or I'm reading this wrong, which is of course also possible).
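
A minimal, self-contained sketch of that behaviour (simplified, assumed names; not the actual zio-kafka internals): failing a promise that nothing awaits does not wake up a fiber that is parked in `takeBetween`:

    import zio._

    // Repro sketch: a fiber blocked on `takeBetween` is only woken up by elements
    // arriving on the queue; failing an unrelated promise does not unblock it.
    object LostPartitionRepro extends ZIOAppDefault {
      val run =
        for {
          dataQueue           <- Queue.unbounded[Int]
          interruptionPromise <- Promise.make[Throwable, Nothing]
          // Stands in for requestAndAwaitData: parks until at least one element arrives.
          consumer            <- dataQueue.takeBetween(1, Int.MaxValue).fork
          // Stands in for `lost`: fail the promise, but nothing is awaiting it here.
          _                   <- interruptionPromise.fail(new RuntimeException("partition lost"))
          // The consumer fiber is still suspended, so this join times out.
          taken               <- consumer.join.timeout(2.seconds)
          _                   <- Console.printLine(s"takeBetween completed: ${taken.isDefined}") // prints false
        } yield ()
    }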

For the revoke flow, the `dataQueue` gets an additional `Take.end` to get out of the `requestAndAwaitData` wait state. But that doesn't happen for the `lost` scenario.

So, shouldn't the code for `lost` also make sure the `dataQueue` receives at least some value? As it is, the stream seems to be stuck in the `requestAndAwaitData` loop indefinitely.
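
For illustration only, a hypothetical variant of `lost` along those lines, reusing the names from the snippets above (a sketch of the idea, not a tested patch):

    // Hypothetical sketch: besides failing the interruption promise, also push a
    // terminal Take onto the data queue, so that a fiber parked in requestAndAwaitData
    // wakes up and the stream ends with the same error.
    private[internal] def lost: UIO[Boolean] = {
      val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace
      for {
        failed <- interruptionPromise.fail(lostException)
        _      <- dataQueue.offer(Take.failCause(Cause.fail(lostException)))
      } yield failed
    }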

erikvanoosten commented 4 weeks ago

Thanks @josdirksen, that is some awesome spelunking there. I think you have found all the right places in the code, and your analysis seems correct.

We can fix `requestAndAwaitData` by racing it with `interruptionPromise.await`:

      requestAndAwaitData =
        for {
          _     <- commandQueue.offer(RunloopCommand.Request(tp))
          _     <- diagnostics.emit(DiagnosticEvent.Request(tp))
          taken <- dataQueue
                     .takeBetween(1, Int.MaxValue)
                     .race(interruptionPromise.await)  // <-- added race here
        } yield taken

should do it.

However, I am beginning to wonder if we should fail the stream like this at all! It seems that lost partitions are more common than we thought. I was always working under the assumption that lost partitions are 'end of the world' type situations, e.g. network splits or the network being gone for a long time, where any processing that is still going on should be aborted ASAP.

Perhaps we should return to the situation we had before, where we treated a lost partition the same as a revoked partition. OR, we could treat it as a revoked partition when the internal queues are empty anyway... 🤔