typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Unreliable stream interruption #2330

Open njwilson23 opened 3 years ago

njwilson23 commented 3 years ago

When a stream is interrupted, I expect, based on these docs, that the interruption is final and can only be handled with Stream.bracket (or something built on Stream.bracket).

So, I would not expect the following code, which attempts to restart a stream indefinitely, to work when the provided stream is interrupted:

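import cats.effect.Concurrent
import fs2.{Pull, RaiseThrowable, Stream}

// Rebuild the stream with `mk` from the last checkpoint whenever it fails or completes.
def resume[F[_] : Concurrent : RaiseThrowable, A, B](mk: A => Stream[F, B], checkpoint: B => A)(start: A): Stream[F, B] = {
  def go(s: Stream[F, Either[Throwable, B]], watermark: A): Pull[F, B, Unit] = s.pull.uncons1.flatMap {
    case Some((Right(b), rest)) => Pull.output1(b) >> go(rest, checkpoint(b)) // emit, advance the checkpoint
    case Some((Left(_), _)) => go(mk(watermark).attempt, watermark)            // error: restart from checkpoint
    case None => go(mk(watermark).attempt, watermark)                          // completion: restart as well
  }

  go(mk(start).attempt, start).stream
}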
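For contrast with the interruption case, here is a minimal sketch of how `resume` restarts when the inner stream fails with an ordinary error, which should not depend on timing. It assumes fs2 3.x and cats-effect 3 (for `cats.effect.unsafe.implicits.global`); the `flaky` stream and the `ResumeOnErrorDemo` object are hypothetical, introduced only for illustration.

```scala
import cats.effect.{Concurrent, IO}
import cats.effect.unsafe.implicits.global // cats-effect 3 runtime (assumption: CE3)
import fs2.{Pull, RaiseThrowable, Stream}

object ResumeOnErrorDemo {

  // Same `resume` as above: restart from the last checkpoint on error or completion.
  def resume[F[_]: Concurrent: RaiseThrowable, A, B](mk: A => Stream[F, B], checkpoint: B => A)(start: A): Stream[F, B] = {
    def go(s: Stream[F, Either[Throwable, B]], watermark: A): Pull[F, B, Unit] =
      s.pull.uncons1.flatMap {
        case Some((Right(b), rest)) => Pull.output1(b) >> go(rest, checkpoint(b))
        case Some((Left(_), _))     => go(mk(watermark).attempt, watermark)
        case None                   => go(mk(watermark).attempt, watermark)
      }
    go(mk(start).attempt, start).stream
  }

  // Hypothetical input: emits three consecutive Ints starting at `start`, then fails.
  def flaky(start: Int): Stream[IO, Int] =
    Stream.iterate(start)(_ + 1).take(3) ++ Stream.raiseError[IO](new RuntimeException("boom"))

  // The checkpoint after emitting b is b + 1, so each restart continues where the last run failed.
  def result: List[Int] =
    resume[IO, Int, Int](flaky, _ + 1)(0).take(10).compile.toList.unsafeRunSync()

  def main(args: Array[String]): Unit =
    println(result) // List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
}
```

Each failure is observed by `attempt` as a `Left`, so `go` rebuilds the stream from the watermark and no elements are skipped or repeated.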

However, I find that it sometimes works and sometimes doesn't, as in the following test:

import cats.effect.{Deferred, IO, Ref}
import fs2.{Pipe, Stream}

// How many times should we interrupt the stream?
val Interrupts: Int = 1

// Interrupt the stream after five items, up to a max number of times
def interrupter[A](deferred: Deferred[IO, Unit], interruptCount: Ref[IO, Int]): Pipe[IO, A, A] = {
  input: Stream[IO, A] =>
    input.zipWithIndex
      .evalTap {
        case (_, 5L) => interruptCount.getAndUpdate(_ + 1).flatMap { i => // zipWithIndex yields Long indices
          if (i < Interrupts) {
            deferred.complete(())
          } else IO.unit
        }
        case _ => IO.unit
      }
      .map(_._1)
      .interruptWhen(deferred.get.attempt)
}

val stream: Int => Stream[IO, Int] = Stream.iterate(_)(_ + 1)

val assertion = for {
  interruptCount <- Ref.of[IO, Int](0)
  _ <- resume[IO, Int, Int](
    start => Stream.eval(Deferred[IO, Unit]).flatMap(d => stream(start).through(interrupter(d, interruptCount))),
    _ + 1
  )(0)
    .take(1000)
    .compile
    .toList
    .map(lst => assert(lst == List.range(0, 1000)))
} yield ()

If I only interrupt the stream a small number of times (e.g. once), the restart usually works and I get all 1000 elements from .take. But if I allow many interruptions (e.g. 10), the resulting stream is typically truncated.

This is very surprising! Is this nondeterminism a bug?

diesalbla commented 3 years ago

@njwilson23 Good afternoon, Nat. Thanks for reporting this bug. We will try to look at it as soon as we can.

In the meantime, to help us with testing, could you submit this as an executable IOApp in a PR? Also, which version of fs2 were you running this example on? If you could run it on the latest 2.x release as well as the latest 3.x milestone, that would help us look into it.

njwilson23 commented 3 years ago

Sure, PR opened. I did my testing on 2.5.0, although I've been tracking similar strangeness since at least 2.2.x. I'll try to get it working on 3.x and report back.

njwilson23 commented 3 years ago

I verified that it exhibits the same surprising behaviour with

scalaVersion := "2.13.5"

libraryDependencies += "org.typelevel" %% "cats-core" % "2.4.2"
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.0.0-RC2"
libraryDependencies += "co.fs2" %% "fs2-core" % "3.0.0-M9"
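Since an executable IOApp was requested, here is a hedged sketch of how the test above might be packaged as a standalone program for fs2 3.x / cats-effect 3. It is not the code from the PR; `InterruptRepro`, `collect` and `maxInterrupts` are hypothetical names, and the `resume` and interrupter logic is copied from the original report. Because the behaviour under investigation is nondeterministic, the program prints whether the collected list is complete rather than asserting a fixed result.

```scala
import cats.effect.{Concurrent, Deferred, IO, IOApp, Ref}
import fs2.{Pipe, Pull, RaiseThrowable, Stream}

// Sketch for fs2 3.x / cats-effect 3; names other than `resume` are hypothetical.
object InterruptRepro extends IOApp.Simple {

  // `resume` as in the report: restart from the last checkpoint on error or completion.
  def resume[F[_]: Concurrent: RaiseThrowable, A, B](mk: A => Stream[F, B], checkpoint: B => A)(start: A): Stream[F, B] = {
    def go(s: Stream[F, Either[Throwable, B]], watermark: A): Pull[F, B, Unit] =
      s.pull.uncons1.flatMap {
        case Some((Right(b), rest)) => Pull.output1(b) >> go(rest, checkpoint(b))
        case Some((Left(_), _))     => go(mk(watermark).attempt, watermark)
        case None                   => go(mk(watermark).attempt, watermark)
      }
    go(mk(start).attempt, start).stream
  }

  // Interrupt each inner stream at its element with index 5, until `maxInterrupts`
  // interruptions have been triggered in total (tracked in `count`).
  def interrupter[A](maxInterrupts: Int, deferred: Deferred[IO, Unit], count: Ref[IO, Int]): Pipe[IO, A, A] =
    _.zipWithIndex
      .evalTap {
        case (_, 5L) =>
          count.getAndUpdate(_ + 1).flatMap { i =>
            if (i < maxInterrupts) deferred.complete(()).void else IO.unit
          }
        case _ => IO.unit
      }
      .map(_._1)
      .interruptWhen(deferred.get.attempt)

  // Collect the first `n` elements of the resumed stream.
  def collect(maxInterrupts: Int, n: Int): IO[List[Int]] =
    Ref.of[IO, Int](0).flatMap { count =>
      resume[IO, Int, Int](
        start =>
          Stream.eval(Deferred[IO, Unit]).flatMap { d =>
            Stream.iterate(start)(_ + 1).covary[IO].through(interrupter(maxInterrupts, d, count))
          },
        _ + 1
      )(0).take(n.toLong).compile.toList
    }

  def run: IO[Unit] =
    collect(maxInterrupts = 10, n = 1000).flatMap { lst =>
      IO.println(s"collected ${lst.length} elements; complete = ${lst == List.range(0, 1000)}")
    }
}
```

With `maxInterrupts = 0` no interruption is ever triggered, so `collect(0, n)` should always return `List.range(0, n)` and can serve as a baseline; larger values such as `collect(100, 10000)` correspond to the configuration discussed later in the thread.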
diesalbla commented 3 years ago

I have just run the executable from the PR you opened on top of the current main. It seems the issue has been fixed in the four weeks since the 3.0.0-M9 release.

njwilson23 commented 3 years ago

That's interesting and surprising! However, when I change the numbers slightly, requiring 100 interruptions (up from 10) and consuming 10,000 values (up from 1,000), the flaky behaviour observed with previous versions reappears, so it doesn't seem to be reliably fixed.

I'm also curious: what is the expected behaviour? From a (superficial) reading of the code, I expected resume to work with interrupted streams, because only the current scope is interrupted. However, based on the documentation I linked in the OP, I would expect it not to work, since it doesn't use bracket or onFinalize. (On main at commit e4872e68ad1477a88e39ae6f608737a4a24085a3, as well as on other versions I've tried, it sometimes works and sometimes doesn't.)