typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io
Other
2.38k stars 603 forks source link

parEvalMap and it's variations hang on canceled effect #3486

Open enlait opened 1 month ago

enlait commented 1 month ago

fs2 version 3.10.2

this code does not terminate:

Stream.unit // number of elements is irrelevant, as long as there are any; does not reproduce on Stream.empty
  .covary[IO]
  .parEvalMap(2)(_ => IO.canceled)
  .compile.drain

while there is no issue with evalMap:

Stream.unit
  .covary[IO]
  .evalMap(_ => IO.canceled)
  .compile.drain

even worse, adding interruptAfter after parEvalMap does not make the code terminate:

Stream.unit
  .covary[IO]
  .parEvalMap(2)(_ => IO.canceled)
  .interruptAfter(5.seconds)
  .compile.drain

it is not necessary for all effects to be cancelled, one is enough. This also hangs:

    Stream(IO.unit, IO.canceled)
      .covary[IO]
      .parEvalMap(2)(identity)
      .compile.drain

expected behavior:

while it's not obvious what to hope for in relation to concurrently executing effects - whether they should also cancel or not - one would expect overall termination semantics to be the same across evalMap and parEvalMap. It is also difficult to reason how to work around such situation.

ValdemarGr commented 1 month ago

I think the fix is that the line here should be:

F.start(stop.get.race(action).guarantee(releaseAndCheckCompletion))