whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

Finalize pull/pullInto behavior #423

Closed tyoshino closed 8 years ago

tyoshino commented 8 years ago

(Branched from https://github.com/whatwg/streams/issues/379#issuecomment-171823158)

Current situation

Based on the result of the discussion at https://github.com/whatwg/streams/issues/363, we've introduced pullInto for ReadableByteStream.

The pull and pullInto methods in ReadableByteStream were designed to be invoked in response to read() and read(view) calls on the stream. That was a temporary design.

Now, in the PR https://github.com/whatwg/streams/pull/418, I'm converging ReadableStream and ReadableByteStream. Based on the conclusion of https://github.com/whatwg/streams/issues/353, ReadableByteStream now has a queue, just as ReadableStream does. I've also ported the strategy-based queuing control mechanism to ReadableByteStream as part of the convergence. So a 1:1 correspondence between read()/read(view) and pull/pullInto is no longer possible.

I'm trying to give them reasonable semantics.


Role of pullInto

Currently, the role of pullInto is to notify the underlying byte source of the outstanding TypedArray (and the region of it) into which the underlying byte source should write generated data. So, whenever:

the controller should invoke a pullInto.

Unlike pullInto, the signal of pull is just that "the amount in the queue has changed". This can be probed by looking at desiredSize.


Plan taken in the PR

My plan was:

I planned to do nothing on invocation of controller methods. For example, after the source calls controller.enqueue() in pull (or outside pull), the source can probe how much more data is required to fill the queue by accessing controller.desiredSize. But I realized it's more complicated. After calling enqueue() in response to pullInto (either inside pullInto or asynchronously after it), it's possible that more read(view) calls are still waiting.

It seems some more redesign is needed.


Invocation timing and repetition

ReadableByteStream has a new pull/pullInto invocation scheme.

tyoshino commented 8 years ago

How about this?

tyoshino commented 8 years ago

Sorry, I forgot to include this in the OP. In RBS I took the approach of ignoring the result of pull. RS looks at it and delays the next pull call if pull returned a pending promise.

See also https://github.com/whatwg/streams/issues/354.

I'd like to revisit this point too.

domenic commented 8 years ago

So just to be clear,

Plan taken in the PR

is in the PR, but you think it is not quite right, and

Invocation timing and repetition

is the proposed fix? Assuming that, I am still pretty confused...

pull is required when the current call stack contains any pull/pullInto

I am not sure exactly what this means...

pull/pullInto invocation code is in (or followed by) a while loop

This basically means, once a pull happens, you keep doing pull until desiredSize = 0? And, once a pullInto happens, you keep doing pullInto until .... when?

How about this?

Hmm what is the advantage of byobRequest over an argument?

I'd like to revisit this point too.

What would you like to revisit about it?

tyoshino commented 8 years ago

is in the PR, but you think it is not quite right, and

Right

is the proposed fix? Assuming that, I am still pretty confused...

Sorry for the confusion. I organized the sections badly. The "Invocation timing and repetition" section is related to the desiredSize design discussed in the "Plan taken in the PR" section, but it is a separate problem.

Actually, https://github.com/whatwg/streams/issues/423#issuecomment-171963395 is the proposed fix. I wanted to allow the underlying byte source to probe what to do next inside the pull/pullInto function without waiting for the next pull/pullInto invocation. Via the desiredSize accessor, the stream tells the underlying source how much data is needed. But as we introduce BYOB reading in addition to non-BYOB reading, desiredSize is no longer enough to probe what to do next.

tyoshino commented 8 years ago

Hmm what is the advantage of byobRequest over an argument?

With desiredSize, an underlying source can probe how much data there is in the queue. So, the source can fill the queue in a single pull().

c.enqueue();
if (c.desiredSize < ...) {
  // enqueue more data
}

The source doesn't have to wait for another pull() to know the queue still has free space. This was previously realized by the return value of the enqueue() method, and now is implemented by desiredSize.
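
As a rough illustration of the point above, here is a minimal sketch written against the ReadableStream API as it later shipped. The chunk contents and the highWaterMark of 3 are arbitrary assumptions for the example. The source keeps enqueuing inside a single pull() for as long as desiredSize reports free space:

```javascript
// Sketch only: chunk contents and highWaterMark are made up for illustration.
let pullCalls = 0;
let nextChunk = 0;

const stream = new ReadableStream(
  {
    pull(controller) {
      pullCalls += 1;
      // Keep filling the queue until it reaches the high water mark,
      // instead of waiting for the stream to call pull() again.
      while (controller.desiredSize > 0) {
        controller.enqueue(`chunk ${nextChunk}`);
        nextChunk += 1;
      }
    },
  },
  new CountQueuingStrategy({ highWaterMark: 3 })
);
```

With a source that enqueued only one chunk per call, the stream would have to invoke pull() repeatedly to reach the same queue level.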


Suppose that we're in a pullInto invocation. We have just called enqueue() or respond() for that pullInto. If highWaterMark is set to e.g. 1024 bytes, desiredSize returns 1024 bytes. But the stream still has one more read(view) pending. The source's policy is that it wants to fill a given ArrayBuffer (provided via pullInto args) if there is one, and otherwise wants to fill the queue with enqueue(). Without byobRequest, we need to invoke pullInto again to notify the source that there's one more read(view). The source needs to wait for one microtask to see whether such a pullInto occurs before it can decide whether to use enqueue() or to use respond() in response to pullInto.
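
To make the policy concrete, here is a minimal sketch using the shape byobRequest eventually took in the Streams Standard (a view plus a respond() method), which postdates this comment and is an assumption relative to the design being discussed here. The byte value 42 and the buffer sizes are arbitrary. The source checks for an outstanding BYOB request synchronously and only falls back to enqueue() when there is none:

```javascript
// Sketch only: the byte value 42 and the buffer sizes are arbitrary.
const stream = new ReadableStream({
  type: "bytes",
  pull(controller) {
    const request = controller.byobRequest;
    if (request) {
      // A read(view) is pending: write straight into the reader's buffer.
      request.view[0] = 42;
      request.respond(1); // one byte was written
    } else {
      // No BYOB read is pending: fall back to filling the internal queue.
      controller.enqueue(new Uint8Array([42]));
    }
  },
});

const reader = stream.getReader({ mode: "byob" });
const firstRead = reader.read(new Uint8Array(4));
```

No extra microtask is needed here: the source learns whether a read(view) is outstanding by reading a property.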

tyoshino commented 8 years ago

This basically means, once a pull happens, you keep doing pull until desiredSize = 0? And, once a pullInto happens, you keep doing pullInto until .... when?

We keep doing pull if some operation that suggests running pull again happens inside that pull.

For example, maybe based on my old suggestion (I said that an underlying source should be notified of any kind of event that affects the queue), controller.enqueue() would suggest re-running pull.

pull() {
  controller.enqueue(x);
}

The enqueue() above would set _pullAgain.

Actually, currently, in the ReadableStream with a non-byte source, EnqueueInReadableStreamController() does that. It sets pullAgain, and in the pullPromise handler, ReadableStreamControllerCallPullIfNeeded() is run again. ReadableStream with a byte source does this by using a while loop.

If any operation (which suggests running pull/pullInto again) doesn't occur inside a pull/pullInto invocation, we stop the loop.

For example, when stream.read(view) is invoked and view is a Uint32Array,

pullInto(<view>);
// enqueue() was called with 1 byte. Call pullInto again to notify that more data is needed
pullInto(<view with new region (the 1st byte was filled)>);
// enqueue() was called with 1 byte. One more.
pullInto(<view with new region (2 bytes have been filled)>);
// Nothing happened this time. Stop running pull() and wait for the source to generate more data
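
The loop described above can be modelled with a small toy class. This is a hypothetical sketch, not spec text: PullLoopModel, its fields, and the two-byte source are all invented for illustration. Each enqueue() sets a pullAgain flag, and the loop stops after the first invocation in which nothing happens:

```javascript
// Hypothetical model of the "pull again" loop; names are invented.
class PullLoopModel {
  constructor(source) {
    this.source = source;
    this.queue = [];
    this.pullAgain = false;
    this.invocations = 0;
  }

  // Any enqueue during a pull suggests running pull once more.
  enqueue(chunk) {
    this.queue.push(chunk);
    this.pullAgain = true;
  }

  run() {
    do {
      this.pullAgain = false;
      this.invocations += 1;
      this.source.pull(this);
    } while (this.pullAgain);
  }
}

// A source that has two bytes ready and hands over one per pull.
const available = [0x01, 0x02];
const model = new PullLoopModel({
  pull(controller) {
    if (available.length > 0) {
      controller.enqueue(available.shift());
    }
  },
});
model.run();
```

As in the read(view) walk-through, the source is invoked three times: twice producing one byte each, and a final time in which nothing happens, which ends the loop.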

tyoshino commented 8 years ago

What would you like to revisit about it?

Whether or not we use the return value of the pull call to suppress pull invocation (if a pending promise is returned, suppress further pull invocations until the promise is resolved).
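
For reference, this is roughly what the suppression looks like with the ReadableStream API as it later shipped. It is a minimal sketch: the finishPull hook, the chunk contents, and the highWaterMark of 3 are assumptions made for the example. While the promise returned by pull() is pending, the stream does not call pull() again, even though the queue is below its high water mark:

```javascript
// Sketch only: finishPull is a hypothetical hook for ending the pending pull.
let pullCalls = 0;
let finishPull = null;

const stream = new ReadableStream(
  {
    pull(controller) {
      pullCalls += 1;
      controller.enqueue(`chunk ${pullCalls}`);
      // Stay "busy" until finishPull() is called.
      return new Promise((resolve) => {
        finishPull = resolve;
      });
    },
  },
  new CountQueuingStrategy({ highWaterMark: 3 })
);
```

Calling finishPull() settles the promise, after which the stream is free to invoke pull() once more.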

domenic commented 8 years ago

OK, I think I understand. And apologies for the long response time; it has been a crazy week.

Suppose that we're in a pullInto invocation. We have just called enqueue() or respond() for that pullInto. If highWaterMark is set to e.g. 1024 bytes, desiredSize returns 1024 bytes. But the stream still has one more read(view) pending. The source's policy is that it wants to fill a given ArrayBuffer (provided via pullInto args) if there is one, and otherwise wants to fill the queue with enqueue(). Without byobRequest, we need to invoke pullInto again to notify the source that there's one more read(view). The source needs to wait for one microtask to see whether such a pullInto occurs before it can decide whether to use enqueue() or to use respond() in response to pullInto.

I started writing up an argument against this. In particular I don't think the microtask is a big deal. But the moment I wrote some example code, it turns out that .byobRequest is really really nice. But also exposes some potential problems. Let me explain.

Remember the distinction between pull vs. push sources, which in the end comes down to files vs. sockets.

For files, you want to write your underlying byte source as a slightly modified version of the spec's existing example:

pullInto(controller) {
  const buffer = controller.byobRequest;
  return fs.read(fd, buffer, 0, buffer.byteLength, position).then(bytesRead => {
    if (bytesRead === 0) {
      return fs.close(fd).then(() => controller.close());
    } else {
      position += bytesRead;
      controller.respond();
    }
  });
}

In this example, it doesn't make much difference whether it's an argument to pullInto or whether it's controller.byobRequest. I think it's nicer as an argument, but it doesn't really matter much.

In contrast, the socket example is very different. You want to write your byte source like this (based off the spec's existing example, but using the socket API from the blog post):

start(controller) {
  socket.on("readable", () => {
    // At this point the socket has data ready for us, so we need to do
    // one of two things:
    // 1. If there's a pending BYOB request, fill it up.
    // 2. Otherwise, enqueue it in the internal queue, so as to get it out of the
    //    kernel and ensure the kernel buffer doesn't ever overflow.
    let bytesRead;
    if (controller.byobRequest) {
      bytesRead = socket.readInto(controller.byobRequest, 0, controller.byobRequest.byteLength);
      controller.respond();
    } else {
      const buffer = new ArrayBuffer(1024); // TODO pick a better number than 1024
      bytesRead = socket.readInto(buffer, 0, buffer.byteLength);
      controller.enqueue(buffer);
    }

    if (controller.desiredSize <= 0) {
      // This implies there are zero BYOB requests *and*
      // the internal queue has filled up to the internal queue HWM.
      socket.readStop();
    }

    if (bytesRead === 0) {
      controller.close();
    }
    // maybe -1 is error too
  });
}

pull() {
  // Called if the internal queue is emptied (and there are no BYOB requests?)
  socket.readStart();
}

The socket example doesn't actually want to wait for pullInto at all. Does this seem right to you? (I'm really really sorry if we've discussed this before and are going in circles :()

The socket example exposes a few issues.

I know a lot of your examples were concerned with "what if you can't fill up the entire buffer", or "what if there are multiple queued byob requests". I think in those cases it's actually fine to do another pull call (or wait for another socket.on("readable") in start). The microtask isn't a big deal. But thinking about the kind of mainstream scenarios here, like I've outlined above, I think byobRequest solves them in an important way.

Whether or not we use the return value of pull call to suppress pull invocation (if pending promise is returned, suppress pull invocation until the promise gets resolved).

I don't really understand the argument against the current behavior yet. Maybe it will change after we reconsider the above. But: the most important part of the current behavior is the error propagation. I agree that in all the examples so far you always call controller.enqueue() or controller.close() at basically the same time the promise fulfills. So if there are examples where using the promise timing to suppress further calls is bad, I'm open to changing it, as long as we keep error propagation.
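
A minimal sketch of the error propagation in question, using the ReadableStream API as it later shipped. The error message and the outcome strings are made up for the example. A rejected promise returned from pull() errors the stream, and the consumer sees the same error object on read():

```javascript
// Sketch only: the failure and the outcome labels are invented.
const failure = new Error("source failed");

const stream = new ReadableStream({
  pull() {
    // Rejecting here is how the source reports an asynchronous failure.
    return Promise.reject(failure);
  },
});

const outcome = stream.getReader().read().then(
  () => "fulfilled",
  (reason) =>
    reason === failure
      ? "rejected with the pull error"
      : "rejected with something else"
);
```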

tyoshino commented 8 years ago

The socket example doesn't actually want to wait for pullInto at all. Does this seem right to you? (I'm really really sorry if we've discussed this before and are going in circles :()

Oh, nice analysis! Yeah, the socket API has the following characteristics:

This matches the byobRequest model well since read request notification and outstanding request getter are separate.

First, maybe it should be c.byobRequest.buffer + c.byobRequest.respond() or something, just for clarity.

Good idea.

But more importantly, it means that in some cases an underlying byte source might not have pullInto. I guess the fix for this is to add an explicit boolean like { start(), pull(), byob: true }. And either pullInto present implies byob: true or you get an error if you forget byob: true but also supply pullInto.

OK! The number of arguments increases, but it's clearer how the byte / non-byte distinction is made.

Maybe we just merge pull and pullInto??

I'll give it a try.

microtask isn't a big deal

Just to be clear, I'm not concerned with the cost (time) of the microtasks, but with API cleanliness. We would need to manually enqueue a microtask to see whether the pullInto happens or not. If no such pullInto happens, the manually created microtask runs next, so you know that pullInto didn't happen and proceed to call controller.enqueue(). My concern was that we shouldn't ask the implementor to wait for one microtask "manually".

tyoshino commented 8 years ago

I don't really understand the argument against the current behavior yet.

I thought about this again. It seems this is not a big deal once byobRequest is introduced. I'll check.

as long as we keep error propagation

Ah, OK. This is important. I forgot to discuss this point.

tyoshino commented 8 years ago

Finished at https://github.com/whatwg/streams/commit/55dca720b35e28c497b30f5de0b391bd74d202e7