typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

`groupWithin` prematurely closes resources #3477

Open · Jasper-M opened this issue 2 months ago

Jasper-M commented 2 months ago

Code:

import scala.concurrent.duration._
import cats.effect.{IO, Resource}
import fs2.Stream
import cats.effect.unsafe.implicits.global

Stream.resource(
  Resource.make(IO.println("start"))(_ => IO.println("stop"))
)
.flatMap(_ => Stream(1,2,3,4).covary[IO].metered(100.millis))
.take(2)
.evalTap(IO.println)
.groupWithin(2, 1.second)
.evalTap(IO.println)
.compile
.drain
.unsafeRunSync()

Result:

start
1
2
stop
Chunk(1, 2)

The resource is already closed before the stage after groupWithin gets to process the chunk, so a downstream stage can end up working with an already-closed resource.
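A hedged variant of the same reproduction (the Ref stand-in is an illustration, not part of the original code) makes the hazard concrete: if the resource is something the downstream stage actually uses, the reordering becomes a use-after-close bug rather than just surprising output.

import scala.concurrent.duration._
import cats.effect.{IO, Ref, Resource}
import fs2.Stream
import cats.effect.unsafe.implicits.global

// A resource whose state can be observed after release: the Ref flips to
// false once the finalizer has run.
val openRef: Resource[IO, Ref[IO, Boolean]] =
  Resource.make(Ref[IO].of(true))(_.set(false))

Stream.resource(openRef)
  .flatMap(ref => Stream(1, 2, 3, 4).covary[IO].metered(100.millis).map(i => (i, ref)))
  .take(2)
  .groupWithin(2, 1.second)
  .evalTap { chunk =>
    // Given the behaviour shown above, the finalizer has likely already run
    // by the time this stage sees the chunk, so this would report "false".
    chunk.head match {
      case Some((_, ref)) => ref.get.flatMap(open => IO.println(s"resource still open: $open"))
      case None           => IO.unit
    }
  }
  .compile
  .drain
  .unsafeRunSync()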

ValdemarGr commented 2 months ago

Some of the fs2 combinators don't respect resources. The cause is that the foreground stream is compiled separately here.
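Roughly speaking (a hedged sketch of the shape only, not fs2's actual groupWithin code; groupLikeShape is a made-up name), such combinators pump the upstream into a channel under one compilation while the consumer reads from the channel under another:

import cats.effect.Async
import fs2.{Chunk, Stream}
import fs2.concurrent.Channel

// Not the real implementation: the pump owns the upstream (and its scopes),
// while the consumer reads chunks from the channel under a different
// compilation, so upstream finalizers can run before the consumer has
// processed a chunk.
def groupLikeShape[F[_]: Async, O](in: Stream[F, O]): Stream[F, Chunk[O]] =
  Stream.eval(Channel.bounded[F, Chunk[O]](1)).flatMap { chan =>
    val pump: Stream[F, Nothing] =
      in.chunks.evalMap(chan.send(_).void).drain.onFinalize(chan.close.void)
    chan.stream.concurrently(pump)
  }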

If possible, can you structure your stream like this:

Stream
  .resource(
    Resource.make(IO.println("start"))(_ => IO.println("stop"))
  )
  .flatMap { _ =>
    Stream(1, 2, 3, 4)
      .covary[IO]
      .metered(100.millis)
      .take(2)
      .evalTap(IO.println)
      .groupWithin(2, 1.second)
      .evalTap(IO.println)
  }
  .compile
  .drain

I think a resource-safe version of most combinators could be implemented via a more expressive Pull API, something like the following.

object Pull2 {
  // Option = Is there anything to lease now
  // Resource = Try to take a lease
  // Boolean = Is the resource still available; Was the Scope's resource closed between the Pull and when the resource was opened?
  def lease[F[_]]: Pull[F, Nothing, Option[Resource[F, Boolean]]] = ???
}

This would allow transferring resource ownership across separately compiled streams.

import cats.effect.Async
import cats.effect.std.Supervisor
import fs2.{Chunk, Pull, Stream, concurrent}

// very contrived example which shows that ownership can cross through a channel
def leaseChunks[F[_], A](implicit
  F: Async[F]
): fs2.Pipe[F, A, (Chunk[A], F[Unit])] = stream =>
  Stream.resource(Supervisor[F]).flatMap { sup =>
    Stream
      .eval {
        concurrent.Channel.bounded[F, (Chunk[A], F[Unit])](1)
      }
      .flatMap { chan =>
        def send(c: Chunk[A]): Pull[F, Nothing, Unit] = for {
          r <- Pull2.lease[F]
          _ <- Pull.eval {
            r match {
              case None => F.unit
              case Some(r) =>
                F.uncancelable { poll =>
                  poll(r.allocated).flatMap {
                    case (false, _) => F.unit
                    case (true, release) =>
                      sup.supervise(Async[F].never[Unit].onCancel(release)).flatMap { fib =>
                        poll(chan.send((c, fib.cancel))).void
                      }
                  }
                }
            }
          }
        } yield ()

        val bg = stream.chunks.flatMap(c => send(c).stream).onFinalize(chan.close.void)

        chan.stream.concurrently(bg)
      }
  }
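For completeness, a hedged sketch of how a consumer might use such a pipe (consume and use are made-up names): each chunk arrives together with the action that releases its lease, so the consumer can hold the resource open exactly as long as it needs the chunk, even though it is compiled separately.

import cats.effect.Async
import cats.effect.syntax.all._
import fs2.{Chunk, Stream}

def consume[F[_]: Async, A](in: Stream[F, A])(use: Chunk[A] => F[Unit]): F[Unit] =
  in.through(leaseChunks[F, A])
    .evalMap { case (chunk, release) =>
      // release the lease once the chunk has been fully processed
      use(chunk).guarantee(release)
    }
    .compile
    .drain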

I have an issue open here regarding this.

Jasper-M commented 2 months ago

In this simple example that I adapted a little bit, using Pull.extendScopeTo actually seems to make a difference in keeping the resource open until the result stream stops. But when I try to apply that transformation to a less trivial piece of code like groupWithin, I always get the Scope lookup failure error, no matter whether I use F.start or stream.concurrently, or where I compile which stream.
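A minimal sketch of that shape (not the exact example; compileRemainderSeparately is a made-up helper, assuming fs2 3.x where Pull.extendScopeTo extends the currently open resources to the given stream):

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.{Pull, Stream}

// Peek to open the scope, extend that scope to the remainder, then compile
// the remainder separately; the finalizer should then wait for that separate
// compilation to finish.
def compileRemainderSeparately[O](s: Stream[IO, O])(sink: O => IO[Unit]): IO[Unit] =
  s.pull.peek1
    .flatMap {
      case None => Pull.done
      case Some((_, rest)) =>
        Pull.extendScopeTo(rest).flatMap { extended =>
          Pull.eval(extended.evalMap(sink).compile.drain)
        }
    }
    .stream
    .compile
    .drain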

Jasper-M commented 2 months ago

Actually no, it does seem to work on groupWithin as well! It's just that if your input stream uses metered (or probably anything else built on a zip-like operation) you run into the scope lookup errors.

I only added metered to my previous example here, and then it blows up.

I've actually run into this issue before: https://github.com/typelevel/fs2/issues/3081#issuecomment-1387620624

ValdemarGr commented 2 months ago

Resources and interruption are implemented via Scope, which acts as a tree of resources.
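As a concrete picture of that tree (a hedged illustration of the model, not fs2 internals): each nested bracket opens a child scope of the enclosing one, and finalizers run innermost-first as the child scopes close.

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

Stream.bracket(IO.println("open A"))(_ => IO.println("close A"))
  .flatMap { _ =>
    Stream.bracket(IO.println("open B"))(_ => IO.println("close B"))
      .flatMap(_ => Stream(1, 2).covary[IO])
  }
  .compile
  .drain
  .unsafeRunSync()
// prints: open A, open B, close B, close A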

concurrently compiles the supplied stream of your program (https://github.com/typelevel/fs2/blob/eea0c2542efc0d6e6587fd90f6079fc627104cee/core/shared/src/main/scala/fs2/Stream.scala#L554), which introduces a new Scope tree (https://github.com/typelevel/fs2/blob/eea0c2542efc0d6e6587fd90f6079fc627104cee/core/shared/src/main/scala/fs2/Compiler.scala#L156-L158).

When the lookup function (which traverses the scope tree) is evaluated, the link between the sub-stream's scopes and the previous parent scopes has been broken: https://github.com/typelevel/fs2/blob/eea0c2542efc0d6e6587fd90f6079fc627104cee/core/shared/src/main/scala/fs2/internal/Scope.scala#L357

There might be an implementation of concurrently that shares the root scope; however, I am unsure of the implications.

All in all, the rule is that if a stream originates as a sub-stream, it might have weak references (through scope ids) to state in its origin stream, so you may not compile that stream separately.

Jasper-M commented 2 months ago

Well, a dummy-proof and easy API for transferring resource ownership between streams would be great.