typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

`Pull#lease`, `Pull.bracketCase` and `Pull.extendScopeTo` are not cancellation safe. #3474

Open ValdemarGr opened 2 months ago

`Pull.extendScopeTo` is not cancellation safe. A safer variant could be:

object Pull {
  ...
  def leaseScope[F[_]: MonadThrow]: Pull[F, Nothing, Resource[F, Unit]] = 
    // `.void` discards the Lease so the result matches Resource[F, Unit]
    Pull.getScope[F].map(s => Resource.make(s.lease)(_.cancel.rethrow).void)
}
The code that can produce the error:

```scala
import cats.effect.*
import cats.effect.unsafe.implicits.global
import fs2.*
import scala.concurrent.duration.*
import cats.implicits.*
import cats.*

object Main extends IOApp.Simple {
  def run = IO.deferred[Unit].flatMap { d =>
    IO.ref(0).flatMap { ref =>
      val s = Stream
        .resource(Resource.make(ref.update(_ + 1))(_ => ref.update(_ - 1)))
        .pull
        .uncons1
        .flatMap {
          case None => Pull.done
          case Some((hd, tl)) =>
            Pull.output1(hd) >>
              Pull.extendScopeTo(Stream.unit.covary[IO]).flatMap { killLease =>
                Pull.eval(d.complete(()).void >> IO.never[Unit]) >>
                  Pull.eval(killLease.compile.drain) >>
                  tl.pull.echo
              }
        }
        .streamNoScope

      IO.race(s.compile.drain, d.get) >> ref.get.flatMap(IO.println(_))
    }
  }
}
```

`Pull.bracketCase` can be cancelled after the acquire and during use. I don't know what uses `Pull.bracketCase` has other than `Pull#lease`, but I think it should at least be documented that it shouldn't be used for resources, since release is not guaranteed.
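
To make the guarantee at stake concrete, here is a minimal sketch that uses only cats-effect `IO` and `Resource`, not fs2's `Pull.bracketCase` itself. It contrasts a release that is merely sequenced after use with one registered through `Resource`, when the fiber is cancelled during use. The names `ReleaseOnCancel`, `cancelDuringUse`, `leaky` and `safe` are made up for this illustration.

```scala
import cats.effect.{Deferred, IO, IOApp, Ref, Resource}

object ReleaseOnCancel extends IOApp.Simple {

  // Starts `body` on its own fiber, waits until it signals that it has
  // entered the "use" phase, cancels it, and returns the counter value.
  // The counter is the number of resources that are still open.
  def cancelDuringUse(
      body: (Ref[IO, Int], Deferred[IO, Unit]) => IO[Unit]
  ): IO[Int] =
    for {
      ref   <- IO.ref(0)
      inUse <- IO.deferred[Unit]
      fiber <- body(ref, inUse).start
      _     <- inUse.get
      _     <- fiber.cancel // completes only after finalizers have run
      open  <- ref.get
    } yield open

  // Release is only sequenced after use, so cancellation during use skips it.
  val leaky: IO[Int] = cancelDuringUse { (ref, inUse) =>
    ref.update(_ + 1) >> (inUse.complete(()) >> IO.never[Unit]) >> ref.update(_ - 1)
  }

  // Release is registered as a finalizer, so it also runs on cancellation.
  val safe: IO[Int] = cancelDuringUse { (ref, inUse) =>
    Resource
      .make(ref.update(_ + 1))(_ => ref.update(_ - 1))
      .use(_ => inUse.complete(()) >> IO.never[Unit])
  }

  def run: IO[Unit] =
    for {
      l <- leaky
      s <- safe
      _ <- IO.println(s"leaky: $l, safe: $s") // prints "leaky: 1, safe: 0"
    } yield ()
}
```

The concern raised here is that a pull-level bracket which can be interrupted between acquire and release behaves like the `leaky` variant, whereas callers managing resources would expect the `safe` behaviour.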

`Pull#lease` is subject to the issues of `bracketCase` and can cause resources on the scope to never be released. I think it could be implemented safely using the `leaseScope` proposed above.

sealed abstract class Pull[...] {
  ...
  // binary incompatible with `lease`
  def leaseSafe(implicit F: MonadCancel[F, _]): Pull[...] =
    Pull.leaseScope.flatMap(r => Stream.resourceWeak(r).pull.echo >> this)
}

Will make the PR, just wanted a discussion first.