fd4s / fs2-kafka

Functional Kafka Streams for Scala
https://fd4s.github.io/fs2-kafka
Apache License 2.0

Stale consumers after JVM shutdown #1341

Open · misja-alma opened this issue 3 months ago

misja-alma commented 3 months ago

Our application runs on 2 Kubernetes pods and uses fs2-kafka for several consumers; most of the time we have 8 consumers in total per topic, 4 on each pod, with every consumer subscribed to its own partition. When our pods are scaled down and up again, i.e. when our application is restarted, we often find that for one topic some 'zombie' consumers are still hanging around on the broker after the restart. For that topic the broker shows a mix of new consumers from both restarted pods plus a few consumers from one pod from before the restart. In reality the old pod has long since died and those consumers no longer exist. However, since the broker is unaware of this, the stale consumers are still assigned to partitions, which means that new messages from those partitions are no longer consumed and a lag builds up.

The topic where this happens is not always the same, and sometimes a restart doesn't cause any problems at all. The workaround is simply to restart our pods once more; the stale consumers are usually gone after that.

We suspect that the consumer might not be unsubscribing properly during JVM shutdown, but this is just a guess: we don't do any unsubscribing ourselves and rely completely on fs2-kafka for this. We usually subscribe like this:

    KafkaConsumer
      .stream(consumerSettings)
      .evalTap(_.subscribeTo(topic))
      .flatMap(_.stream)
      .evalTap(x => log(show"Received ${x.record.value} from $topic", logLevel))
      .groupWithin(groupWithin.messages, groupWithin.duration)
      // processRecords handles a chunk of records and, for the commit step below
      // to typecheck, must return the committable offsets of the processed chunk
      .evalMap(processRecords)
      .evalMap(CommittableOffsetBatch.fromFoldable(_).commit.whenA(commitOffset))

and we don't have any code to explicitly shut down or unsubscribe on JVM exit.
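
For context, a minimal sketch of the kind of top-level wiring in question, assuming cats-effect 3's IOApp (names like ConsumerApp, my-group, and my-topic are placeholders, not the reporter's actual code). IOApp installs a JVM shutdown hook that cancels the main fiber on SIGTERM, which releases the KafkaConsumer resource, closes the consumer, and lets it leave the group cleanly:

    import cats.effect.{IO, IOApp}
    import fs2.kafka._

    object ConsumerApp extends IOApp.Simple {

      val consumerSettings: ConsumerSettings[IO, String, String] =
        ConsumerSettings[IO, String, String]
          .withBootstrapServers("localhost:9092") // placeholder
          .withGroupId("my-group")                // placeholder

      def run: IO[Unit] =
        KafkaConsumer
          .stream(consumerSettings)
          .evalTap(_.subscribeTo("my-topic"))     // placeholder topic
          .flatMap(_.stream)
          .evalMap(_.offset.commit)               // placeholder processing
          .compile
          .drain
    }

If the stream is instead started on a background fiber or thread outside the app's resource scope, the finalizers may never run on shutdown, leaving the group member to expire only via session.timeout.ms.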

Could it be that we are not using fs2-kafka in the correct way, and that this is causing the random stale consumers? Or is this perhaps a known issue?

aartigao commented 3 months ago

Even if FS2 is failing to unsubscribe, this is mostly a Kafka Broker issue.

What value have you set for session.timeout.ms?

Regardless of the library used (FS2, raw KafkaConsumer, etc.), once the session times out the broker should free those partitions and reassign them to another live instance in the consumer group.
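
For reference, a sketch of tuning these values on the fs2-kafka side (values are illustrative only; a shorter session timeout makes the broker evict dead members sooner after an unclean exit, at the cost of more sensitivity to GC pauses and network blips):

    import scala.concurrent.duration._
    import cats.effect.IO
    import fs2.kafka._

    // Illustrative values: the broker drops a member that stops heartbeating
    // once session.timeout.ms elapses, then rebalances its partitions.
    val settings: ConsumerSettings[IO, String, String] =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")
        .withGroupId("my-group")
        .withSessionTimeout(15.seconds)
        .withHeartbeatInterval(5.seconds)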

misja-alma commented 2 months ago

We left session.timeout.ms at its default, so 45000 (45s).

We have reason to suspect that the problem is on our side, because other applications that use the Akka Kafka client and run against the same broker do not have this problem.

aartigao commented 2 months ago

You can try setting the org.apache.kafka loggers to DEBUG level. There you'll see whether the lib is unsubscribing or not. But if it's not, I'd bet on a resource shutdown ordering issue in your app. We have multiple projects using FS2 at $work and these shut down gracefully without issues.
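
With Logback, for example, that can be done with a logger entry like this (a sketch, assuming a standard logback.xml; the logger name targets the underlying Java Kafka client):

    <!-- logback.xml: DEBUG logging for the underlying Java Kafka client -->
    <logger name="org.apache.kafka" level="DEBUG"/>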