typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io
Other
2.37k stars 602 forks source link

Fast, Unboxed Operator Fusion Without Compromising Generality #340

Closed djspiewak closed 9 years ago

djspiewak commented 9 years ago

I would appreciate any and all feedback on the following proposal, but I'm particularly in need of feedback from @pchiusano. It is, after all, his library. :-) If @viktorklang and/or @rkuhn has any time to offer any feedback or experiences with operator fusion (what works, what doesn't, and what isn't worth the effort) I would also be very much indebted.

Motivation

Right now, map is awful. It not only generates a lot of intermediate state that is subsequently discarded, but it aggressively deoptimizes packed chunks into singleton emits. Considering that packed chunks are the only way to get serious performance out of Process, this is a very significant problem! Furthermore, Process does not provide any mechanism for preserving unboxed data structures and operations through a full pipeline. While this would be a somewhat unusual feature for a functional streaming library, I do believe that it is possible without breaking type signatures or performing any trickery which is visible outside of the Process implementation.

One optimization technique that is conceptually within our grasp and which addresses both of the above together with several other issues (such as wasted passes in long heterogeneous pipelines) is operator fusion. The idea is relatively simple:

input map f filter p take 10 map g

The idea is that all of the above would get squished together into a single pass over every Emit within input, and furthermore only the first ten results from f such that p is satisfied would ever be evaluated, and g would only run exactly ten times. The "fusion" part of the name comes from obvious optimizations that you can perform even above and beyond the runtime strategy:

input map f map g

Clearly this is equivalent to the (much faster!):

input map (g compose f)

We're compiling a stream pipeline description down to a secondary program description. We should be able to make the above transformation, in addition to others like it.

Implementation Sketch

Code speaks louder than words:

// what follows is a *draft* of my thoughts!

import annotation.tailrec
import reflect.ClassTag
import util.control.Breaks._

case class Emit[O](data: Emit.Data[O]) extends Process[Nothing, O]

object Emit {

  def resolve[O](data: Data[O]): Chunk[O] = data match {
    case data: Chunk[O] => data

    case data => {

      case class TraverseResults[I, O](chunk: Chunk[I], stack: Stages[I, O]) {

        def run: Chunk[O] = chunk match {
          case vc: VectorChunk[i] => {
            val VectorChunk(vec) = vc

            val dest = new Array[AnyRef](vec.length)
            var destI = 0

            vec foreach { v =>
              breakable {
                dest(destI) = stack(v).asInstanceOf[AnyRef]

                destI += 1
              }
            }

            val arr2 = new Array[AnyRef](destI)
            System.arraycopy(dest, 0, arr2, 0, destI)
            VectorChunk(arr2.toVector.asInstanceOf[Vector[O]])
          }

          case ArrayChunk(arr, _tag) => ???
        }
      }

      // @tailrec     // somehow this doesn't work??
      def traverse[E](data: Data[E], acc: Stages[E, O]): TraverseResults[I, O] forSome { type I } = data match {
        case c: Chunk[E] => TraverseResults(c, acc)

        case s @ Mapped(data2, _) => traverse(data2, SCons(s, acc))
        case s @ Filtered(data2, _) => traverse(data2, SCons(s, acc))
      }

      traverse(data, SNil()).run
    }
  }

  sealed trait Stages[I, O] {
    def apply(in: I): O
  }

  final case class SCons[I, E, O](head: Stage[I, E], tail: Stages[E, O]) extends Stages[I, O] {
    def apply(in: I): O = tail(head(in))
  }

  final case class SNil[I]() extends Stages[I, I] {
    def apply(in: I): I = in
  }

  sealed trait Data[O]

  sealed trait Chunk[O] extends Data[O]

  sealed trait Stage[I, O] extends Data[O] {
    // TODO error management
    def apply(in: I): O
  }

  final case class VectorChunk[O](v: Vector[O]) extends Chunk[O]
  final case class ArrayChunk[O](a: Array[O]) extends Chunk[O]

  final case class Mapped[I, O](data: Data[I], f: I => O) extends Stage[I, O] {
    def apply(in: I): O = f(in)
  }

  final case class Filtered[O](data: Data[O], p: O => Boolean) extends Stage[O, O] {      // o_O
    def apply(in: O): O = if (p(in)) in else break
  }
}

Let's unpack the above…

You'll notice in the above code snippet that I noted a minor problem with exceptions thrown by map and filter functions. Technically, such exceptions need to "split" the Emit, such that an appended Halt is interleaved. I believe this is very possible, just messy, and so I didn't implement it. Exercise for the pull requester (which will probably be me anyway).

