typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Errors are not always caught with .handleErrorWith #3045

Open nikiforo opened 1 year ago

nikiforo commented 1 year ago

Take a look at https://github.com/nikiforo/fs21107

The project can be run with `sbt "test:runMain ru.delimobil.FS2Example"`.

I expect the execution to continue indefinitely. However, it ends with `Errored(java.lang.IllegalArgumentException: TheException)` within tens of seconds.

I can minimize the example further. I've checked fs2 3.2.4 and 3.3.0; both versions end with the `Errored` message.

armanbilge commented 1 year ago

Here's the minimizer with scala-cli.

```scala
//> using scala "3.2.1"
//> using lib "co.fs2::fs2-core::3.3.0"

package ru.delimobil

import cats.MonadThrow
import cats.effect.IO
import cats.effect.IOApp
import cats.syntax.applicative._
import cats.syntax.option._
import fs2.Stream

import scala.concurrent.duration._

object FS2Example extends IOApp.Simple {

  case class MaybeInt(int: Option[Int])

  private val empty: MaybeInt = MaybeInt(none)
  private val big = 2
  private val small = 1
  private val track = empty.copy(int = big.some)

  def run: IO[Unit] = {
    val singleRun =
      Stream(track, track)
        .repeat
        .through(assemble)
        .evalTap(checkDateValidity)
        .parEvalMap(10)(_ => IO.unit)
        .handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
        .groupWithin(512, 1.second)
        .compile
        .drain

    Stream.repeatEval(singleRun).compile.drain.guaranteeCase(IO.println)
  }

  private def assemble(tracks: Stream[IO, MaybeInt]): Stream[IO, MaybeInt] =
    (Stream(empty) ++ tracks)
      .sliding(2)
      .map { chunk => MaybeInt(chunk.head.get.int) }
      .chunks
      .flatMap(chunk => Stream.fromOption(chunk.last))

  private def checkDateValidity(updateAction: MaybeInt): IO[Unit] = {
    val action = MonadThrow[IO].raiseError(new IllegalArgumentException("TheException"))
    action.whenA(updateAction.int.exists(_ > small))
  }
}
```

scott-thomson239 commented 1 year ago

If no one else is currently having a look at this, can I try and take this on?

armanbilge commented 1 year ago

Go for it! I played with it a little bit this morning. All I discovered was that removing either the `parEvalMap` or the `groupWithin` seemed to fix it, if I remember correctly.

nikiforo commented 1 year ago

I think I excluded `parEvalMap`:

```scala
val err = new IllegalArgumentException("TheException")

def run: IO[Unit] = {
  val singleRun =
    Stream(()).concurrently((Stream(()) ++ Stream.raiseError[IO](err)).repeat)
      .handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
      .groupWithin(512, 1.second)

  Stream.repeatEval(singleRun.compile.drain).compile.drain.guaranteeCase(IO.println)
}
```

nikiforo commented 1 year ago

The exception persists in 3.3.0, 3.2.4, 3.0.0, 2.5.4, 2.5.0, and 2.1.0, but doesn't appear in 2.0.0. Git bisect might help to understand the cause of the error.

UPD: I also checked 2.2.0, 2.3.0, 2.4.0, and 3.1.0; the error persists in those versions.

nikiforo commented 1 year ago

This is CE2-compliant code:

```scala
def run(args: List[String]): IO[ExitCode] = {
  val singleRun =
    Stream(()).concurrently((Stream(()) ++ Stream.raiseError[IO](err)).repeat)
      .handleErrorWith(ex => Stream.eval_(IO.delay(println(s"Caught ${ex.getMessage}"))))
      .groupWithin(512, 1.second)

  Stream.repeatEval(singleRun.compile.drain)
    .compile
    .drain
    .guaranteeCase(c => IO.delay(println(c)))
    .map(_ => ExitCode.Success)
}
```

nikiforo commented 1 year ago

I bisected 2.1.0 ... 2.0.0

```
659791b0fd3eef85824fbff39f8280e9931281f8 is the first bad commit
commit 659791b0fd3eef85824fbff39f8280e9931281f8
Author: Diego E. Alonso-Blas <diesalbla@gmail.com>
Date:   Sat Sep 21 01:55:06 2019 +0100

    Solves memory Leak

    Reverts changes in commit 36b1279cda415ebe8bb3faa33cb9aa3e6c6697d9.

    This commit changed the Compilation Loop was modified in a recent PR
    to use a continuation and a stack of continuations. However, this
    introduced a memory leak that could be seen running a simple:
    `Stream.range(0, 2800934).fold(0L)(_ + _).compile.last`,
    as described in Issue 1625.

    This commit reverts those commits, and thus fixes that memory leak.

:040000 040000 aadfd4614a28b579a24d0faef1c7af8fc3f394d8 e59aad789baf548716d094f07aaba388029dd02f M  core
```

That's the link to the commit: https://github.com/typelevel/fs2/commit/659791b0fd3eef85824fbff39f8280e9931281f8

scott-thomson239 commented 1 year ago

Sorry this is taking so long. I haven't had much luck in the little time I have had available to look at this. For now I will unassign myself so other people can have a go as well.

diesalbla commented 1 year ago

@nikiforo Some small minimisations: the `.repeat` of the background process is not needed. This seems correct, since the first failure should cut the execution short. Furthermore, the first part of that stream is also not needed. So instead of `Stream(()).concurrently((Stream(()) ++ Stream.raiseError[IO](err)).repeat)`, we can also reproduce the bug with just `Stream(()).concurrently(Stream.raiseError[IO](err))`. On the other hand, replacing the foreground stream `Stream(())` with `Stream.empty` prevents the error, apparently by interrupting the background execution of the error stream before it can start.

(this was based on the master branch of https://github.com/nikiforo/fs21107)

armanbilge commented 1 year ago

> we can also reproduce the bug with just

@diesalbla would you mind posting a complete reproducer? I tried as you said and it does not reproduce for me: the error is caught.

```scala
//> using lib "co.fs2::fs2-core::3.4.0"

import cats.effect._
import fs2._

object App extends IOApp.Simple {
  def run = Stream(())
    .concurrently(Stream.raiseError[IO](new Exception("ruh roh")))
    .handleErrorWith(ex => Stream.exec(IO.println(s"Caught ${ex.getMessage}")))
    .compile
    .drain
}
```

diesalbla commented 1 year ago

@armanbilge Here is the modified code:

```scala
object err extends IllegalArgumentException("TheException") with NoStackTrace

val bullet: Stream[IO, Unit] = Stream.raiseError[IO](err)

def handler(ex: Throwable) = Stream.exec(IO.println(s"Caught TheException"))

def run: IO[Unit] = {
  val singleRun =
    Stream(()).concurrently(bullet)
      .handleErrorWith(handler)
      .groupWithin(100, 100.millis)

  def aux: IO[Unit] = singleRun.compile.drain >> aux

  aux.guaranteeCase(IO.println)
}
```

This was done using fs2 version 3.4.0. Here are the details of the JVM I used:

```
fs21107 % java -version
java version "19" 2022-09-20
Java(TM) SE Runtime Environment (build 19+36-2238)
Java HotSpot(TM) 64-Bit Server VM (build 19+36-2238, mixed mode, sharing)
```

EDIT: As a further minimisation, we can write the outer loop without streams, using recursion in `IO`.