whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

light weight transformations #461

Closed dominictarr closed 6 years ago

dominictarr commented 8 years ago

I hope it's not too late to make a suggestion,

The current design, where a transform stream is a pair of writable/readable streams, is overkill for quite a few applications of transform streams.

Often, people use streams as (one hopes) very lightweight transformations, for example to parse an incoming file into lines, or maybe just to filter out items (there is a very strong correspondence to the array functions map, filter, etc).

These simple transformations do not need their own back pressure - they simply need to proxy the back pressure further along the stream.

One simple way to achieve this would be to think of a readable stream as a function that returns a promise whenever you call it, and those promises will be resolved in the order created.

A transform stream could instead be a function that takes a readable stream and returns another readable stream, except that when a promise from the transformed stream resolves, its value has been transformed. This is great because you save a lot of queuing/dequeuing. The transform doesn't even need to be async: it can just return the promise from upstream, wrapped to return its own result instead, but without creating a new promise.
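(A minimal sketch of this idea, purely illustrative and not spec API: a "readable" is modeled as a zero-argument function that returns a promise for the next chunk, with promises resolving in call order.)

function mapReadable(readable, fn) {
  // The returned readable has no queue of its own: each call just pulls
  // from upstream and applies the mapping to the resolved value.
  return () => readable().then(fn);
}

// usage sketch: const upper = mapReadable(source, chunk => chunk.toUpperCase());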

You could flip this around the other way and apply functions to a writable stream - but that would mean a transform stream is a function that promises are passed to, and I feel a function that takes a readable stream is more natural.

For duplex streams, like a TCP client, a pair of streams is appropriate, because each side may have independent back pressure. But for simple cases, it just means that now every little filter and map needs two queues: one incoming, one outgoing.

domenic commented 8 years ago

Hey, great to have you here!

In general the design of transform streams, and even writable streams, is still in flux and open to suggestions. (Readable streams, we're pretty happy with.) So feedback like this is definitely good to have.

The big struggle we've had so far is figuring out the balance between two positions:

For example, service worker APIs have leaned us toward the former for various technical reasons, which IMO as a fan of the Node style is disappointing.

My way of interpreting your feedback is that we'd want to preserve the three-concept ecosystem, but provide a performant and lightweight way to make synchronous transformations work. Here would be my strawman:

const transform = TransformStream.map(syncMappingFunction);

readable.pipeThrough(transform).pipeTo(writable);

Here:

In general I like the conceptual simplicity of a transform always being a { writable, readable } pair, and always having the same API, instead of having one for duplex and one for simple mappings. That's why I suggest the above, which preserves that API, but should also give us the benefits of lightweight transforms when desired.

What do you think?

dominictarr commented 8 years ago

thanks!

reading the spec, I see pipeThrough but I don't understand what you mean by "not reifying" the stream.

I'm not sure whether you were implying TransformStream.map in the functional sense of "map", but there is a definite need for a transform to be able to expand and contract the input.

Is the syncMap function passed a chunk? I'm talking about something that is passed the preceding readable.

I'm not particularly bothered by actual IO etc. being a "writable" stream. I do think it's simpler if they are {readable, reader}, but I'm not gonna labour that. I'm also not against a transform stream that turns into a writable stream when you try to use it like that.

This is similar to the pipeFrom idea, but instead of explicitly using pipeFrom, you'd just use pipeThrough and it would detect if it would rather get the readable.

dominictarr commented 8 years ago

and yes, I do advocate the 4 concept model ;)

(although, if you have readable and reader, it's the 2^2 concept model - there are two concepts which are also combined 2 ways = 4 concepts, but only two fundamental concepts)

tyoshino commented 8 years ago

FYI, in the context of https://github.com/whatwg/streams/pull/462, I'm trying to make TransformStream solid enough for production use. Just started: https://github.com/whatwg/streams/pull/462/commits/ae2e3f0ea1e75095904794cbd17f08f86dbfac25

I'll incorporate the outcome of this thread into backpressure propagation in TransformStream.

domenic commented 8 years ago

reading the spec, I see pipeThrough but I don't understand what you mean by "not reifying" the stream.

I meant that pipeThrough would say "oh, this is a sync map transform stream; let me just run synchronous mapping as an unobservable optimization, instead of using the readable and writable ends and their queues." If you never call the ts.readable getter, no ReadableStream would necessarily even exist.
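A purely hypothetical sketch of that fast path (none of these internals are spec'd; _syncMapFn and pipeThroughOptimized are invented names): pipeThrough could notice a transform built by something like TransformStream.map and drive the mapping straight off the source, never touching the transform's writable/readable ends.

function pipeThroughOptimized(source, transform, options) {
  if (typeof transform._syncMapFn === 'function') {   // hypothetical internal marker
    const reader = source.getReader();
    return new ReadableStream({
      pull(controller) {
        return reader.read().then(({ done, value }) => {
          if (done) controller.close();
          else controller.enqueue(transform._syncMapFn(value));
        });
      },
      cancel(reason) {
        return reader.cancel(reason);
      }
    });
  }
  // Otherwise, the normal pipeThrough behaviour.
  source.pipeTo(transform.writable, options);
  return transform.readable;
}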

I'm not sure whether you where implying TransformStream.map in the functional sense of "map", but there is definite need for a transform to be able to expand and contract the input.

Yes, that's what I meant. If you want to expand and contract the output, you use the normal TransformStream constructor, as in this test

Is the syncMap function passed a chunk? I'm talking about something that is passed the preceding readable.

Right, that was what I was thinking.

A function that is passed a readable seems like something you can already do in JavaScript, with no need for additional work from the streams spec...

dominictarr commented 8 years ago

A function that is passed a readable seems like something you can already do in JavaScript, with no need for additional work from the streams spec...

Hmm, yes, that is correct. Given that there are several pipe methods, I'd imagine that people would be quite happy to instead use a pump-style module that would, say, figure out that it's a ReaderTransform, and handle it like that.

Although, embracing this idea does also mean you could delete TransformStream and be done already ;)

gwicke commented 8 years ago

I recently looked into simple stream transforms as well. After considering & implementing several options I settled on a simple pull solution based on the reader interface: makeTransform(readable or reader) returning a reader interface { read(): Promise<chunk>, cancel(): Promise }. Composition of such pull transforms is relatively easy, as is converting the exposed reader interface into an UnderlyingSource for consumption & buffering in a ReadableStream.
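A rough sketch of that pull shape (illustrative only; end-of-stream and error handling are elided, and parseLine / decodeUtf8 / sourceReader are placeholder names): a transform takes an upstream { read, cancel } and returns the same interface, applying the mapping to each pulled chunk.

function mapPullTransform(fn) {
  return upstream => ({
    read: () => upstream.read().then(fn),        // one promise chain, no extra queue
    cancel: reason => upstream.cancel(reason)
  });
}

// Composition is ordinary function application:
// const out = mapPullTransform(parseLine)(mapPullTransform(decodeUtf8)(sourceReader));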

I'm not entirely convinced that the complexity of WritableStream and TransformStream is worth it. There are already several ways of achieving very similar things in the spec (ex: Reader vs. UnderlyingSource), and I fear that adding even more ways is only going to make it more confusing.

Edit: I added some microbenchmark results at https://github.com/gwicke/mixmaster. In this benchmark, piping chunks through the reference ReadableStream implementation is about twice as expensive as a basic Reader interface based transform.

ariutta commented 8 years ago

@dominictarr, are transducers relevant to what you're looking for? I searched the issues here for any discussion of transducers and was surprised not to find anything.

dominictarr commented 8 years ago

@ariutta yes very similar. you may also like https://github.com/pull-stream/pull-stream

ariutta commented 8 years ago

pull-stream looks cool!

Regarding transformations, one concern for web streams is that today's parser and converter ecosystem is geared for other streams/iterators, predominantly Node streams. If I'm writing a new parser, I'd love for it to be usable by everyone, regardless of their preference for stream/iterator implementation. But limited time on my part means I'm not going to write a version for every stream/iterator implementation.

That means I'll probably write my new parser for either 1) Node streams, because that's where most users are or 2) transducers, because then it's usable with native or Underscore reduce implementations, immutable-js, Node streams, RxJS, etc.

It would be great if the transformation option for web streams made it easy for me when I'm writing that new parser. Could the transformation option simply be a transduce operator?

dominictarr commented 8 years ago

@ariutta yes, you get the same sort of problem with many APIs. Although streams are all basically the same thing, there are still a lot of different ways to do them. So, what you want to build your parser against is the lowest common denominator, something that will be easy to wrap into any other interface. Node streams are not this. Node streams are very heavy. pull-streams are very lightweight, but they target async - I'm not sure about rxjs or immutable-js. A parser is quite a simple case, so probably you can make something that can easily be wrapped to any of these interfaces.

Maybe just a thing that takes two functions (data, end) and can emit multiple items at each call and then end. data is called with each incoming data, and end is called once everything has finished. That maps to https://github.com/dominictarr/through, https://github.com/rvagg/through2, and https://github.com/pull-stream/pull-through.
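One possible reading of that shape (a sketch, not any particular library's API): the transform is handed a push function for emitting and supplies a data handler, called once per incoming item and free to push several results, plus an end handler called once the input is exhausted.

function makeWordSplitter(push) {
  return {
    data(chunk) {
      chunk.split(/\s+/).filter(Boolean).forEach(push);   // may emit several items per call
    },
    end() {
      // nothing buffered in this toy example; a stateful transform would flush here
    }
  };
}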

flatMap (i.e. map to an array, but then the array is expanded into the stream) is similar, but is limited because you may need to track a current state (say, an incomplete line or token). It works great when the input is framed into "messages", though.

domenic commented 8 years ago

One thing I want to reassure people on, after seeing things like https://github.com/gwicke/mixmaster (cc @gwicke), is that we are very concerned about making transform streams performant. A lot of the locking infrastructure is in place specifically so that when you do rs.pipeThrough(transformStream), it can bypass a lot of the intermediate layers and just shuffle data from rs to transformStream.readable. The reference implementation is very early stage and does not reflect these kind of optimizations at all, so people shouldn't be taking it seriously as the ultimate destination.

Basically, imagine the syntax of the reference implementation, which is probably going to end up as something like

new TransformStream({
  transform(chunk, controller) {
    // use controller.enqueue()
    // return promise to succeed/fail asynchronously
  },
  flush(controller) {
    // use controller.enqueue() if you want
    // return promise to close/error asynchronously
  }
});

and then imagine the fastest possible implementation of getting data from point A to point B while applying the transform and flushing logic specified by those two functions. That is what we aim to make implementable in browsers (and polyfills).

ariutta commented 8 years ago

pull-stream and pull-through are just 9.8K bundled together and minified, which is far smaller than Node streams! It looks like a great solution for async. But as far as being the lowest common denominator, aren't transducers still lower, because they handle both async and non-async?

The incomplete token part is tricky. Most parsers need to deal with that, e.g., this one or this one. To handle this case, we'd need to use reduce followed by flatten instead of flatMap.

gwicke commented 8 years ago

@domenic: How would back pressure propagation work in this scheme? If enqueue() does not return a promise, would there be another interface to pace the transform?

Also, if it does not return a promise & concurrent calls to enqueue() are supported, buffering between stages of a pipeline becomes mandatory to ensure in-order processing. While the reference ReadableStream implementation is certainly not the last word on buffering performance, it seems very likely that such buffering is going to be more expensive than Promise.then().

gwicke commented 8 years ago

I wrote up some thoughts on a public pull sink interface to complement WritableStream at https://github.com/gwicke/mixmaster#musings-on-a-pushpull-mode-for-streams.

dominictarr commented 8 years ago

@ariutta pull-streams can be sync.

since your earlier post here, I've been thinking about what the ideal lowest common denominator is. In the line parser (split) you need to return {remainder: '...', output: [...]}, and then you flatMap the output,

but pass the remainder back into the input the next time. You could use reduce for this, but then you have to pass back the whole {remainder, output} object, and it just appends to the output array. reduce works, but if you need to parse an enormous file, you actually only need a tiny bit of state. A stream that moves slowly, and where you want data out in real time, is the same. With reducers, can you get data out before the stream has ended?

I've been thinking maybe there is a lower level of which reduce and split are both special cases: something like, you return a feedback state and the output so far. If the parser returns the state instead of encapsulating it, then it's still functional, but also very lightweight.
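A sketch of that lower-level shape (illustrative only): a pure step function threads a small feedback state and returns the output produced so far, and reduce and split fall out as special cases.

const splitLines = {
  init: '',
  step(remainder, chunk) {
    const pieces = (remainder + chunk).split('\n');
    return { state: pieces.pop(), output: pieces };   // tiny state, streaming output
  },
  flush(remainder) {
    return remainder ? [remainder] : [];               // emit the final partial line
  }
};

// reduce is the special case where output stays empty until flush;
// map is the special case where the state is never used.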

Other examples of operations which are more like split are unique (immediately emit the first time you see an item, but not again) or things that calculate something based on a sliding (etc.) window.

gwicke commented 8 years ago

@dominictarr, @ariutta: You can use flatMap / concatMap if you hide the remainder in transformer state, but then the transformer needs a flush / end handler as well. I do like the generality of concatMap, but went for the pragmatic solution of sticking the remainder handling in a match transform wrapper, where it can be applied to different matchers.

Another interesting aspect of flatMap / concatMap is the ability to return a batch of values at once. This got me thinking about a .readBatch() method in the reader interface, which would return at least a single-element array, but possibly additional entries that might be available / buffered without blocking in the transform.

ariutta commented 8 years ago

@dominictarr, @gwicke: one real use case is a streaming API for the superagent HTTP client. We'd like to make it play nicely with web streams, but the smallest I've been able to get the web streams polyfill down to was 47K minified. Since the browser version of superagent is currently just 13K, the web streams polyfill is too large to include.

@dominictarr, any suggestions for how your idea above could be implemented for the superagent streaming API? I like the idea of making the API the lowest common denominator and very lightweight. The options I've been considering for how to make the stream available:

dominictarr commented 8 years ago

@ariutta after this discussion I realized I needed that for https://github.com/dominictarr/map-filter-reduce (which is for database queries, and needs all these types of transforms). Oh, you might also want to check out https://github.com/mafintosh/vintage-streams: it's compatible with node-streams, but rewritten from scratch. It looks shorter (although not as short as pull-streams, of course).

For map-filter-reduce, I definitely want something that is as close to a normal iteration function as possible. Still thinking about this, though.

domenic commented 8 years ago

@domenic: How would back pressure propagation work in this scheme? If enqueue() does not return a promise, would there be another interface to pace the transform?

I'm not sure I quite understand the question. It would return a promise, so that's a pretty solid way to pace the transform. So rs would automatically get backpressure applied.

Also, if it does not return a promise & concurrent calls to enqueue() are supported, buffering between stages of a pipeline becomes mandatory to ensure in-order processing. While the reference ReadableStream implementation is certainly not the last word on buffering performance, it seems very likely that such buffering is going to be more expensive than Promise.then().

I'm not sure what you mean by "concurrent" here, since JS doesn't have concurrency except with web workers. Do you mean the transform enqueuing multiple chunks in response to one incoming chunk? Yes, that is definitely supported. But in rs.pipeThrough({ writable, readable }), readable already has a buffer we can use---one which is automatically drained if there are outstanding read requests (which is I think what you are referring to by talking about Promise.prototype.then()). So I guess the cost you are worried about is about putting an element in the array, then taking it out? I'm not really concerned about that, and it can be optimized away if necessary to just directly resolve the pending promise.


I noticed later people are talking about flatMap. In case it isn't clear, that's exactly the existing transform() interface, just using enqueue() instead of returning an array.
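For example, a line splitter written against that transform()/enqueue() shape might look like the following sketch (constructor form as shown earlier in this thread; the remainder is kept in a closure):

let remainder = '';
const splitter = new TransformStream({
  transform(chunk, controller) {
    const lines = (remainder + chunk).split('\n');
    remainder = lines.pop();
    for (const line of lines) controller.enqueue(line);   // many chunks out per one in
  },
  flush(controller) {
    if (remainder) controller.enqueue(remainder);          // flush the final partial line
  }
});

// readable.pipeThrough(splitter).pipeTo(writable);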


@ariutta

but the smallest I've been able to get the web streams polyfill down to was 47K minified.

Well, this is because of two factors. One, you're copying the reference implementation. A from-scratch polyfill would be a lot smaller, inlining functions and removing asserts and such. Two, you're using a transpiler, and those are well-known for massive bloat in output size. If you need ES5 or ES3 compatibility, you really need to write the code in ES5/3, instead of transpiling from ES6.

gwicke commented 8 years ago

@domenic: How would back pressure propagation work in this scheme? If enqueue() does not return a promise, would there be another interface to pace the transform?

I'm not sure I quite understand the question. It would return a promise, so that's a pretty solid way to pace the transform. So rs would automatically get backpressure applied.

Okay, that's good. As you are probably aware, this differs from ReadableStreamDefaultControllerEnqueue. Are you planning to make that return a promise as well? It would be somewhat unfortunate to have two almost identical interfaces with subtly different semantics. It also affects performance, as I'll explain below.

I'm not sure what you mean by "concurrent" here, since JS doesn't have concurrency except with web workers.

You seem to be thinking of parallelism. There is plenty of support for concurrency in a single node process, but no support for parallelism. See for example this explanation of the difference between the two.

Do you mean the transform enqueuing multiple chunks in response to one incoming chunk? Yes, that is definitely supported. But in rs.pipeThrough({ writable, readable }), readable already has a buffer we can use---one which is automatically drained if there are outstanding read requests (which is I think what you are referring to by talking about Promise.prototype.then()). So I guess the cost you are worried about is about putting an element in the array, then taking it out? I'm not really concerned about that, and it can be optimized away if necessary to just directly resolve the pending promise.

The point both @dominictarr and myself have made & experimentally explored is that threading a single Promise chain through several transforms is significantly cheaper than allocating a new Promise + resolve / reject closures per chunk and transform. Sync transforms just add another link to the promise chain they received from their Reader, and return this to their respective caller. Once you add mandatory buffering between transforms & return a Promise from the transform enqueue (but not from ReadableStreamDefaultControllerEnqueue?), you typically end up doing new Promise((resolve, reject) => { ... }) for each chunk & transform in a pipeline. This adds quite a few allocations.

Now, you could make a push strategy work with a reasonable number of allocations if you pass Promises all the way back up through the pipeline, and avoid buffering by only feeding in one chunk at a time. This would require a change in the spec to return a Promise from ReadableStreamDefaultControllerEnqueue. A piped ReadableStream would also need to synchronously call its downstream transform if no concurrent enqueues are pending (in the queue). For a fully synchronous transform pipeline, this would mean that the TransformStream would return the Promise instance returned by the last stage's ReadableStream.enqueue() call.
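To make the comparison concrete, a sketch of the two allocation patterns (illustrative only, not spec code):

// Pull / chained style: each read() adds one .then() link to the promise
// received from upstream; nothing is buffered per chunk.
const chainedMap = (upstream, fn) => ({ read: () => upstream.read().then(fn) });

// Push / buffered style: reporting backpressure from enqueue() typically
// means minting a fresh promise (plus a resolve closure) for every chunk.
function bufferedEnqueue(queue, chunk) {
  return new Promise(resolve => {
    queue.push({ chunk, resolve });   // resolved later, when the chunk is dequeued
  });
}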

All this also assumes that transform in your code example is never called concurrently:

new TransformStream({
  transform(chunk, controller) {
    // use controller.enqueue()
    // return promise to succeed/fail asynchronously
    // **IMPORTANT: Transform is only called once 
    // previous return promise has been resolved / rejected.**
  },
  flush(controller) {
    // use controller.enqueue() if you want
    // return promise to close/error asynchronously
  }
});

This is the same no-concurrency guarantee that keeps unbuffered pull transforms simple.

If concurrent transform calls were allowed, then you would force async transforms to implement internal buffering, and sync transforms to immediately write out all their results to the next ReadableStream's buffer. Both cases would be worse than in a pull model. While the async case is probably obvious, the sync case might be less so. To illustrate, typical synchronous split transforms already return their results in an array. In a pull model, that array is then returned one by one (code example), and when it is exhausted the next input is pulled in & transformed. In a push model, the transform needs to synchronously write out all chunks to the downstream ReadableStream to escape the need to buffer incoming calls. This just transfers chunks from an existing buffer to another buffer, adding unnecessary overheads in the process.

domenic commented 8 years ago

Okay, that's good. As you are probably aware, this differs from ReadableStreamDefaultControllerEnqueue. Are you planning to make that return a promise as well? It would be somewhat unfortunate to have two almost identical interfaces with subtly different semantics.

No plans in that direction. The interfaces are purposefully meant to be very different; it's unfortunate that we've managed to confuse you into thinking they are almost identical :(.

You seem to be thinking of parallelism. There is plenty of support for concurrency in a single node process, but no support for parallelism.

I'm aware that a couple popular articles have attempted to redefine the terms in the way you seem to be using them. Other popular articles have done the opposite. I wasn't aware until now which interpretation you were using.

The point both @dominictarr and myself have made & experimentally explored is that threading a single Promise chain through several transforms is significantly cheaper than allocating a new Promise + resolve / reject closures per chunk and transform.

Right, that's why we wouldn't do those allocations. As I said, this can all be easily optimized away. To repeat, "imagine the fastest possible implementation of getting data from point A to point B while applying the transform and flushing logic specified by those two functions."

All this also assumes that transform in your code example is never called concurrently:

That assumption is true.

ariutta commented 8 years ago

"imagine the fastest possible implementation of getting data from point A to point B while applying the transform and flushing logic specified by those two functions."

This implies it would be possible to avoid intermediate collections, as in the second image below (from this article). Is that right?

[image: chained transformation]

[image: transduced transformation]

domenic commented 8 years ago

I'm not sure what a collection is, or exactly what the different geometric shapes in the animation represent, but I think the answer to your question is probably "yes".

dominictarr commented 8 years ago

@ariutta I recently learnt about a variation on reduce called a "scanner": it is a reduce, except it returns the currently reduced value on every iteration. This could be combined with a map/filter to implement a parser that is all pure functions but also streaming.
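A small sketch of that idea (usually called scan; purely illustrative):

function scan(step, initial) {
  let acc = initial;
  return chunk => (acc = step(acc, chunk));   // emits the running accumulator at every step
}

// Example: a running total that streams a result out after every chunk.
const runningTotal = scan((sum, n) => sum + n, 0);
// runningTotal(2) === 2, runningTotal(3) === 5, runningTotal(4) === 9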

ariutta commented 8 years ago

@dominictarr, would you suggest that functionality be built into WHATWG streams, or that it be part of another library, in the way that Highland.js is related to Node streams?

dominictarr commented 8 years ago

I think standards should focus on being low level, and solving the technical problem, and leave high level things, such as being user friendly, up to libraries.

It's easy to iterate to produce a better library, but nearly impossible to do that with a standard.