Compilation

So where do we inject all of this magic in the compilation process and how disruptive is it going to be? As it turns out, we already have a really wonderful abstraction behind which we can efficiently hide the Emit.Data compilation and all of its gritty details, all without having any effect whatsoever on the rest of the codebase. In other words, implementing the above will not involve changing stepAsync, wye, tee, flatMap or really much of anything!

The abstraction is step. This function is absolutely brilliant, and it allows us to encapsulate all of this weirdness without ever leaking into the rest of the library. Basically, the idea is that step promises a Seq[O]. Right now, that is wrapped up in an Emit. I propose a very small change to the type signature, which allows step to produce an Emit.Chunk rather than an Emit. Everything works out exactly the same at the call sites (with some caveats for specialization), and step completely hides the details of compiling and efficiently evaluating the operator AST.

What I propose is that step perform the compilation and optimization of an Emit.Data AST when it hits the relevant Emit node. Once the compilation and optimization has been performed, the optimized Stages result will be saved and mapped over the Cont produced by that particular step iteration. Any Emit nodes in the continuation which have the same pipeline of transformations will be transparently swapped in for the relevant Stages, simply pointing at a different terminal Emit.Chunk. Obviously the equality testing is a little dicy here, but I believe we can do well enough to at least be useful. I would propose relying on pointer equality for functions where relevant (e.g. inside of Mapped). Furthermore, I would propose that we bail out of the whole recursive continuation transformation as soon as we hit an Emit node which doesn't match, since still-later emits are highly unlikely to match once we get outside a contiguous region of matches. This is probably a minor optimization though.

Anyway, so step performs the compilation, maps that compiled result into the Emit nodes in the continuation and then runs the compiled results on the current terminal Emit.Chunk to produce an evaluated Emit.Chunk, which is returned to the caller. Everything else in the ENTIRE LIBRARY proceeds according to the normal rules and with its existing implementation.

Specialization

Ah! This is where things get really insane. You'll notice I have an ArrayChunk special case in the example above. This is more of a demonstration to show that we can have different chunk types. What I really want to do is have a primitive chunk type for every primitive array! We would also need one for object arrays as well, which we could wrap within Array[AnyRef] to avoid having to mess around with class tags. Furthermore, I want to identify when functions passed to map and filter (and the like) are specialized on primitive input and/or output types, allowing us to simply cast the function to the appropriate specialized function type and apply directly to the array within a tight while loop.

Take a moment to gloss over the whole "cast" and "detect" and "AnyRef" bits in the above and zero in on what we can feasibly accomplish here: fused, unboxed operations applied in a single pass to primitive arrays (where relevant). This is incredibly exciting, at least to me. It means that it would suddenly become possible to write extremely high performance code operating on primitive types, all within the same compositional framework that gives us clean high-level invariants and effect control. Imagine scientific computing done using scalaz-stream because it provides potentially better performance than the hand-written equivalent while still granting the same set of extremely elegant and uniform combinators that we know and love. Imagine databases and high-volume analytics implemented directly on top of Process with all of the raw data manipulation JITted down to primitive CPU instructions and registers. That's the dream, and it's a dream that I'm 100% certain we can make a reality without compromising even in the slightest on our type signatures, purity or compositional invariants.

What we do need to compromise on is some of the deep dark internals of a few of these functions, most notably step and map. To be clear, map will continue to look like this from the outside:

def map[B](f: O => B): Process[F, B]

In other words, I'm not in any way proposing that we turn map into something that isn't map.

What I am proposing is that we allow map to peel back the veil of parametricity just a bit in order to kick off all of this wonderful madness. Here's how it would work:

def fooI(x: Int): Int = x + 42
// > fooI: (x: Int)Int
def fooB(x: Int): Boolean = true
// > fooB: (x: Int)Boolean

val x: Nothing => Any = fooI _
// > x: Nothing => Any = <function1>

val II = (fooI _).getClass.getSuperclass
// > warning: there was one feature warning; re-run with -feature for details
// > II: Class[?0] forSome { type ?0 >: ?0; type ?0 <: Int => Int } = class scala.runtime.AbstractFunction1$mcII$sp
val IB = (fooB _).getClass.getSuperclass
// > warning: there was one feature warning; re-run with -feature for details
// > IB: Class[?0] forSome { type ?0 >: ?0; type ?0 <: Int => Boolean } = class scala.runtime.AbstractFunction1$mcZI$sp

II isAssignableFrom x.getClass
// > res0: Boolean = true
IB isAssignableFrom x.getClass
// > res1: Boolean = false

