whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

simpler, less stateful basic streams? #104

Closed othiym23 closed 9 years ago

othiym23 commented 10 years ago

At the summit today, @evancz, @Gozala, and I sat down to try to hash out a version of the streams API that was the simplest possible interface that would satisfy the following requirements (but maybe not all at the same time, or without subclassing):

After a little back and forth, we came up with a pretty minimal set of operations:

We worked through a few of the common use cases and compared this with @Gozala's gist and it seemed like this was able to handle a wide variety of use cases fairly straightforwardly:

It also eliminates the started / waiting state machine that sits in the middle of BaseReadableStream (and seems like a natural evolution of @creationix's min-streams proposal). The cost of this approach is that either it tempts the wrath of Zalgo (although @Gozala's generators-based approach looks pretty clean in this regard) or it creates, to speak to @Benvie's point, a whole bunch more promises to deal with.

It also seems that as long as the interface is implemented in a standard way, it should be easy to drop in implementations that use writev internally, or use the "native piping" that Jonas Sicking was discussing. All while keeping a pretty simple core implementation that doesn't conflate too many concerns / complect unnecessarily.
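For concreteness, here is one possible reading of that surface: a fixed-size channel whose put and take both return promises. This is purely an illustrative sketch (the class name, the size parameter, and the queueing details are assumptions drawn from the later discussion in this thread, not anything agreed on here):

```javascript
class Channel {
  constructor(size = 1) {
    this.size = size
    this.buffer = []        // values put but not yet taken
    this.pendingTakes = []  // resolvers for takes that arrived early
    this.pendingPuts = []   // resolvers for puts waiting on a free slot
  }
  put(value) {
    // hand the value straight to a waiting take, if any
    if (this.pendingTakes.length > 0) {
      this.pendingTakes.shift()(value)
      return Promise.resolve(true)
    }
    this.buffer.push(value)
    if (this.buffer.length <= this.size) return Promise.resolve(true)
    // channel is full: resolve only once a take frees a slot
    return new Promise(resolve => this.pendingPuts.push(resolve))
  }
  take() {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift()
      if (this.pendingPuts.length > 0) this.pendingPuts.shift()(true)
      return Promise.resolve(value)
    }
    // nothing buffered yet: resolve when a put arrives
    return new Promise(resolve => this.pendingTakes.push(resolve))
  }
}
```

A put on a full channel resolves only after a matching take, which is where the backpressure discussed below would come from.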

@Gozala / @evancz, is this more or less what you were thinking?

Raynos commented 10 years ago

:+1:

evancz commented 10 years ago

Yeah, I think that is a good overview! Based on those primitives, you can define all sorts of more complex combinators, like map and filter but also infinite queues and all sorts of multiplexers. A lot of FRP stuff can be expressed in terms of this actually. Point is that the API itself is expressive enough to permit a lot of drastically different uses yet simple enough that it's actually pretty pleasant to use even without anything added.

Also, if there is ever light-weight concurrency in JS, I bet this will be an important API :)

domenic commented 10 years ago

I've spent a lot of time thinking about this since our discussions. As presented I don't think it works, for reasons I will get into. I am happy to continue discussion about it as an alternate, but my intuition is that if those problems are solved, the API will eventually evolve into something more or less like the current draft.

First, this does not provide a mechanism for separating out only-readable or only-writable streams (which of course have many real-world use cases). The current draft does so through the revealing constructor pattern in the same way promises does, whereas this proposal includes both put and take on the API. When I mentioned this to @Gozala he proposed instead returning objects which only expose the subset of the API via bound functions, e.g. return { take: channel.take.bind(channel) }. This isn't really acceptable since it means all read-only instances on the platform would no longer be able to share a common prototype and method definition, but instead have own-properties for every method. You lose many of the expected features of a platform API if it is not specified through the prototypal model.

Second, as @othiym23 alludes to, the methods given can either be promise-returning--- in which case they are unacceptably causing a next-tick--- or they can be Zalgo-releasing, e.g. @Gozala's version returns either a promise or a non-promise depending on whether it's available. Neither of these is really acceptable. The only solution I believe we have found for this kind of situation, going back to Isaac's original post on public-webapps, is to provide the kind of state + read() + wait() trio that exists in the current draft, or exists in Node streams (null as sentinel + read() + on('readable', ...)).

Third, I don't see how it allows notification of changes of the size of the total/available slots. I.e. what happens if getFreeCount() returns zero--- do I have to poll to wait for it to become non-zero? This seems to lead to some kind of wait() method like the current draft has. However from our discussions it seemed like the truth here is more complicated and the proposed system does have ways of working around that that I wasn't really able to understand, at least not in a way that lasted until 24 hours later. So perhaps this point does not hold up to scrutiny.

Fourth, as-is there is no API for handling abort or cancel, which are very real use cases and are necessary for the level of streams we need to be specifying to unblock other web platform specs.

All that said, I fully support efforts to add backpressure strategies via composition (e.g., by piping to a buffering stream). I am still holding out hope that this can be done: #24. If so we can have the spec include e.g. ReadableStream, WritableStream, and LengthBufferingTransformStream instead of LengthBufferingStrategy; then platform specs will be defined as creating ReadableStreams naively and then piping them immediately to LengthBufferingTransformStreams whose output is actually exposed to the user, or similar. Doing so would be a really really helpful thing for anyone looking to evolve the current spec in a way that is not very radical and would be welcomed as a clear, uncontroversial, no-downsides win by everyone involved.

Gozala commented 10 years ago

I've spent a lot of time thinking about this since our discussions. As presented I don't think it works, for reasons I will get into. I am happy to continue discussion about it as an alternate, but my intuition is that if those problems are solved, the API will eventually evolve into something more or less like the current draft.

I'm happy to work with you and anyone interested to prove that this simple API can handle all the use cases with elegant solutions.

First, this does not provide a mechanism for separating out only-readable or only-writable streams (which of course have many real-world use cases). The current draft does so through the revealing constructor pattern in the same way promises does, whereas this proposal includes both put and take on the API. When I mentioned this to @Gozala he proposed instead returning objects which only expose the subset of the API via bound functions, e.g. return { take: channel.take.bind(channel) }. This isn't really acceptable since it means all read-only instances on the platform would no longer be able to share a common prototype and method definition, but instead have own-properties for every method. You lose many of the expected features of a platform API if it is not specified through the prototypal model.

That is true; I offered that as maybe one of the simplest solutions I could think of off the top of my head. I also pointed out that different languages provide different APIs to address this specific point. For example, there is always the alternative of separating input and output ports, like:

let { input, output } = new Channel();

Of course that would imply that you would only be able to call input.take() or output.put(). Now, that being said, there are pros and cons, and I'll get to them later.

There is also another alternative: actually subtyping Channel, which is more or less what I had in mind:

// a Symbol stands in for the `@@source` notation used at the time
const source = Symbol("source")

class Input extends Channel {
  constructor(src) {
    super()
    this[source] = src
  }
  put(data) {
    throw new TypeError("Can't put on an input")
  }
  take() {
    return this[source].take()
  }
}

const input = channel => new Input(channel)

class Output extends Channel {
  constructor(src) {
    super()
    this[source] = src
  }
  put(data) {
    return this[source].put(data)
  }
  take() {
    throw new TypeError("Can't take from an output")
  }
}

const output = channel => new Output(channel)

Given the above, you could decide to share only the read or write version of an existing channel with specific users.

Gozala commented 10 years ago

Second, as @othiym23 alludes to, the methods given can either be promise-returning--- in which case they are unacceptably causing a next-tick--- or they can be Zalgo-releasing, e.g. @Gozala's version returns either a promise or a non-promise depending on whether it's available. Neither of these is really acceptable. The only solution I believe we have found for this kind of situation, going back to Isaac's original post on public-webapps, is to provide the kind of state + read() + wait() trio that exists in the current draft, or exists in Node streams (null as sentinel + read() + on('readable', ...)).

I think an example of why this is Zalgo-releasing would be useful. But let's put that aside: always returning a promise is in fact a pretty reasonable solution, and I don't quite agree that it would prevent better throughput within a single tick. Let me try to explain why. The only way one could hit the tick-overhead issue is if the producer puts data faster than the receiver takes it. Of course that can happen, but the consumer can also easily make sure that is not a throughput bottleneck, by taking data as it comes in and buffering it into something other than the channel itself:

const balance = (input, concat) => {
  const buffer = []
  const output = new Channel()
  const next = (write) => {
    const read = input.take()
    // once the previous put has drained, the chunks it covered
    // are no longer needed
    if (write) write.then(() => { buffer.splice(0) })
    read.then(data => {
      buffer.push(data)
      // if not taken yet we're going to take it on our own.
      output.take()
      next(output.put(concat(...buffer)))
    })
  }
  next()
  return output
}

The balance function above can be given any input channel and a function that knows how to combine chunks together. It returns a channel that maximizes throughput by making sure the final consumer won't have to pay the cost of a tick on each take.

This is one strategy, but there can be many more, and the best part is that whichever strategy works best for the application's use case can be employed.

Also note that with channels, simple use cases are not constrained by the whole state machine, but if your application needs the best possible throughput, there is the option of applying a simple strategy like the one above.

Gozala commented 10 years ago

Third, I don't see how it allows notification of changes of the size of the total/available slots. I.e. what happens if getFreeCount() returns zero--- do I have to poll to wait for it to become non-zero? This seems to lead to some kind of wait() method like the current draft has. However from our discussions it seemed like the truth here is more complicated and the proposed system does have ways of working around that that I wasn't really able to understand, at least not in a way that lasted until 24 hours later. So perhaps this point does not hold up to scrutiny.

I don't think getFilledCount or getSize are necessary. Note that they're not present in my prototype in the gist, nor do I recall mentioning them yesterday.

othiym23 commented 10 years ago

getFilledCount and getSize were part of @evancz's sketch of how a channel-based API might also handle LWM / HWM strategies. I agree they're not necessary in the minimum viable API.

Gozala commented 10 years ago

Fourth, as-is there is no API for handling abort or cancel, which are very real use cases and are necessary for the level of streams we need to be specifying to unblock other web platform specs.

That is not true, although we have not talked about it much. There is a notion of closing a channel; let's think of it as a channel.put(Channel.end) operation (let's not bikeshed on the API itself, we can find a good way to express that). The point is that both puts and takes can observe the fact that the channel is closed, so if you want to abort the task the only thing you need to do is close it. I'm hesitant to put all the code in here, but as mentioned I do plan to publish an implementation and implement all the examples, including aborts.

Gozala commented 10 years ago

getFilledCount and getSize were part of @evancz's sketch of how a channel-based API might also handle LWM / HWM strategies. I agree they're not necessary in the minimum viable API.

I would say size and filledCount are more properties of buffers. For example, in core.async you have sliding-buffer, dropping-buffer, etc., which can be expressed as subclasses of Channel that maintain a certain number of data items and apply different strategies when overflowing.
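As an illustration, a sliding buffer on its own is tiny. This is a hedged sketch using core.async's naming, not a proposed API; a dropping buffer would instead ignore the incoming item when full:

```javascript
class SlidingBuffer {
  constructor(size) {
    this.size = size
    this.items = []
  }
  push(item) {
    // overflow strategy: drop the oldest item to make room
    if (this.items.length >= this.size) this.items.shift()
    this.items.push(item)
  }
  shift() {
    return this.items.shift()
  }
  get length() {
    return this.items.length
  }
}
```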

Gozala commented 10 years ago

Cc @jlongster as he expressed his interest in this subject on twitter & I imagine he knows more of the CSP stuff than I do.

Gozala commented 10 years ago

I would also recommend taking about an hour and a half to watch two different takes on introducing CSP concepts into modern languages. Given that new languages like Go, Rust & Clojure all introduce the same ideas, there must be something to it:

  1. Rob Pike - Concurrency Is Not Parallelism http://vimeo.com/49718712

This is a good introduction to channels in Go, one of the key concepts of the language, and it is full of good examples of how different use cases require different IO coordination and how channels let you express that.

  2. Rich Hickey - Clojure core.async http://www.infoq.com/presentations/clojure-core-async

Rich Hickey introduces CSP concepts in Clojure(Script), explaining the merits of the concurrency they provide even in single-threaded environments like browsers.

Gozala commented 10 years ago

A little off topic, but I wanted to throw it out here, since at the summit there were people asking for combinatorial functions and FRP-style signals. I think that given such channels it's pretty straightforward to build up FRP on top. I think @evancz even mentioned something along these lines yesterday.

othiym23 commented 10 years ago

When I mentioned this to @Gozala he proposed instead returning objects which only expose the subset of the API via bound functions, e.g. return { take: channel.take.bind(channel) }. This isn't really acceptable since it means all read-only instances on the platform would no longer be able to share a common prototype and method definition, but instead have own-properties for every method.

Leaving aside performance considerations for the moment, why is this bad? The state encapsulated by this definition is fairly minimal, and the API is as minimal as we could make it. There's just not much there except the stream instance itself.

You lose many of the expected features of a platform API if it is not specified through the prototypal model.

Can you enumerate some of these? I'm not trying to play devil's advocate here -- I don't know what you consider the attributes of a platform API. My own expectations here are minimal, and mostly around consistent interfaces for use with duck typing.

[T]he methods given can either be promise-returning--- in which case they are unacceptably causing a next-tick--- or they can be Zalgo-releasing, e.g. @Gozala's version returns either a promise or a non-promise depending on whether it's available. Neither of these is really acceptable.

Why is a promise-returning implementation necessarily unacceptable? Now that promises are part of the web platform, they can be optimized, and I see no inherent reason why implementations shouldn't be able to make microtasks roughly as fast as straight loops once some work has been done.

I think the current Node solution is pretty gross, FWIW. I'm also not thrilled with the need for an explicit state attribute on the stream. It feels like synthesized, redundant information and as such a potential source of pretty subtle, racy bugs.

Third, I don't see how it allows notification of changes of the size of the total/available slots. I.e. what happens if getFreeCount() returns zero--- do I have to poll to wait for it to become non-zero?

@Gozala is right, this conflates buffering with the channel, and as such may be state that shouldn't be exposed. However, in principle, it was intended for extension -- a delegate or subclass could monitor the number of free slots, and use that to implement high-water and low-water marks.

I also realized, upon reflection, that the problem of returning a promise on write is that it prevents the propagation of backpressure upstream unless the contract were that each new chunk is only written as part of the resolution of the previously-returned promise.

Fourth, as-is there is no API for handling abort or cancel, which are very real use cases and are necessary for the level of streams we need to be specifying to unblock other web platform specs.

Cancellable streams seem to me like cancellable promises -- they're a specialized tool for specialized use cases, and as such cancellation shouldn't be included in the base API. That said, either sentinel values passed through to readers / writers, or rejecting pending promises, seem to be a straightforward way to implement cancellation.
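One hedged sketch of that sentinel approach, with Closable and END as purely illustrative names (put and buffering are elided to focus on the close path): closing resolves every pending read with a distinguished end-of-stream value, and later reads see it immediately.

```javascript
const END = Symbol("end") // illustrative end-of-stream sentinel

class Closable {
  constructor() {
    this.waiters = []
    this.closed = false
  }
  take() {
    if (this.closed) return Promise.resolve(END)
    return new Promise(resolve => this.waiters.push(resolve))
  }
  close() {
    this.closed = true
    // settle every read that was already waiting
    for (const resolve of this.waiters.splice(0)) resolve(END)
  }
}
```

Rejecting the pending promises instead of resolving them with a sentinel is the other option mentioned above; the mechanics are the same.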

Handling stream aborting is a specific instance of deciding upon error-handling strategies. I probably should have just recorded this part of yesterday's conversation, because I'm sure to misrepresent somebody's position here, but it seems like there are multiple ways to respond to operational errors here -- retry the operation, ignore the failure, or invalidate the stream. These seem to be options that should be left up to either the implementor of a given type of streams, or the consumer of the stream API. The current approach feels inflexible from that point of view, because it presumes one failure shuts down the stream.

I'm in favor of doing what we can to provide an API with the minimum necessary surface area, and provide everything else via stratification (whether that's composition, subclassing, or delegation). I like Irakli's suggestion that we check out Rob Pike's and Rich Hickey's presentations; I've found that Hickey, in particular, is exceptionally good at finding fundamental abstraction and exposing them in very powerful ways.

othiym23 commented 10 years ago

I just watched the core.async presentation and, while I found it to be as lucid and coherent as Hickey's talks typically are, I found that the way he conflated the thread and go constructs made it difficult to evaluate the applicability of core.async's model to the problem we're trying to solve. In particular, being able to switch between CPS-style inversion of control (i.e. the JS callback model) and blocking on real threads (i.e. the Java queue model) makes it easy to handwave your way around the problem of backpressure, by allowing users to decide whether to make put and take blocking for particular applications. Right now the biggest problem for the proposals discussed in this issue is discovering how to exert backpressure without blocking, which is what wait() and state help provide.

Gozala commented 10 years ago

I also realized, upon reflection, that the problem of returning a promise on write is that it prevents the propagation of backpressure upstream unless the contract were that each new chunk is only written as part of the resolution of the previously-returned promise.

That is actually not a problem but rather a feature; note:

  1. This allows communicating upstream whether data has drained yet or not; if not, that implies the upstream is going too fast.
  2. Consumers could use things like buffers to give producers some flexibility, the way low / high water marks are used in Node to avoid pushing back on the source too soon.
  3. There is always a chance that a producer may not respect the consumer's throughput and keep on pushing data (and there is nothing wrong with that, BTW). The consumer still has the flexibility to put some balancing logic between itself and the producer. For example, sliding-buffer(n) would allow the producer to push up to n items; item n + 1 will still be pushed but would pop the 1st item. There is also dropping-buffer(n), which is similar but ignores the last item. There are more strategies (infinite buffering, for one), but the point is that it's the consumer's problem how to deal with producers that don't respect its throughput characteristics. If some producers do respect throughput, even better.

In a nutshell, the reason it is a feature is that it allows producer and consumer to coordinate without necessarily following the same throughput.

Gozala commented 10 years ago

I just watched the core.async presentation and while I found it to be as lucid and coherent as Hickey's talks typically are, I found the way he conflated the use of the thread and go constructs made difficult to evaluate the applicability of core.async's model to the problem we're trying to solve.

Please note that core.async is not only a solution for the JVM; ClojureScript runs in the browser and has the exact same concerns as we do, and core.async is their solution for these problems as much as it is for the JVM's. go routines are essentially generators driven with http://taskjs.org/, async/await, or other takes on the same core idea.

In particular, being able to switch between CPS-style inversion of control (i.e. the JS callback model) and blocking on real threads (i.e. the Java queue model) makes it easy to handwave your way around the problem of backpressure, by allowing users to decide whether to make put and take blocking for particular applications. Right now the biggest problem of the proposals defined on this issue is discovering how to exert backpressure without blocking, which is what wait() and state help provide.

So in ClojureScript there is no blocking; they came up with the term parking, which is the same technique task.js and the like use to park a generator until a promise is resolved and then resume it. Here is an example of that from my gist:

let forward = function*(input, output) {
  let message = void(0)
  // put messages onto output until a falsy message is received.
  // Note there is no `yield` in front of `put`, which means messages
  // are forwarded as they arrive, ignoring whether they're drained
  // or not.
  while (message = yield take(input))
    put(output, message)
}

go(forward, input, output)

The example above illustrates something similar to stream.pipe that does not actually respect the output channel's backpressure, because messages are put onto output as soon as they are received from input. Note that if you want to respect the backpressure, it's as easy as this:

let pipe = function*(input, output) {
  let message = void(0)
  // put messages onto output until a falsy message is received.
  // Note the `yield` in front of `put`: the routine is parked until
  // the channel has room for more messages. This way we also apply
  // backpressure on input, as no further items are taken.
  while (message = yield take(input))
    yield put(output, message)
}

I think this is simple and powerful, and it would be even better with async/await. What I'm trying to get at is that blocking is not essential here; the point is providing points of synchronization, which can be achieved by blocking, parking, or continuation passing (parking is a kind of it). Also note that generators make this a lot more expressive, but the same could be expressed with plain old promises and actually even callbacks.

othiym23 commented 10 years ago

I understand that part of core.async works in ClojureScript, but a lot of Rich's talk is around constructs that aren't available, like >!!, <!!, and (thread [body] ...). The examples he shows with blocking all rely on the threaded model. Because he's discussing the package as a whole, he doesn't really distinguish between Clojure and ClojureScript as distinctly as he could, which makes it tricky to figure out how much is applicable to us.

Maybe generators do help us around this problem, but I'm not sure how your example pipe exerts backpressure upstream, as every call that hits the yield will merely suspend the call, rather than blocking execution or signaling a pause. What am I missing?

I think that something that relies on generators or ES6 promises is fine, but anything that requires TaskJS-style CPS transforms would fail just because it would feel deeply unnatural to most web platform devs, not to mention being a lot of complexity to add for one platform API. I need to look at how the go macro is written more closely, as well as how the other core.async operations rely on it. I looked at it some last night, but I was pretty tired.

I think the current plan is that await is deferred to ES7, so that's firmly in the unusable future as far as this proposal is concerned. It's just syntax, though, so that's fine.

othiym23 commented 10 years ago

This allows communicating upstream whether data has drained yet or not; if not, that implies the upstream is going too fast.

I think I understand how this works for a rendezvous channel (where each put returns a promise that is resolved on the corresponding take) but I don't know what happens when you have a channel with size greater than one, or if a consumer continues to put to the rendezvous channel when it's waiting for the take. This is why I think blocking is an important part of this model, and if you can show me how to deal with these issues, that addresses a lot of my concerns around the core.async model.

Gozala commented 10 years ago

You mostly got it!

So channels usually have a preconfigured size. Puts return promises that are resolved when there's more space in the queue; in a channel of size one that would be when the item put was taken, but usually the size is greater, so for a channel of size n the promise is resolved whenever fewer than n items are queued.


Gozala commented 10 years ago

I understand that part of core.async works in ClojureScript, but a lot of Rich's talk is around constructs that aren't available, like >!!, <!!, and (thread [body] ...). The examples he shows with blocking all rely on the threaded model. Because he's discussing the package as a whole, he doesn't really distinguish between Clojure and ClojureScript as distinctly as he could, which makes it tricky to figure out how much is applicable to us.

Yeah, you're right, he talks about >!! and <!! as well, which are the same as >! and <!; the only difference is that the first two are only available in Clojure, while the latter are available everywhere and are parking.

Gozala commented 10 years ago

I think that something that relies on generators or ES6 promises is fine, but anything that requires TaskJS-style CPS transforms would fail just because it would fel deeply unnatural to most web platform devs, not to mention being a lot of complexity to add for one platform API. I need to look at how the go macro is written more closely, as well as how the other core.async operations rely on it. I looked at it some last night, but I was pretty tired.

Task.js in fact does not do any transformation; it's just a simple driver of the generator that resumes it when yielded promises are resolved. In fact it's just the genesis of this kind of library: I wrote one back when only SpiderMonkey had generators, there is now co, I believe @Raynos has one, and there are tons of explorations of how to improve Node with these ideas:

http://dailyjs.com/2013/05/31/suspend/ http://dailyjs.com/2014/01/09/koa/

As a matter of fact, async/await is just sugar to do exactly that. Anyhow, this is just sugar and not a must-have. In fact the use of promises is also just sugar; if you look at my gist implementation, it just uses plain old callbacks instead. That being said, I think this sugar, both promises and generators, makes the API a lot nicer to use.

Gozala commented 10 years ago

I think I understand how this works for a rendezvous channel (where each put returns a promise that is resolved on the corresponding take) but I don't know what happens when you have a channel with size greater than one, or if a consumer continues to put to the rendezvous channel when it's waiting for the take. This is why I think blocking is an important part of this model, and if you can show me how to deal with these issues, that addresses a lot of my concerns around the core.async model.

I think I did not do the best job explaining it, as my reply was from a phone. Let me try again:

  1. Channels are essentially queues into which you can put data and from which you can take data.
  2. As with any other resource, channels have a limited number of things they can hold. The idea is that once the queue is full (all of its slots are taken) put will block / park until more space is available, which is expressed in the form channel.put(x) : Promise. This also implies that if you put on a channel that still has empty slots there will be no blocking, meaning it will return an immediately resolved promise, Promise.cast(true).
  3. So channels are queues, but really cool ones: you can take values from them even before they're put, and you can also queue up puts even if there is no space left. This means if I do var [one, two] = [channel.take(), channel.take()]; then one will be resolved with the first value put (already or later) on the channel, and two will be resolved with the value put after the one which fulfilled one.
  4. So I said puts are also queued up. Consider the following example: var [a, b] = [channel.put(1), channel.put(2)];. In this example we didn't bother to deal with backpressure (although we could). Note that if the channel is full, promise a will resolve on the first take from the channel, but b will resolve only on the second take. If the channel had only one empty slot, a would resolve immediately while b would on the first take. If there is more than one empty slot, both a and b resolve immediately.

Now, all these properties together should explain the pipe example, I hope:

let pipe = function*(input, output) {
  let message = void(0)
  // put messages onto output until a falsy message is received.
  // Note the `yield` in front of `put`: the routine is parked until
  // the channel has room for more messages. This way we also apply
  // backpressure on input, as no further items are taken.
  while (message = yield take(input))
    yield put(output, message)
}

First we wait on take(input); whenever that occurs, we wait on put(output, message), which means that if more data is put onto input it will be queued, either applying backpressure or not depending on whether that channel has more empty slots. Once output can handle more puts, we resume and take the next message from input, which frees one more slot on input and would resolve a pending put(input, x) if there was one.

Now same can be expressed without generators of course:

function pipe(input, output) {
  (function next() {
    input.take().then(function (message) {
      if (message !== Channel.end)
        output.put(message).then(next)
      else
        output.put(message)
    })
  })()
}

ghost commented 10 years ago

Task.spawn can be implemented with a single simple function.

function spawn(thunk) {
  return new Promise(function(resolve, reject) {
    var gen = typeof thunk === "function" ? thunk() : thunk;
    function _next(v) { return handle(gen.next(v)) }
    function _throw(e) { return handle(gen.throw(e)) }
    function handle(result) {
      var value = result.value;
      if (result.done) return value;
      if (value && typeof value.then === "function") {
        return value.then(_next, _throw);
      }
      return _next(value); // plain yielded values resume the generator
    }
    Promise.resolve(_next()).then(resolve, reject);
  });
}
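As a usage sketch, here is such a spawn driving a promise-yielding generator (the function is restated so the example is self-contained; plain yielded values simply resume the generator):

```javascript
function spawn(thunk) {
  return new Promise(function(resolve, reject) {
    var gen = typeof thunk === "function" ? thunk() : thunk;
    function _next(v) { return handle(gen.next(v)); }
    function _throw(e) { return handle(gen.throw(e)); }
    function handle(result) {
      var value = result.value;
      if (result.done) return value;            // generator finished
      if (value && typeof value.then === "function")
        return value.then(_next, _throw);       // park on the promise
      return _next(value);                      // plain value: resume now
    }
    Promise.resolve(_next()).then(resolve, reject);
  });
}

spawn(function*() {
  var a = yield Promise.resolve(2); // parked until the promise settles
  var b = yield a + 1;              // plain value: resumes immediately
  return a * b;
}).then(function(result) {
  console.log(result); // 6
});
```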
othiym23 commented 10 years ago

Sorry, everybody, I conflated Task.js, which doesn't do CPS transforms, with TameJS / IcedCoffeeScript, which does. I get them confused all the time, for which I apologize.

othiym23 commented 10 years ago

I think I'm almost there (I have no idea where @domenic is with all of this :ok_woman:), but here's the last piece of the backpressure puzzle I'm trying to figure out: let's say we have a high-bandwidth incoming network connection that is feeding its data directly into a slow, computationally-intensive transformer through a stream with a channel size of 1 (just for simplicity's sake). Is the idea that the incoming connection would be doing something like the following?

function onconnection(socket) {
  var stream = new Stream({size : 1})

  function readable() {
    if (socket.isClosed()) {
      stream.close()
    }
    else {
      stream.put(socket.read()).then(readable)
    }
  }

  readable()

  return stream
}

With the idea being that the underlying low-level connection will stop taking data and cause queueing at the interface / back-pressure to be passed back to the previous node in the system? If not, how would you see this working?

Gozala commented 10 years ago

So I would expect that an underlying system API providing a high-bandwidth channel would respect backpressure, meaning that if the provided socket channel is full (no empty slots are available) it will wait until a slot becomes available (someone took from it) before pushing more data.

Your example is pretty close in that regard; it's just that socket.read() (presumably socket.take()) will be a promise and therefore should be awaited before putting (for more info see the pipe example).
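A corrected loop along these lines might look like the following sketch, where socket.take() returns a promise that is awaited before each downstream put. Channel here is a toy unbounded queue with hypothetical names, included only to make the sketch executable:

```javascript
// Toy unbounded channel: enough to demonstrate the awaited read/put cycle.
function Channel() { this.buffer = []; this.takes = []; }
Channel.end = { end: true }; // hypothetical end-of-channel sentinel
Channel.prototype.put = function(value) {
  if (this.takes.length) this.takes.shift()(value);
  else this.buffer.push(value);
  return Promise.resolve();
};
Channel.prototype.take = function() {
  var self = this;
  return new Promise(function(resolve) {
    if (self.buffer.length) resolve(self.buffer.shift());
    else self.takes.push(resolve);
  });
};

function onconnection(socket) {
  var stream = new Channel();
  (function readable() {
    socket.take().then(function(chunk) {
      if (chunk === Channel.end)
        stream.put(Channel.end);          // propagate close downstream
      else
        stream.put(chunk).then(readable); // await the put before reading more
    });
  })();
  return stream;
}
```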

I would also expect that users of system channels would pipe them through a buffer(n) to give the platform some throughput leeway.

Ideally, platform-provided channels would actually be behind something that joins aggregated data to provide the best possible throughput (see the balance example for that).

Don't know if it's clear, but I'm on the go now; I can try to explain myself better when I have more time.

Gozala commented 10 years ago

Ok so I have tried to formalize all of this in my fork: https://github.com/Gozala/streams

Gozala commented 10 years ago

I have also made a reference implementation, although I have not actually run it yet: https://github.com/Gozala/streams/blob/master/reference-implementation/lib/channel.js

I'm going to write tests and update Examples & Requirements pages when I'll get another free time slot. I'd happily collaborate with anyone wishing to help me out doing either of this.

Gozala commented 10 years ago

So at this point I'd say the implementation is complete (I still need to write tests, but I have been testing it from the REPL for some time): https://github.com/Gozala/streams/tree/master/reference-implementation

I have fully updated the readme to reflect everything I was trying to suggest and the implementation algorithm: https://github.com/Gozala/streams/blob/master/README.md

I have also updated the examples to reflect a bunch of points / questions raised here or elsewhere. There is still some work to be done on this, but I think it already does a good job of addressing a bunch of the constraints @domenic raised earlier: https://github.com/Gozala/streams/blob/master/Examples.md

I really hope we can get somewhere with this and that it's not just a waste of my time.

othiym23 commented 10 years ago

Sweet! I've been spending my week making presentation materials for training I'm doing next week, and I've gotta keep plowing through that, but I'll try to carve out some time this weekend to check out your implementation. Thanks, Irakli!

Raynos commented 10 years ago

@Gozala

I'm a bit concerned about your usage of a channel for errors in StreamingSocket.

If we compare existing error mechanisms

It would seem like using a channel for errors has no way of being notified (either through unhandledrejection or uncaughtexception) when an unhandled error occurs.

Raynos commented 10 years ago

@Gozala can you remove ReadableFile and below from your examples.md

It is confusing that there are examples for streams in the channels examples section.

Raynos commented 10 years ago

I created a gist ( https://gist.github.com/Raynos/10564079 ) for the error channel discussion / concern.

Raynos commented 10 years ago

I like what I see so far btw. :+1: from me.

I need to audit all the concerns @othiym23 and @domenic brought up and write examples to handle them.

Raynos commented 10 years ago

@Gozala circumvents the delay semantics of promises in an interesting fashion.

When you put() onto a channel you do not have to wait for the put to resolve before you know whether to apply backpressure to a lower-level source: his buffer semantics allow you to pass a buffer to a Channel and then ask the buffer directly whether it is full.

The same goes for the take() semantics. You can specify a buffer that returns the entire buffer body upon taking, which means you can call a single take() and get all the data instead of having to call multiple take()s, so you pay only one next-tick penalty.
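For illustration, the idea of asking a buffer directly might look like this sketch; all names here are hypothetical, not the actual API:

```javascript
// A fixed-size buffer object that could be shared with a channel, so a
// producer can check fullness synchronously instead of waiting on a
// put() promise.
function FixedBuffer(capacity) {
  this.capacity = capacity;
  this.items = [];
}
FixedBuffer.prototype.isFull = function() {
  return this.items.length >= this.capacity;
};
FixedBuffer.prototype.push = function(item) { this.items.push(item); };
FixedBuffer.prototype.shift = function() { return this.items.shift(); };

// A producer loop can then throttle a lower-level source synchronously
// (source and channel are assumed interfaces, not real APIs):
function pump(source, buffer, channel) {
  while (!buffer.isFull() && source.hasData()) {
    channel.put(source.read()); // resolves immediately while there is room
  }
  // buffer full: pause the low-level source; resume after the next take
}
```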

Gozala commented 10 years ago

The same for the take() semantics. You can specify a buffer that will return the entire buffer body upon taking. This means you can call a single take() and get all the data instead of having to call multiple take()'s, thus meaning you only have one next tick penalty.

Or, if your buffer does not do aggregation, you could also queue up multiple takes until the buffer is full; that way they are all resolved on the same next tick.

Gozala commented 10 years ago

@Gozala

I'm a bit concerned about your usage of a channel for errors in StreamingSocket.

If we compare existing error mechanisms

- setting an onerror property on a low-level object: when an error occurs on the raw object it will invoke the onerror function, and will throw if it doesn't exist
- emitting an 'error' event on a node EventEmitter: when an error occurs and there are no error listeners it will throw
- rejecting a promise: promises can be globally configured with an "onunhandledrejections" listener so you know when an unhandled error occurs, and promises also have debugger integration for unhandled rejections

It would seem like using a channel for errors has no way of being notified (either through unhandledrejection or uncaughtexception) when an unhandled error occurs.

I've added a little more information regarding error handling to Examples.md. I also tried to demonstrate with the FileReader example that channels / streams are not a hammer for every nail, although one could use them as a component to build something more appropriate for the task. That is also where, given a context, error handling can be implemented in a way that makes sense for that context.

Gozala commented 10 years ago

Adding @dherman, as he kindly spent some of his time looking into the work I have been putting into introducing a CSP-based channel API as an alternative, which can be found here:

https://github.com/gozala/streams

At this point I have also ported most examples from original proposal and added some more of my own along:

https://github.com/Gozala/streams/blob/master/Examples.md

Given that combinatorial transforms are still not quite addressed by the streams API, I thought it was a good idea to show off how easy they are to do with something like channels:

https://github.com/Gozala/streams/blob/master/Examples.md#combinators

Given the comment in #107, I feel like the decision-making process is driven by politics rather than technical merits, so I'll probably suspend my efforts until there is real consideration of APIs that are not based on node's.

Gozala commented 10 years ago

Also adding @wycats to the discussion, as I believe @dherman mentioned he was also seeking a better alternative to the current de facto API.

Raynos commented 10 years ago

I'm impressed with the combinators.

merge(streams) has been incredibly hard to implement correctly for node streams1 & node streams2.

The merge(channels) function is simple and correct. This is impressive. I'd probably attribute the simplicity to the limited scope that channels have: the fact that channels have a small set of concerns means combinators are easy to implement.
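As an illustration of why merge becomes simple over channels, here is a sketch under the assumption that take() and put() return promises and a Channel.end sentinel marks exhaustion. The names are hypothetical, not Gozala's actual code, and a toy unbounded channel is included to keep the sketch self-contained:

```javascript
// Toy unbounded channel, just enough to run the merge sketch.
function Channel() { this.buffer = []; this.takes = []; }
Channel.end = { end: true };
Channel.prototype.put = function(value) {
  if (this.takes.length) this.takes.shift()(value);
  else this.buffer.push(value);
  return Promise.resolve();
};
Channel.prototype.take = function() {
  var self = this;
  return new Promise(function(resolve) {
    if (self.buffer.length) resolve(self.buffer.shift());
    else self.takes.push(resolve);
  });
};

// Forward messages from every input to output; close output only after
// every input has ended.
function merge(inputs, output) {
  var remaining = inputs.length;
  inputs.forEach(function next(input) {
    input.take().then(function(message) {
      if (message !== Channel.end)
        output.put(message).then(function() { next(input); });
      else if (--remaining === 0)
        output.put(Channel.end);
    });
  });
}
```

The whole combinator is a per-input take/put loop plus a counter; there is no stream-state machinery to coordinate, which is presumably what makes the channel version so much smaller than its node-streams counterparts.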

jlongster commented 10 years ago

+1 on this from me, I love the channels API much more than the current stream API. I haven't had time to fully dig into this, but from what I hear from several smart people, most if not all of the issues with channels can be solved, so I hope whatwg takes a truly serious look at it.

fitzgen commented 10 years ago

+1 for making streams CSP-like channels that play nice with combinators.

+1 for splitting input and output streams (rather than having a stream that you can both put and take from). You can implement either version in terms of the other, but the common case is consumer(s) != producer(s), so it should be catered to.

Gozala commented 10 years ago

+1 for splitting input and output streams (rather than having a stream that you can both put and take from). You can implement either version in terms of each other, but the common case is consumer(s) != producer(s) so it should be catered to.

In the current reference manual & implementation, that's the way things already are ;)

fitzgen commented 10 years ago

Ah! My bad! I didn't read the code yet, just the discussion backlog.

domenic commented 9 years ago

So, thinking has evolved a lot since this original issue. On the one hand, the recently-merged PR introduced async read(), so that should make some people happy. On the other, we've learned a lot more about byte streams and the intricacies behind them. (I'm doing a blog series to explain the constraints.) I think it's time to close off this discussion, but thanks everyone for pushing it.