zio / interop-cats

ZIO instances for cats-effect type classes
Apache License 2.0

Memory leaks while using fs2-kafka consumer #687

Closed vkorchik closed 7 months ago

vkorchik commented 7 months ago

I have a ZIO-based app, but for Kafka I am still using fs2-kafka. For interop I was using zio-interop-cats:23.0.0.0, and everything was fine.

After bumping all the libs to the newest versions I got a memory leak. After some investigation I thought the leak could be due to an inconsistency in cats-effect versions: the newest one is 3.5.3, but zio-interop-cats is compiled against 3.4.8. Starting from 3.5.3 there are some crucial changes in runtime semantics, so I rolled back the versions so that they are the same throughout the whole stack, but the memory leak still appears.
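For reference, one way to force a single cats-effect version across the whole stack in sbt is dependencyOverrides. A minimal sketch (the fs2-kafka version below is a placeholder, and the pinned versions are simply the ones mentioned above, not a recommendation):

```scala
// build.sbt (sketch): pin cats-effect so every library resolves the same version.
// The fs2-kafka version is a placeholder; use whatever your project needs.
libraryDependencies ++= Seq(
  "dev.zio"         %% "zio-interop-cats" % "23.0.0.0",
  "com.github.fd4s" %% "fs2-kafka"        % "3.0.1"
)

// Evicts any transitively pulled cats-effect in favour of this exact version.
dependencyOverrides += "org.typelevel" %% "cats-effect" % "3.4.8"
```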

I have tried different combinations of cats/interop/zio versions, but with no success.

After even more debugging I can see that the consumer side is leaking: once the consumer consumes events, something goes wrong. A lot of zio.ZIO$EvaluationStep$UpdateRuntimeFlags$$anon$28 objects are created but never released, and their number just keeps growing.

Versions that are working fine:

Versions that I am testing against and getting leaks:

The leaking version combinations are many, but basically all of them are based on cats-effect 3.4.x.

This is the link to the repo and to the heap dump from a leaking run.

And here is also a picture of heap usage: once the heap gets close to its current max size, it just breaks through it, and this never ends until all of the heap is used.

I'd appreciate any help with this issue.

vkorchik commented 7 months ago

After some more tests I have found that the problem started with zio-interop-cats:23.0.0.4 and persists through the newest version. 23.0.0.3 works fine regardless of the zio or fs2-kafka version.

To be even more precise, the actual trigger is the bump to a zio version greater than 2.0.10. Any version above that affects the code (the Consumer in my case) so that memory leaks (the number of zio.ZIO$EvaluationStep$UpdateRuntimeFlags$$anon$28 objects grows constantly).

vkorchik commented 7 months ago

After even more tests I have found the commit/PR and the exact line of code that broke cats-interop back then, and my example as well. It was one of the PRs in the 2.0.11 release.

If I revert this line (and also the two dependent lines, 2575 and 5044, otherwise compilation fails), my example works just fine. But adding just the problematic line back to a working example (without touching lines 2575 and 5044) breaks the code again. So the problem is obviously connected to line 2589 somehow.

Maybe the actual code that has to be fixed is in cats-interop itself, but I am not able to track it down; the only thing I was able to find is the exact change that caused my example to fail.

vkorchik commented 7 months ago

@adamgfraser , I want to add some new findings here.

It was confusing to me how changing the type from Fiber.Runtime to FiberRuntime could help run the streams without a memory leak, and now I know why. The memory leak is observed on the consumer side, and it was not present on interop 23.0.0.3 and below. The thing is that the ZIO version in the project was above 2.0.10, and since the signatures of several methods had changed (like makeFiberChild) and the consumer uses those methods... there was simply a MethodNotFoundException. It was not printed out because the process was forked (I only found this error much later). The consumer never started, so memory was not leaking. Yes, I know, a stupid mistake of mine.

TL;DR: I drew the wrong conclusion. Using the more specific type does not help us. That said, the memory leak is still there.

I am testing older versions of ZIO and interop to find a combination that works reliably. I will post any new information here.

calvinlfer commented 7 months ago

Hey @adamgfraser, just to echo @vkorchik: I have also tried building the latest 2.0.x release with the fix you made, and I can confirm that @vkorchik's findings are accurate; the memory leak is still present even with the changed type. Another reproducer that @vkorchik and I worked on is here: https://github.com/calvinlfer/kafka-zio-integration-leak-reproducer/blob/main/src/main/scala/com/kaizensolutions/experiments/ReproducerFs2KafkaZIO.scala

Also cc’ing @neko-kai or @joroKr21 for their expertise

joroKr21 commented 7 months ago

Sorry, it doesn't ring a bell for me.

jdegoes commented 7 months ago

/bounty $500

algora-pbc[bot] commented 7 months ago

💎 $500 bounty created by ZIO
💎 $500 bounty created by kaizen-solutions

🙋 If you start working on this, comment /attempt #687 along with your implementation plan
👉 To claim this bounty, submit a pull request that includes the text /claim #687 somewhere in its body
📝 Before proceeding, please make sure you can receive payouts in your country
💵 Payment arrives in your account 2-5 days after the bounty is rewarded
💯 You keep 100% of the bounty award
🙏 Thank you for contributing to zio/interop-cats!


Attempt: 🟢 @vkorchik, started Feb 17, 2024, 10:51:43 AM (GMT+0), solution: #688
vkorchik commented 7 months ago

It seems to me that the issue first appeared in interop 23.0.0.2, after this PR to be precise.

And what causes the issue is the switch from parentFiber.scope to FiberScope.global, made to match cats-effect semantics.

I have switched back from the global scope to the parent one, and the heap usage is now stable for me (the number of zio.ZIO$EvaluationStep$UpdateRuntimeFlags$$anon$28 objects also does not grow), compared to what we have seen above in this issue and in the discord thread.


I am not a pro, and it is hard for me to explain why the parent scope vs the global scope makes such a difference.

Anyway, I will create a PR soon so everyone can check it out and share thoughts, and we can compare the results for both zio 2.0.x and 2.1.x.

PS: should I use the /attempt #687 tag?

neko-kai commented 7 months ago

Thanks for investigating this! It seems like this should be fixed in ZIO Core. Moving race to the parent scope breaks cats-effect semantics, e.g. it prevents the loser fiber from continuing to run if the parent fiber is interrupted after raceWith has finished.
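The scope difference can be seen with plain ZIO (a standalone sketch, not the interop code): a fiber forked into its parent's scope is interrupted when the parent completes, while forkDaemon puts the fiber into the global scope so it can outlive the parent; that longer lifetime is what cats-effect's race losers rely on.

```scala
import zio.*

// Sketch: child-scope fibers die with their parent, global-scope fibers do not.
object ScopeSemanticsSketch extends ZIOAppDefault:
  val run =
    for
      ref    <- Ref.make(0)
      parent <- (for
                  // forked into the parent fiber's scope: interrupted when parent ends
                  _ <- ref.update(_ + 1).delay(1.second).fork
                  // forked into the global scope: survives the parent
                  _ <- ref.update(_ + 10).delay(1.second).forkDaemon
                yield ()).fork
      _      <- parent.join              // parent exits immediately; its child is interrupted
      _      <- ZIO.sleep(2.seconds)
      n      <- ref.get                  // only the daemon fiber ran: n == 10
      _      <- Console.printLine(s"counter = $n")
    yield ()
```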

vkorchik commented 7 months ago

@neko-kai thanks for the explanation. The failing tests in my PR show this, I think. Simply switching from the global to the parent scope (as it was before 23.0.0.2) makes the app work, but a bunch of tests fail, namely those checking interruption. It could have been a quick fix, but not necessarily a good and proper one 🙂

Also, I am not sure I can tackle this issue on my own; I am really a novice here, so if you know how to fix it, please go ahead 💪

algora-pbc[bot] commented 7 months ago

💎 kaizen-solutions added a $500 bounty to the prize pool!

vkorchik commented 7 months ago

I have added an explicit onInterrupt to raceFibersWith, as it was before the change with scopes, and it seems to be working for now; my memory consumption tests are still in progress. I will share the results asap.
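Roughly, that fix has the following shape (a hypothetical sketch with illustrative names, not the actual interop-cats source): the racing fibers stay in the global scope to preserve cats-effect semantics, but the racing effect explicitly interrupts both of them if it is itself interrupted, so no fiber is left running and accumulating.

```scala
import zio.*

// Hypothetical sketch of the onInterrupt pattern (names are illustrative).
// Both fibers go to the global scope via forkDaemon, so losers can outlive
// the parent as cats-effect requires; the explicit onInterrupt cleans them
// up if the race itself is interrupted, instead of leaking them.
def raceSketch[E, A](left: IO[E, A], right: IO[E, A]): IO[E, A] =
  ZIO.uninterruptibleMask { restore =>
    for
      l <- left.forkDaemon
      r <- right.forkDaemon
      a <- restore(l.join race r.join)
             .onInterrupt(l.interrupt.zipPar(r.interrupt)) // explicit cleanup
    yield a
  }
```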

PS: results are here.

algora-pbc[bot] commented 7 months ago

💡 @vkorchik submitted a pull request that claims the bounty. You can visit your bounty board to reward it.

calvinlfer commented 7 months ago

Hello again, I think it's safe to close this unless you all say otherwise (i.e. unless there's anything more to be done in ZIO core).

Here is my reproducer with interop 23.1.0.1 (which has @vkorchik's changes) and zio 2.0.21:

import fs2.Stream
import fs2.kafka.*
import zio.{durationInt as _, *}
import zio.interop.catz.*
import zio.logging.backend.SLF4J

import scala.concurrent.duration.*

// Reproducer that previously leaked
object ReproducerFs2KafkaZIO extends ZIOAppDefault:
  override val bootstrap: ZLayer[ZIOAppArgs, Any, Any] =
    Runtime.removeDefaultLoggers ++ SLF4J.slf4j

  val topic = "example-topic"

  val consumer: Stream[Task, Unit] =
    val settings =
      ConsumerSettings[Task, String, String]
        .withBootstrapServers("localhost:9092")
        .withGroupId("test-consumer-group-id-2")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)

    KafkaConsumer
      .stream[Task, String, String](settings)
      .evalTap(_.subscribeTo(topic))
      .stream
      .mapChunks(_.map(_.offset))
      .through(commitBatchWithin[Task](2048, 10.seconds))

  val producer: Stream[Task, ProducerResult[String, String]] =
    val settings =
      ProducerSettings[Task, String, String]
        .withBootstrapServers("localhost:9092")
        .withBatchSize(128)
        .withAcks(Acks.One)
        .withEnableIdempotence(false)
        .withRetries(128)

    val producerPipe = KafkaProducer.pipe[Task, String, String](settings)

    Stream
      .iterate[Task, Long](0L)(_ + 1L)
      .map(n => ProducerRecord(topic = topic, key = s"key: $n", value = s"value: $n"))
      .chunkN(n = 128, allowFewer = true)
      .map(ProducerRecords[fs2.Chunk, String, String])
      .through(producerPipe)

  override val run: Task[Unit] =
    Stream(producer, consumer).parJoinUnbounded.compile.drain

As you can see, the heap usage is nice, flat, and stable.

Prior to this change, I was seeing constantly growing heap usage.

So this definitely fixed it.

algora-pbc[bot] commented 7 months ago

🎉🎈 @vkorchik has been awarded $500 by kaizen-solutions.io! 🎈🎊

algora-pbc[bot] commented 7 months ago

🎉🎈 @vkorchik has been awarded $500 by ZIO! 🎈🎊