The above is a proof of concept. The proof is at the very bottom. Note that I'm performing a test which returns true for a generic function (e.g. O => B!) that is secretly a fully primitive-specialized function from Int to Int. I then perform a similar test and show that the function is not somehow also specialized on Int to Boolean (which is of course, impossible). The point is that one of these tests is returning true and the other is returning false.

Using this technique, I can test for specialization on input functions once, at the call site, and then cast things into the secretly-actually-specialized "true versions" for the rest of the pipeline. I can even use this technique to determine unambiguously that I must be operating on an unboxed array (since the compiler would not have allowed a map invocation to compile where the function takes an Int, the Emit contains an Array and that array is not itself specialized on Array[Int]).

From that point on, we have full type information (by exploding the number of cases in the Emit.Data algebra, probably using @specialized). When it comes time to actually evaluate the algebra, we know at compile time that we're dealing with an input of (for example) Array[Int] and a mapping function of type Int => Double, which is information we can use to preserve the unboxed nature of the values through the entire pipeline. Instant performance, just add water!

In case you're wondering, the getClass, getSuperclass and isAssignableFrom checks are remarkably efficient. They aren't as fast as HotSpot's own intrinsics for the same operations (naturally), but they aren't as crazy as something like getMethod().invoke(), which is of course absurdly awful. We're also performing these checks only once for a given process compilation, so it's not something that's happening in anything close to a hot path.

Objections

I can see three very serious objections to what I have above (arguably four):

I think these are all very, very fair objections. I just think that the benefits outweigh the aesthetic concerns. Think about it. Fast, unboxed scientific computing built on top of scalaz-stream. #goosebumps

As I said, the person I really, really need to hear from on this issue before I put in a ton (more) work is @pchiusano. If scalaz-stream were my framework, I'd already be diving down this rabbit hole, but it's not my framework. Flex your veto muscle, Paul!

Thoughts welcome from any and all sources.

runarorama commented 9 years ago

Serious question: Why is Emit hardcoded to take a sequence? What is wrong with "chunking" being Process[F, Seq[A]]? Then map stops being awful because you use map . map instead.

djspiewak commented 9 years ago

@runarorama I think that's an entirely possible way to achieve things, though it suffers from the same API annoyances as monad composition without monad transformers (it is in fact, precisely analogous).

As an aside, it is somewhat difficult for users to realize global-optimization benefits like operator fusion (e.g. fusing map, filter, take and then map again) when their chunk datatype is an eager sequence. We can do these things internally. Whether we should is very much up for debate, but we can and the results should be fairly dramatic.

runarorama commented 9 years ago

@djspiewak Well, the API can be made richer for convenience, and it's pretty easy to reason about.

Can your proposed solution guarantee e.g. that x flatMap (emit compose f) is precisely the same as x map f?

djspiewak commented 9 years ago

@runarorama Unless you take a stopwatch to the resulting Task or crack open a profiler, yes, it would be the same. I wouldn't even consider a solution which would produce different output results or even different observable effects. It is partially for this reason that we need to be very careful to get exceptions right during fusion.

djspiewak commented 9 years ago

@runarorama Misclick, I hope. :-)

runarorama commented 9 years ago

Yeah, wrong window :)

On Sat, Mar 21, 2015 at 1:02 AM Daniel Spiewak notifications@github.com wrote:

@runarorama https://github.com/runarorama Misclick, I hope. :-)

— Reply to this email directly or view it on GitHub https://github.com/scalaz/scalaz-stream/issues/340#issuecomment-84257244 .

ghost commented 9 years ago

:+1: I'm not a huge fan of the reflection, but I guess I accept the rationale. Too bad, though: in an HTML5/websockets world, there are good reasons to wish we could retain Scala.js support.

More broadly, though, I have easily anticipatable use-cases for this today, so I'd like to encourage/help however I can.

VladUreche commented 9 years ago

@djspiewak, if I understand the proposal correctly, you're avoiding intermediate stream generation and parsing by batching up operations. But IIRC fusion entails actually merging the operations together, which enables many more optimizations. Shouldn't the current solution be called deforestation instead? Or am I missing something?

djspiewak commented 9 years ago

@VladUreche It's not just deforestation. Fusion is proposed here, I'm just not actually doing it in any of the examples. I'm actually somewhat skeptical that fusion alone is going to yield that many benefits given our algebra. This is part of where I wanted to hear some experiential thoughts from the Akka guys.

@psnively I haven't looked at Scala.js too closely, but maybe there's some way of branching depending on which compiler backend we're in, allowing us to avoid the reflection (at the cost of losing the specialization) on that platform specifically.

