whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

Can we add buffering strategies via a transform stream? #24

Closed domenic closed 10 years ago

domenic commented 11 years ago

People have demonstrated how to do this with simpler always-async streams, e.g. in https://github.com/whatwg/streams/issues/13#issuecomment-27415443, but I am not sure how it would work with the current API.

In any case, it would be preferable to explain streams with buffering strategies (i.e. ReadableStream) in terms of BaseReadableStream + something else, instead of having ReadableStream overwrite parts of BaseReadableStream. Will take a bit of work, but should be doable.

domenic commented 10 years ago

It's time to find out if this is possible. It's starting to block a bunch of other stuff, regarding the evolutionary direction of the buffering strategies, base-class split, etc. I hope to sit down and have a real hard think about this sometime in the next couple days.

In the meantime, if anyone else wants to take a look, please keep me updated. We can use this issue as a scratchpad.

domenic commented 10 years ago

OK. I started down this path in a branch. But I have run into a roadblock which is making me mad; as far as I can tell, the current spec does not allow this possibility. Let me explain.

First off, what is this issue trying to accomplish? Our goal is to be able to have only BaseReadableStream, plus e.g. a ByteLengthHighWaterMarkTransformStream, such that you can do

var rs = getABaseReadableStreamWrappingAnUnderlyingSource();
var hwmTransform = new ByteLengthHighWaterMarkTransformStream(1024);
var output = rs.pipeThrough(hwmTransform);

and the following requirements are obeyed:

  1. If output is consumed faster than data is produced by the underlying source, then rs should never exert backpressure on the underlying source.
  2. If output is consumed faster than data is produced by the underlying source, then output should be able to vend data to that consumer just as fast as if the consumer were using rs directly.
  3. If output is consumed slowly, then rs will only exert backpressure on its underlying source once the underlying source has pushed through 1024 bytes of data.
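To make requirement 3 concrete, here is a minimal sketch of the accounting such a transform's input side would need: acknowledge writes until the buffered byte total reaches the high-water mark. (Illustrative code only; `ByteLengthBuffer` is a made-up name, not part of any spec.)

```javascript
// Hypothetical sketch of the byte accounting inside the transform's input
// side. "ByteLengthBuffer" is an illustrative name, not a spec'd class.
class ByteLengthBuffer {
  constructor(highWaterMark) {
    this.highWaterMark = highWaterMark;
    this.queue = [];
    this.queuedBytes = 0;
  }
  // Returns true ("keep pushing") until the high-water mark is reached.
  write(chunk) {
    this.queue.push(chunk);
    this.queuedBytes += chunk.byteLength;
    return this.queuedBytes < this.highWaterMark;
  }
  // Reading frees up space, relieving backpressure.
  read() {
    const chunk = this.queue.shift();
    if (chunk !== undefined) this.queuedBytes -= chunk.byteLength;
    return chunk;
  }
}
```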

We can even simplify this scenario, as I tried to do in the branch's scratch.js, by setting the HWM to Infinity. In that case the requirements are simply 1 and 2; backpressure should never be exerted.

Well, remember how backpressure signals are communicated to readable streams, and thus to underlying sources: the push return value. E.g.,

var rs = new BaseReadableStream({
  start(push) {
    source.ondata = chunk => {
      if (!push(chunk)) {
        source.exertBackpressure();
      }
    };
    // etc.
  }
});

The current definition of BRS's [[push]] always returns false, so this is clearly impossible. We seem to be stuck before we begin.

But wait! What if we make a small, reasonable tweak: it returns true when pushing into an empty buffer. So the first time anyone calls push, no backpressure. The second time, if nobody has taken away that data, then backpressure; but if someone has taken away that data, then no backpressure! OK, that sounds reasonable, even without the motivation of this issue.
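As a minimal sketch of this tweak (illustrative code, not the spec's actual [[push]] algorithm): push reports no backpressure exactly when the buffer was empty at the time of the call.

```javascript
// Sketch of the tweaked push semantics: pushing into an empty buffer reports
// no backpressure (returns true); pushing into a nonempty one reports
// backpressure (returns false).
function makeBuffer() {
  const buffer = [];
  return {
    push(chunk) {
      const wasEmpty = buffer.length === 0;
      buffer.push(chunk);
      return wasEmpty;
    },
    take() {
      return buffer.shift(); // taking data makes the next push return true again
    },
  };
}
```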

The hope is that this tweak will give us the breathing room we need, so that we can write our infinite-high-water-mark transform stream to immediately suck out data the moment it appears, so that push will always return true, and never false.

This turns out to be impossible, though. Because consider a stream like this:

var rs = new BaseReadableStream({
  start(push) {
    console.log(push('a'));
    console.log(push('b'));
    // etc.
  }
});

i.e. a stream that synchronously pushes two buffer chunks in. (This is quite realistic, I believe: e.g. if the underlying source allocates buffers in a fixed size, but several of them are available at a given time.)

There's no "time" in between the first push and the second for us, the transform stream, to suck away the data and empty the buffer! So the second push will always return false.

Why is there no "time"? Because once the buffer becomes empty, then our pipeTo algorithm notices the stream is in a "waiting" state, and then it says: source.wait().then(fillDest, abortDest);. So it waits to write to the destination stream (the input side of our transform) until the end of the microtask. Which of course happens after push('b'), which is literally the next line of code after push('a'). Stated another way, simply calling push('a') does not trigger any reaction from the transform stream being piped through: that is waiting to happen at the end of the microtask.
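The timing issue can be reproduced in a few lines of plain JS; the promise reaction (standing in for pipeTo's wait().then(fillDest)) only runs after all the synchronous pushes have happened:

```javascript
// The .then() reaction stands in for pipeTo's wait().then(fillDest).
const log = [];
Promise.resolve().then(() => log.push('drain'));
log.push("push('a')"); // synchronous
log.push("push('b')"); // also synchronous; the drain has not run yet
// After the microtask runs, log is ["push('a')", "push('b')", "drain"]:
// nothing had a chance to empty the buffer between the two pushes.
```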

I believe Node streams get around this by using synchronous events, i.e. .on('readable', fillDest) instead of our asynchronous promise-based wait().then(fillDest).


This seems kind of bad. I need to think about what we can do here. Off the top of my head:

  1. Use synchronous events?? But then either we lose portability with platforms that don't have EventTarget, or we have to invent our own EventTarget/EventEmitter-like abstraction, which sucks. And we have the usual problem with these state-transition events, where people can "miss" them if they're not subscribed early enough.
  2. Hide the synchronous "events" internally, by reifying the relationship that gets set up between rs and hwmTransform? So pipeTo is no longer based on a simple algorithm involving the primitives, but instead actually sets up some pointer from rs to the pipe destination (i.e. to hwmTransform.input), so that when rs is pushed into, the piping logic can write to hwmTransform.input immediately. This is kind of sad, as I liked how pipeTo was just sugar over primitives.
  3. Crazy hacks? E.g., maybe if when pipeTo saw the source was in a "waiting" state, it would queue a microtask to check back? I could envision this potentially working such that all the work happens via recursively-queued microtasks, so only one actual event loop tick passes.
  4. More broad rethinking? E.g. use some of Gozala's channel/buffer separation techniques. (Gozala, please don't turn this into yet another issue advocating your repo; we have enough of those. This issue is for solving problems with the existing solution.)
  5. Just give up and bake the buffering strategies into the base classes? The base classes are kind of crippled right now, in that instances of them have no way of choosing the return value of their push method. So perhaps the separation was ill-conceived. This is along the lines of #81.
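For what option 3 might look like, here is a rough sketch (hypothetical names; the recheck count is bounded so the recursion terminates): the drain loop re-queues a microtask to check the source again, instead of giving up until the next turn.

```javascript
// Hypothetical sketch of option 3: re-check the source on freshly queued
// microtasks, so chunks pushed synchronously later in the same turn still get
// drained before the turn ends. maxChecks bounds the recursion.
function drainViaMicrotasks(source, dest, maxChecks) {
  let chunk;
  while ((chunk = source.take()) !== undefined) {
    dest.write(chunk);
  }
  if (maxChecks > 0) {
    queueMicrotask(() => drainViaMicrotasks(source, dest, maxChecks - 1));
  }
}
```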

Rarrrgh.

domenic commented 10 years ago

Upon reflection solutions 2 and 5 seem most feasible, with 5 of course being easiest. I think I will start prototyping 2 tonight to see how it goes...

domenic commented 10 years ago

Oh, I forgot to mention: the virtue of 2 is that it seems very in line with #97.

othiym23 commented 10 years ago

I'm in favor of 4, for the reasons that you list for 2 being kind of gross. Maybe I'm crazy, but it seems like there should be a solution that breaks I/O streaming down into a set of simple combinators that the high-level API functions compose. Put another way, there are primitive operations underlying all this, it's just whether they're visible or obvious underneath the current specification language that's at issue. I'll put some thought into how this might look and post here if I come up with anything that seems promising to me. I still feel like @Gozala's approach has promise here, but I agree that we're discussing that in enough other places for now that we don't need to do it here.

domenic commented 10 years ago

I'd love any ideas you come up with, certainly. I think I'm most interested in ones that preserve the current API's capabilities and ergonomics, at least for the consumers of the readable and writable stream objects. (Modifying how the creators deal with things seems fine, and might indeed be the ticket out of here.)

domenic commented 10 years ago

My work in #110 made me think of an idea for a re-thinking approach. It's not well baked yet so this might be rambly. But I want to get the thoughts down before I head to sleep.

In #110, I made a direct pipe connection be given "special access" to the readable stream's internals: whenever data gets pushed into the readable stream from the underlying source, the piped-to stream will immediately, synchronously be written to (assuming it's currently writable).

What's distasteful about this approach is that it gives piped-to streams special privileges that ordinary consumers of the stream do not get. That is, to consume data as fast as possible, you essentially have to create a writable stream and pipe to it; the usual read loops won't work. (The problem here, to reiterate, is that the usual read loops---the type pipeTo was formerly based on---use wait() to be notified of the readable stream's buffer becoming nonempty, so once the buffer is empty, then even if within the same event loop turn it becomes nonempty, the loop code will not know about this until the end of the turn.)

What if we made this less of a hack, and more of a base part of the API? That is, the most basic part of the API for readable streams is "piping to a writable stream." (Those words might not apply as well anymore as e.g. "subscribing with an observer," but let's keep using stream terminology for now.) By default, the readable stream is hooked up to a writable stream that buffers things internally (call it bufferWS), so pushing synchronously writes into bufferWS, which keeps it around. And hooking up to a non-default writable stream (call it dest) will move the contents of bufferWS's buffer into dest. And now once that hookup has been established, pushes from the underlying source will end up in dest.

Hmm, but that doesn't exactly work if the dest stream is slow to accept writes. We need to keep the bufferWS in the chain to deal with that. That still seems doable.

OK, let me step back and look at this again. Ignoring the buffering, the basic primitive is a base readable stream whose interface is more "push" than "pull", that "pushes" into a writable stream. The writable stream must have some way to communicate back when its data is acknowledged; if it is acknowledged slowly, the base readable stream applies backpressure to its underlying source. As-is, the base readable stream has no buffer at all. I guess if it is not hooked up to any writable stream, then the data gets dropped on the floor. Oh no!! Badness. But solvable.

Now, how do we get the same API ergonomics as our current solution? I.e., how do we create readable streams which add buffering on top of our no-buffering base readable streams? Well, upon creation, readable streams (not base readable streams) are piped to a buffering transform stream (still an input/output writable/readable stream pair). Its input end will acknowledge writes from the original readable stream immediately, except if nobody has read from its output end quickly enough. Again we have the default-strategy vs. high-water-mark strategy idea; the default transform would e.g. acknowledge the write as long as its buffer is empty. But what's going on with the output end? Hmm. Let's see. Calling rs.pipeTo(ws) will do two things: dump all buffered data into ws as fast as ws will accept it; and once that completes, hook up the output end of the internal transform directly to ws. If there is no pipe hookup, then you can call rs.read(), which will draw from the buffer directly. And maybe you can use wait() etc. just as easily.

So I think the BRS API ends up being mostly pipeTo(ws), and pushing into it drops on the floor. But the RS API ends up being pipeTo(ws) plus read() and wait() to access the internal buffer if you want; pushing into it puts things in that internal buffer.
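A toy version of that push-primary BRS might look like this (names hypothetical; error and close handling omitted): pushes go straight to whatever sink is hooked up, and with no sink the data is dropped, which is exactly the "oh no" case a default buffering sink would have to cover.

```javascript
// Toy sketch of a push-primary base readable stream: no internal buffer at all.
class PushReadable {
  constructor() {
    this._sink = null;
  }
  pipeTo(sink) {
    this._sink = sink; // hook up a sink; a real version would return a promise
  }
  _push(chunk) {
    if (this._sink !== null) {
      this._sink.write(chunk); // synchronous delivery, no microtask hop
      return true;
    }
    return false; // no sink hooked up: the chunk is dropped on the floor
  }
}
```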


What does this gain us? Maybe not enough over approach 5. It makes BRS even more useless, instead moving all buffering into RS. This is nice though because it "explains" the buffering in terms of existing mechanisms, namely transform streams. And I guess it allows people to build more complicated buffering strategies in a natural way, using the full power of transform streams and a fairly simple protocol, instead of us having to account for it manually and add fields to the buffering strategy objects (like needsMoreData and count). So maybe that's enough.

It also seems to be converging with some Rx-style thinking, of observables and observers and push being primary. (Interestingly, when I ask Rx people which is primary, "hot" or "cold" observables, I get contradictory answers.) That's not necessarily a virtue in and of itself, but it does mean that there is relevant stuff to learn.

I guess I need to spend some time prototyping this approach. It seems like a big enough departure that it's going to run into its own set of gotchas and failures, which makes me wary of trying it. But it seems like a net positive in that it doesn't reduce expressiveness or usefulness of the ReadableStream API (just BRS), and opens up a path toward explaining buffering strategies instead of baking them in. That is, unlike other potential big departures, this one doesn't shake up the API or ergonomics or applicability to our use cases.

tyoshino commented 10 years ago

The writable stream must have some way to communicate back when its data is acknowledged; if it is acknowledged slowly, the base readable stream applies backpressure to its underlying source.

Something different from the state of dest?

And I guess it allows people to build more complicated buffering strategies in a natural way, using the full power of transform streams and a fairly simple protocol, instead of us having to account for it manually and add fields to the buffering strategy objects (like needsMoreData and count).

I can't come up with any concrete example of these complicated buffering strategies. I think the strategy interface should be modified to be able to look at all the contents in the buffer, plus what I stated here https://github.com/whatwg/streams/issues/76#issuecomment-38900199, but then there shouldn't be much difference between the strategy approach and the transform stream approach.

tyoshino commented 10 years ago

Base stream interfaces should be designed to work as a buffer

As discussed, we cannot get rid of a buffer from the readable side interface. So it's natural to me that we implement the buffering control mechanism directly on it. I'm not sure why separating the buffering strategy from the base interface is so important (I understand it's simpler, but ...). It's already a "buffer". We want to control its size.

Could you please explain, or point me to an existing issue, so I can understand the reason you want to separate buffering from the BRS? As I've been stating, I really like number 5 of your plans.

We're already (asynchronously) observing BRS

Currently, we're trying to keep the BRS from buffering much data by looking at state and signaling backpressure to the source (as far as the source understands backpressure). This can be viewed as "wait() subscribes to the readable stream as an observer", though it's not synchronous, I think.

You just made a tweak so that the infinite transform stream drains data synchronously on push(). If it's important to have such a feature (see the next section below), then I agree with you that we shouldn't keep it special to piping. We should also expose such a synchronous observing interface to users (with a dam while there's no observer, so that we don't drop data on the floor), and pipeTo() can again just use it.

But I'm not convinced that such synchronous chaining is really essential. Once we bake the buffering strategy into the base, it's resolved, at least for "synchronously draining data push()-ed into a BRS to some writable stream". The streams primitive would just provide a convenient buffer with a nice read/write interface, and if fine-grained control is needed, users could control the pipe() invocation and configure buffer sizes manually to make the system perform nicely, ... maybe.

If we really think each push()/write() should drive the whole chain (network) of streams, draining the buffers as much as possible synchronously, we should just have the sync-observe interface.

domenic commented 10 years ago

Yay, glad to get your thoughts @tyoshino.

Base stream interfaces should be designed to work as a buffer

I think I agree in theory, but in practice I am not sure we can make this performant. In particular, any interface based on .wait() + read() runs into the problem. (More on that below.)

Could you please teach me or point some issue if exists to understand the reason you want to separate buffering from the BRS.

Sure. I don't necessarily want to separate buffering from BRS entirely, mind. I just think it would be nice if the buffering strategy was somehow separable from the stream itself, so that you could choose different ones using syntax like rs.pipeThrough(customBufferingStrategy). That way the complexity of a buffering strategy would be encapsulated entirely in a transform stream, instead of modifying the base stream interfaces.

The motivation here is largely to decouple concerns, if possible, and to provide a more flexible approach for custom buffering strategies. As-is, looking at ReadableStream and WritableStream, they essentially expose a few custom hooks into their innards, allowing special strategy parameters to their constructors to inject custom behavior there. The fact that this is necessary seems to me to imply the system is not decomposed enough: that we are working around flaws in it by letting the user override behavior in certain selected places. Why not more places? Why those particular places? E.g. in #76 you pointed out another place that should probably get a hook. You can imagine particular cases wanting more such hooks.

That said, this is somewhat abstract, and I certainly am not ruled by a need for a theoretical decoupling purity. If the best approach that meets all the use cases is number 5, possibly with another hook to cover #76, then that's what we'll do. But I want to explore other options first to see if they yield something better.

I appreciate you bringing things back to reality though. Perhaps it would be best to merge approach 5 into master ASAP and then continue developing alternatives like 2 and 4 in branches.

I agree with you that we shouldn't keep it special to piping. We should also expose such a synchronous observing interface to users (with a dam while there's no observer, so that we don't drop data on the floor), and pipeTo() can again just use it.

This makes perfect sense to me. But once I started going down this path, it led me to my above rambly ideas! E.g., this dam you propose could just be a predefined buffering transform, instead of baking it into the base stream; then you keep buffers out of the base stream entirely. (Assuming my rambly ideas translate well into code, that is!)

The most straightforward hack to fix this would be to replace rs.wait().then(cb) with rs.wait(cb). We could keep the guarantee that cb will not run before wait returns, but we would discard the guarantee that cb runs with an empty stack (i.e. in the next microtask). So that if data gets pushed into rs, then rs can synchronously call cb within the same turn. That is, we mostly avoid Zalgo, although we don't get the clean stack guarantee of promises.

This is pretty distasteful, though, and hurts a future await-syntax usage (since await would work with promises, not this weird semi-async callback thing).
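A sketch of those wait(cb) semantics (illustrative class, not spec text): the callback never fires before wait() returns, but a later synchronous push in the same turn invokes it synchronously, with no end-of-microtask delay.

```javascript
// Sketch of the semi-async wait(cb): "don't release Zalgo before wait returns,
// but do allow synchronous notification from a later push in the same turn."
class SyncWaitable {
  constructor() {
    this._cb = null;
    this._buffer = [];
  }
  wait(cb) {
    if (this._buffer.length > 0) {
      queueMicrotask(cb); // already readable: still defer past wait() itself
    } else {
      this._cb = cb; // will be invoked synchronously by a future push
    }
  }
  push(chunk) {
    this._buffer.push(chunk);
    if (this._cb !== null) {
      const cb = this._cb;
      this._cb = null;
      cb(); // synchronous, same-turn notification
    }
  }
}
```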

But I'm not convinced that such synchronous chaining is really essential.

I am convinced it is absolutely essential. Many, many transforms are synchronous, and imposing a next-microtask penalty for each step in such a synchronous transform chain is not a good thing. Even if your ultimate consumer is slow, you should get data to that consumer as fast as possible. It also is the foundational underlying behind any generic combinators (e.g. map, filter, etc.).

If we really think making each push()/write() to drive whole chain (network) of streams synchronously to drain the buffer as much as possible synchronously, we should just have the sync-observe interface.

Do you have an idea for this? Was rs.wait(cb) along the lines you were thinking?

domenic commented 10 years ago

Perhaps it would be best to merge approach 5 into master ASAP and then continue developing alternatives like 2 and 4 in branches.

Actually, I am not sure this is feasible. Given our next major work item is transform streams (that's the biggest missing feature IMO), the important problem that needs to be solved ASAP is whether synchronous transform streams are possible at all. This issue about buffering strategies is just the catalyst. Any ASAP solution that we merge in needs to solve that problem primarily.

tyoshino commented 10 years ago

Sorry for the delay.

I just think it would be nice if the buffering strategy was somehow separable

OK. Personally I'm fine if we introduce a little complexity to integrate the strategy into the BRS, but I don't object to trying the other options.

... possibly with another hook to cover #76, then that's what we'll do.

The base ideas behind my proposals are:

With this, I guess the API gets shaped well. It's possible we'll find some important use cases that can't be covered, as you're worried about... but for now.

But I want to explore other options first to see if they yield something better.

Yeah, let's keep trying.

rambly ideas

Looking forward to it!

The most straightforward ...

This sounds good to me.

await

Do you mean a method that works the same as the current wait(), after the replacement?

rs.wait(cb)

Yes, though we could give some longer/descriptive name if necessary.

domenic commented 10 years ago

I have largely given up on this idea, and baked queueing strategies into the base stream, with simple defaults.

I have also been convinced that, without experience-based performance arguments, synchronous transforms are not compelling enough to be worth the way they break the clean-stack invariant.

Closing this, but further discussion of queueing strategies, along the lines of the last few comments, to continue in #119.