typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

zip + uncons + concurrency = scope lookup failure #3478

Open Jasper-M opened 1 month ago

Jasper-M commented 1 month ago

Quoting @armanbilge:

The problem seems to be running the tail of a zipped Stream .concurrently.

I ran into this issue before here: https://github.com/typelevel/fs2/issues/3081#issuecomment-1398495481

IIUC Arman worked around that problem for hold1 by moving the .pull.uncons into the concurrent process. However, the core issue remains: if you call stream.pull.uncons and then process the tail concurrently, things blow up whenever stream contains any kind of zipping. Perhaps other operators implemented with stepLeg have the same issue?
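A minimal sketch of the rearrangement idea, under stated assumptions: it is not Arman's actual hold1 change, and `TailInBackground`, `dropHead` and `collectedInBackground` are made-up names. The reported shape (uncons in the foreground, then hand the tail to `concurrently`) is only described in a comment, since we have not verified which fs2 versions fail on it. The code exercises the alternative, where the uncons happens inside the background stream so the tail is consumed by the same compiled stream that unconsed it.

```scala
import cats.effect.{Deferred, IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.{Pull, Stream}

object TailInBackground {
  // A zipped source: zip is one of the operators implemented with stepLeg
  val zipped: Stream[IO, (Int, Int)] =
    Stream(1, 2, 3).covary[IO].zip(Stream(10, 20, 30).covary[IO])

  // Everything after the first element, obtained via uncons1 and then echoing the tail
  def dropHead[A](s: Stream[IO, A]): Stream[IO, A] = {
    val pull: Pull[IO, A, Unit] = s.pull.uncons1.flatMap {
      case None            => Pull.done
      case Some((_, tail)) => tail.pull.echo
    }
    pull.stream
  }

  // Reported shape (not run here): uncons in the foreground, then something like
  // `foreground.concurrently(tail)`, which compiles the tail separately.
  // Rearranged shape: the uncons happens inside the background stream itself.
  val collectedInBackground: IO[List[(Int, Int)]] =
    for {
      seen <- Ref.of[IO, List[(Int, Int)]](Nil)
      done <- Deferred[IO, Unit]
      background = dropHead(zipped).evalMap(p => seen.update(_ :+ p)) ++
        Stream.exec(done.complete(()).void)
      // The foreground simply waits until the background signals completion
      _   <- Stream.eval(done.get).concurrently(background).compile.drain
      out <- seen.get
    } yield out

  def run(): List[(Int, Int)] = collectedInBackground.unsafeRunSync()
}
```

The Deferred only exists so the foreground does not finish (and interrupt the background) before the tail has been fully consumed.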

I ran into this again while trying to use Pull.extendScopeTo to extend resource lifetimes across async boundaries. That actually seems to work, but it requires stream.pull.peek or some other uncons variant, which means it's incompatible with zipped streams:

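// input1 is the source stream; asyncBufferThing stands for any operator that consumes its input asynchronously
input1.pull.peek.flatMap {
  case Some((_, stream)) =>
    // keep the currently open resources alive until the stream returned by asyncBufferThing completes
    Pull.extendScopeTo(asyncBufferThing(stream)).flatMap(s => Pull.output1(s))
  case None =>
    Pull.done
}
.stream
.flatten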

https://scastie.scala-lang.org/56CQGK8uTzecITHSwmO0AQ

armanbilge commented 1 month ago

Possible dupe of https://github.com/typelevel/fs2/issues/3123? In any case, I liked the idea I had there:

when compiling the background stream in concurrently, instead of giving it a new root scope it should derive its scope from the foreground stream.

but I couldn't quite make that work.

Jasper-M commented 1 month ago

Oh sorry, I didn't see that other issue. You can also run into it via stream.compile.drain.start, which is what groupWithin was doing. Still, if it could at least be avoided by staying within Stream, that would already be a big improvement.

Jasper-M commented 1 month ago

And to underscore how little I understand of the scope system: if you replace Pull.extendScopeTo(asyncBufferThing(stream)).flatMap(s => Pull.output1(s)) with just Pull.output1(asyncBufferThing(stream)), it still seems to "extend" the scope to the output stream. That said, extendScopeTo still seems to be required in more complicated cases.

ValdemarGr commented 1 month ago

When unconsing, the tail of the stream is (effectively) prefixed with an eval effect that closes the previous scope.

Here is an example: https://scastie.scala-lang.org/k8hcN1noTzCEA7VQzWtLcQ
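A small experiment for observing this, offered as a sketch rather than a spec: it logs when a bracketed resource is acquired and released relative to the head and the tail of an unconsed stream. `UnconsAndRelease` and `note` are made-up names. The code deliberately does not claim where `release` lands in the log, since that ordering is exactly the scope behavior under discussion; run it and inspect the list.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import fs2.{Pull, Stream}

object UnconsAndRelease {
  // Records, in order, resource acquisition/release and element handling
  val events: IO[List[String]] =
    Ref.of[IO, List[String]](Nil).flatMap { log =>
      def note(msg: String): IO[Unit] = log.update(_ :+ msg)

      // A resource-backed source whose release is logged
      val source: Stream[IO, Int] =
        Stream.bracket(note("acquire"))(_ => note("release")) >> Stream(1, 2, 3)

      // Uncons the head, then keep consuming the tail in the same Pull
      val pull: Pull[IO, Unit, Unit] =
        source.pull.uncons1.flatMap {
          case None => Pull.done
          case Some((hd, tl)) =>
            Pull.eval(note(s"head $hd")) >> tl.evalMap(i => note(s"tail $i")).pull.echo
        }

      pull.stream.compile.drain >> log.get
    }

  def run(): List[String] = events.unsafeRunSync()
}
```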

When you call Stream#evalMap and Stream#flatMap, a stack of transformations is composed. Once a Pull.output is performed, the outputs are pushed through this stack of transformations and finally through whichever .compile operation was chosen, such as drain. The scope reasoning lives in the tail.

I've experimented a bit with a non-CPS-based Pull; I think the composition is a bit more explicit here.

What does scope lookup mean?

In fs2, scopes are referenced by scopeId when doing things like StepLeg or closing a scope (see the CloseScope node in Pull). If we were to omit scopeId from a hypothetical fs2 implementation and reference scopes directly (as Scope[F] values), we would either open a bunch of soundness holes or make the API unwieldy.
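To make "lookup by id" concrete, here is a toy model, explicitly not fs2's internals: `ScopeId`, `ToyScope` and `lookup` are invented for illustration. Scopes form a tree and are resolved by id, so an id that was minted in one tree (the foreground) cannot be found when it is resolved against a different, freshly rooted tree (for example a stream compiled separately in the background). In fs2 that situation surfaces as the scope lookup failure from the issue title.

```scala
object ToyScopes {
  // Toy model only: invented types, not fs2's Scope or Unique.Token
  final case class ScopeId(value: Int)

  final case class ToyScope(id: ScopeId, children: List[ToyScope]) {
    // Depth-first search for a scope with the given id
    def find(target: ScopeId): Option[ToyScope] =
      if (id == target) Some(this)
      else children.iterator.map(_.find(target)).collectFirst { case Some(s) => s }
  }

  // Resolving an id against a tree either finds the scope or reports a lookup failure
  def lookup(root: ToyScope, target: ScopeId): Either[String, ToyScope] =
    root.find(target).toRight(s"no scope with id ${target.value} in tree rooted at ${root.id.value}")

  // Foreground tree: root 0 with child scope 1 (standing in for a scope referenced by StepLeg)
  val foreground: ToyScope = ToyScope(ScopeId(0), List(ToyScope(ScopeId(1), Nil)))

  // Background tree: a fresh root, as when a stream is compiled on its own
  val background: ToyScope = ToyScope(ScopeId(100), Nil)
}
```

Passing the `ToyScope` value itself instead of its id would sidestep the lookup, which is the trade-off the next paragraphs examine.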

The F in Scope is not necessarily the same F as in Pull. Remember that Pull is covariant in F and that the Translate node allows F ~> G, so the real effect type G may be either a supertype (G[x] >: F[x]) or (hopefully) an embedding F ~> G.

For instance, for Stream[Id, A] the actual effect used to evaluate the stream (G) is SyncIO, where Id ~> SyncIO (trivially).
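A sketch that makes this embedding explicit, assuming fs2 3.x and cats-effect 3: a `Stream[Id, Int]` is translated with a hand-written `Id ~> SyncIO` (the name `idToSyncIO` is ours) and then compiled and run in SyncIO. It only illustrates the relationship described above; it does not show how fs2's own Id compiler is wired internally.

```scala
import cats.{~>, Id}
import cats.effect.SyncIO
import fs2.Stream

object IdViaSyncIO {
  // Stream[Pure, Int] widens to Stream[Id, Int] because Stream is covariant in its effect
  val idStream: Stream[Id, Int] = Stream.emits(List(1, 2, 3))

  // The trivial embedding Id ~> SyncIO
  val idToSyncIO: Id ~> SyncIO = new (Id ~> SyncIO) {
    def apply[A](a: Id[A]): SyncIO[A] = SyncIO.pure(a)
  }

  // Translate into SyncIO, then compile and run there
  def viaTranslate(): List[Int] =
    idStream.translate(idToSyncIO).compile.toList.unsafeRunSync()
}
```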

Say we'd like to compile an unconsed tail without getting scope lookup failures, and our plan of attack is to swap out the scopeIds for Scope references. But now a problem arises: Pull is defined in F, but Scope is defined in G. With the current structure there is no way of embedding Scope[G] into Pull[F, O, R].

Now say this tail Pull, defined in F, were instead compiled in H, where H and G are unrelated in the subtyping order; more specifically, H[x] !<: G[x] and G[x] !<: H[x].

import cats.effect.Resource
import fs2.{Pull, Stream}

// Schematic only: Effect1, Effect2 and SubEffect have no Compiler instances, and the inner
// compile is not lifted into a Pull, so this is not meant to type-check as written.
// A diamond of effect types
trait Effect1[A]
trait Effect2[A]
trait SubEffect[A] extends Effect1[A] with Effect2[A]

val myResource: Resource[SubEffect, Int] = ???

Stream
  .resource(myResource)
  .repeatN(3)
  .pull
  .uncons1
  .flatMap {
    case None => Pull.done
    case Some((hd, tl)) =>
      // note that tl is now evaluated in Effect1, while the originating stream remains in SubEffect
      tl.covary[Effect1].compile.drain
  }
  .stream
  // the outer stream will be compiled in Effect2, which shares no subtyping relation with Effect1
  .covary[Effect2]
  .compile.drain

In my experiment I have encoded this issue explicitly here. In fs2, an exception is instead (justifiably) thrown when Pull's Scope cannot find the scopeId in the tree.

If we take the liberty of introducing G ~> F, we can compose G ~> F with F ~> H to obtain G ~> H. However, such a composition is almost certainly lossy and may introduce further covariance-related issues, beyond those that may already exist for translation.
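A small illustration of why such a composition tends to be lossy, using cats' `FunctionK` with ordinary collection types as stand-ins (List for G, Option for F, Vector for H); the object and value names are ours. Composing the two natural transformations with `andThen` type-checks fine, but the List ~> Option leg throws information away, and nothing in the composed `List ~> Vector` type reveals that.

```scala
import cats.~>

object LossyComposition {
  // Stand-ins: List plays G, Option plays F, Vector plays H
  val listToOption: List ~> Option = new (List ~> Option) {
    // Lossy: everything after the first element is discarded
    def apply[A](fa: List[A]): Option[A] = fa.headOption
  }

  val optionToVector: Option ~> Vector = new (Option ~> Vector) {
    def apply[A](fa: Option[A]): Vector[A] = fa.toVector
  }

  // G ~> F composed with F ~> H yields G ~> H
  val listToVector: List ~> Vector = listToOption.andThen(optionToVector)
}
```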

So what if we make sure to open scopes only in F, and never reason about G or the relation F ~> G? In an earlier draft of my experiment I tried something like that here. This implies that any translation F ~> F2 must also guarantee that F2 has all the tools required to handle scopes (Compiler.Target). I have not explored this solution as thoroughly yet and am unsure how well it works with the current interruption model; nevertheless, I think it is an interesting avenue to explore, since it would allow invariant operators (GetScope) to be public.

It may seem strange that you are able to control future consumers, so to speak, but that is exactly the ability that makes Stream more powerful than Resource. For example, it is what allows concurrently to surface background errors in the foreground, whereas Resource.background can only do the reverse.
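A minimal sketch of the concurrently behavior mentioned above, assuming fs2 3.x and cats-effect 3 (`ConcurrentlyErrors` is a made-up name): the foreground never terminates by itself, the background fails immediately, and the whole program ends with the background's error rather than hanging.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream

object ConcurrentlyErrors {
  // The foreground never terminates on its own
  val foreground: Stream[IO, Nothing] = Stream.never[IO]

  // The background fails immediately
  val background: Stream[IO, Nothing] =
    Stream.raiseError[IO](new RuntimeException("boom"))

  // concurrently interrupts the foreground and re-raises the background's failure
  def run(): Either[Throwable, Unit] =
    foreground.concurrently(background).compile.drain.attempt.unsafeRunSync()
}
```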

I have spent a while trying to grok the whys of fs2 and want to help make a sound and expressive streaming model, so any discussion is much appreciated!

Disregarding soundness issues of moving scopes between effects, @armanbilge, what issues did you hit when passing the active scope to the background stream in concurrently https://github.com/typelevel/fs2/pull/3112#issuecomment-1399139450? In particular, this commit https://github.com/typelevel/fs2/commit/779a1709fd74db8856cf5c296f81e9eceb0a5367