VladUreche commented 9 years ago

Thanks @djspiewak! Looking forward to better understanding the proposal when you do a PR :wink:

Regarding specialization, the approach you showed reminds me of optimistic re-specialization, explained by @tixxit here and here. Shamelessly promoting my work, if you were using miniboxing, a cleaner approach would be to use the reflection feature to peek at the specialized type using reifiedType[O] and reifiedType[B].

rkuhn commented 9 years ago

@djspiewak We have not yet looked all that deeply into the performance implications of either kind of optimization, the only obvious fact is that reducing the number of asynchronous boundaries is beneficial for small computational steps—but who would object to that?

djspiewak commented 9 years ago

@VladUreche Maybe at some point down the line. :-) I definitely can't change the signature of map though, even to add miniboxing, so I'm somewhat limited in that respect.

@rkuhn Interesting. Remember that scalaz-stream doesn't actually pipeline deterministic operations in the way that Akka's flows do. So, we don't have asynchronous boundaries in a single linear stream regardless of how many un-fused operations we have. I guess that makes fusion a lot less compelling for us, but it's sort of an open question how much we're going to lose.

rkuhn commented 9 years ago

@djspiewak Your notion of fusion concerns whether you perform a transformation column-wise or row-wise, which will probably depend on the use-case (since it relates to data vs. instruction cache utilization and such effects)—basically “big data vs. big computation”. We’ll think about that as well, but only once the more obvious things have been cleared away ;-) In that sense I’d call this a micro-optimization.

djspiewak commented 9 years ago

@rkuhn It will eliminate (possibly more than) one megamorphic call in a chain within a tight loop. So that's definitely worth something.

rkuhn commented 9 years ago

@djspiewak The number of megamorphic call sites that are fundamentally required should be the same in either case—discounting all the vagaries of actual implementation.

djspiewak commented 9 years ago

@rkuhn Oh yeah, they're just hidden in compose. Actually since compose is unspecialized (I think?), it might even be faster not to fuse things. Getting everything into the same tight loop is probably a much more impactful optimization.

rkuhn commented 9 years ago

… unless your processing steps do factorization or streaming digest calculation, where any sort of invocation cost is irrelevant and cache locality more important.

viktorklang commented 9 years ago

I'd think there are real implications of inlining into a tight loop when it comes to loop unrolling etc.

djspiewak commented 9 years ago

@pchiusano Any thoughts on this? Obviously this touches on some core design features (and tradeoffs!) and I really don't want to proceed unless I know that it meshes with your vision for the library.

pchiusano commented 9 years ago

@djspiewak I guess my reaction is... meh. :) I can't really see baking some ad hoc fusion rules into scalaz-stream. If people want to do that sort of thing, they could do so on their own and deal with explicitly chunked streams. Having the 'available1/L/R' functions would be useful for this.

One thing I would be open to is changing from using Seq to represent chunks to something with well-defined performance. But perhaps still keep it as an interface, so people can go nuts implementing their own chunk types if they want? Honestly I haven't thought enough about it.

At the moment, I'd rather focus on some of the core issues:

  1. Fixing finalization issues (we need a new primitive, onComplete is insufficient as we've determined)
  2. Investigating making effectful channels the basis for the library

Then maybe after that we could consider some hacks in the name of (serious) performance improvements.

djspiewak commented 9 years ago

@pchiusano

I can't really see baking some ad hoc fusion rules into scalaz-stream. If people want to do that sort of thing, they could do so on their own and deal with explicitly chunked streams. Having the 'available1/L/R' functions would be useful for this.

Explicitly chunked streams make the available combinators irrelevant, actually. available is interesting with the implicit chunking that we provide.

It would be impractical almost to the point of impossibility for users to provide their own respecialization of the form that I propose in the OP without losing performance. One of the absolutely critical elements of my proposal is implementing the resolution of the respecialization in step. Users do not have the ability to meaningfully redefine this function, nor do they have the ability to (practically) inject the appropriate mappings at all of the points where step is invoked.

One thing I would be open to is changing from using Seq to represent chunks to something with well-defined performance. But perhaps still keep it as an interface, so people can go nuts implementing their own chunk types if they want?

I spent some time working on this (in fact, the OP comes out of many of those thoughts). We basically need almost the full generality of Seq, or something remarkably close to it. It is possible to make things a bit more generic, but not much, and abstraction basically removes the benefit of moving away from Seq (predictable performance).

My preference would be to do something close to what I have in the OP, where we have a limited algebra of different emit chunks with specific types. Barring that, Vector it.

Then maybe after that we could consider some hacks in the name of (serious) performance improvements.

Full fusion (i.e. merging Mapped nodes) does seem to be a red herring, given that we don't incur asynchronous context shifts at simple operator boundaries and the call sites don't magically disappear. However, while I don't have benchmarks for the other suggestions (like respecialization), I am very confident that the gain will be enormous. It's also worth noting that partial fusion (i.e. not eliding AST nodes, but still evaluating in a single pass) is a significant performance gain, and I do have benchmarks showing this in similar applications in the past. It is also a performance gain that you specifically touched on in #237 with your map and take example.

map needs to be fixed. Even if all of the above is ignored, the fact that such an incredibly common operation actively deoptimizes performance is quite serious. I agree that the most elegant way to do this is the available combinators, but if we can't get those in soon, we should just special-case map temporarily. With that said, since we're looking at effectful transducers, we could just implement available within that framework (since it's green-field anyway) and we won't have to worry about specializing anything.

Fixing finalization issues (we need a new primitive, onComplete is insufficient as we've determined)

Proposal already on #333.

Investigating making effectful channels the basis for the library

+1!

It's worth reiterating that the effectful transducers I propose in #351 are done, with the only caveat being that type inference doesn't work unless I sprinkle some implicits on a few of the core type signatures (notably append).

pchlupacek commented 9 years ago

@pchiusano @djspiewak apart from implementing some sort of mechanics like suggested in #333 I think we have to also look on other two issues:

djspiewak commented 9 years ago

@pchlupacek

  • associativity of append's finalizers
  • making await(req)(a => rcv(a).onComplete(xx)) being the same as p.flatMap(a => f(a).onComplete(xx)), except perhaps for interrupted awaits, that will need to be resolved in await dependent code.

Recall my argument on #333: these goals are fundamentally unachievable because our claimed invariants for kill are directly in conflict with our claimed invariants for onComplete. Simple example:

def gen(n: Int) = emit(n) onComplete release(n) append gen(n + 1)

gen(0).kill         // finite or infinite?

In order for onComplete to be associative over append and invariant-preserving, or to have await(req)(a => rcv(a).onComplete(xx)) be the same as p.flatMap(a => f(a).onComplete(xx)) (these are the same goal!), then gen(0).kill would need to be defined as an infinite stream of finalizers. However, defining kill in such a way means that we would never have the ability to interrupt processes, and we would generate infinities in a lot of unexpected places (I don't suspect that users would expect gen(0).kill to be infinite).

The only solution is to give up on our claimed invariants for onComplete. We can't promise that every onComplete in your call graph is going to get invoked exactly once, because promising that is the same as reneging on our promise of controlled termination. Instead, we relax our onHalt invariants (which is to say, basically where we are today), discourage the use of onComplete (or remove it entirely) and give users another primitive to use for finalization. Specifically, a more limited finalization primitive that we can make strong guarantees about without compromising other invariants.

pchlupacek commented 9 years ago

@djspiewak I did some experiments with modified append (as suggested in #333) , with cause branch to be defined differently than Halt(cause). I manage to get your test for associativity working, and in fact, just got very few places where this is still non terminating. I think perhaps, even in current algebra signature this may be solvable, although it may not be completely easy implementation and definitely requires modification of at least, repeat, kill, suspend and probably some other few combinators as well.

I am not completely sure if that will really work at the end, but I think current await/flatMap difference is a flaw that we must solve, even when this will require algebra modification.

I think what we need to achieve is associativity of appends. I am not sure if I understand your example correctly, but do you want to have associativity between appends and interleaved onCompletes? I think that may not be solvable as you say, and I as well don't think that (example below) p1 is same as p2. But I think that p1 shall be same as p3, which is unfortunately not today.


val p1 = (emit(1) ++ emit(2)).onComplete(emit(3))
val p2 = (emit(1).onComplete(emit(2) ++ emit(3))
val p3 = emit(1) ++ emit(2).onComplete(emit(3))

I will post progress updates on this in #333

djspiewak commented 9 years ago

Maybe we'll be able to come back to this someday. Hopefully.

pchiusano commented 9 years ago

@djspiewak I think the new design gives you a lot of what you want here. map and many other operations now preserve chunking (and you can unbuffer beforehand if you want full laziness). And when transforming streams, you can obtain unboxed chunks and work with those if you like.

I didn't fuse map and/or filter but I think those would be straightforward extensions to the stream interpreter. However, I didn't want to complicate things and wasn't sure it was going to be much of a performance win anyway.