whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

Transform streams and acknowledgement of writes #329

Open domenic opened 9 years ago

domenic commented 9 years ago

Spinning off from roughly https://github.com/yutakahirano/fetch-with-streams/issues/30#issuecomment-92954801, I think we want to take a more general look at how we envision transform streams working.

Here is some code for reference:

const ts = TransformStream.identity({ highWaterMark: 0 }); // speculative API

ts.writable.write("foo").then(onWritten, onWriteFailed);

setTimeout(() => {
  ts.readable.getReader().read().then(({ value, done }) => {
    // (1)
    return doSomethingAsyncWith(value);
  });
}, 1000);

In the current design, onWritten will always be called ASAP, whereas onWriteFailed will never be called. (In the more general transform stream case, onWritten will be called when the possibly-async transformation finishes, while onWriteFailed will be called if it fails.) The current design is based on how Node.js/io.js handles things.

In the linked thread, @wanderview proposes (indirectly) a design where onWritten is called only after the chunk leaves the transform stream to head for the readable stream (i.e. after 1 second, roughly around the same time as (1)). In this design onWriteFailed is also never called.

@tyoshino's read-acknowledgment proposal (#324 and previous) would allow for a third design, where onWritten would be called once doSomethingAsyncWith(value) succeeds, and onWriteFailed would be called if it fails. This would require a slight modification to the above, e.g. something like

ts.readable.getReader({ requireAcks: true }).read().then(({ value, done, waitUntil }) => {
  waitUntil(doSomethingAsyncWith(value));
});

which could be a burden on the consumer to remember to use.
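The toy model below makes the three designs concrete as an identity transform in JavaScript. It is not the Streams API. The names `makeIdentityChannel`, its `ackPolicy` values (`"on-transform"`, `"on-read"`, `"on-consume"`) and the `waitUntil` field on read results are hypothetical. The designs differ only in when the promise returned by `write()` settles.

```javascript
// Toy identity channel (hypothetical, not the Streams API) used only to
// compare when write() settles under each of the three designs.
function makeIdentityChannel(ackPolicy) {
  const queue = [];        // written entries: { chunk, resolve, reject }
  const pendingReads = []; // resolvers of read() calls waiting for data

  function deliver(entry, readResolve) {
    if (ackPolicy === "on-read") {
      // Design 2: acknowledge when the chunk leaves for the reader.
      entry.resolve();
      readResolve({ value: entry.chunk, done: false });
    } else if (ackPolicy === "on-consume") {
      // Design 3: the consumer reports the outcome of its work via waitUntil().
      readResolve({
        value: entry.chunk,
        done: false,
        waitUntil(promise) {
          promise.then(() => entry.resolve(), (e) => entry.reject(e));
        },
      });
    } else {
      // Design 1: the write was already acknowledged in write().
      readResolve({ value: entry.chunk, done: false });
    }
  }

  return {
    write(chunk) {
      return new Promise((resolve, reject) => {
        const entry = { chunk, resolve, reject };
        // Design 1: acknowledge as soon as the (identity) transform is done.
        if (ackPolicy === "on-transform") resolve();
        if (pendingReads.length > 0) deliver(entry, pendingReads.shift());
        else queue.push(entry);
      });
    },
    read() {
      return new Promise((resolve) => {
        if (queue.length > 0) deliver(queue.shift(), resolve);
        else pendingReads.push(resolve);
      });
    },
  };
}
```

Under `"on-consume"`, a consumer that never calls `waitUntil()` leaves the write pending forever. This is the burden on the consumer noted above.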


There are two aspects to keep in mind with these proposals: propagating backpressure, and propagating write success/failure.

I think all three designs can propagate backpressure pretty well, with minor tricks. But propagating write success/failure all the way from the ultimate sink to the original producer is only really possible with #324, I think. (Except perhaps through things like #325, which could help with special cases...)

tyoshino commented 9 years ago

When someone wants to create a chain that exchanges operations, they can deliberately choose to use the ack functionality. If they don't need it, they can use a reader without it.

So, in case the consumer chooses to use .read() without ack, maybe we just ack on .read() fulfillment.

Regarding the buffer-reusing .write() (https://github.com/yutakahirano/fetch-with-streams/issues/30#issuecomment-92942922 https://github.com/yutakahirano/fetch-with-streams/issues/30#issuecomment-92955087), I think that when the user chooses a readable without ack while the writable is buffer-reusing, we just cannot avoid copying the contents. I.e., when fulfilling .read() (without ack functionality), we create a new view, return that view to the consumer, and return the original view to the producer. If the consumer really cares about efficiency, it can choose to use .read() with ack functionality to return the used buffer to the readable.

yutakahirano commented 9 years ago

This comment is for byte streams.

const ts = TransformStream.identity({ highWaterMark: 0 }); // speculative API
const ws = ts.writable;
const rs = ts.readable;
const writer = (get a writer); // from ws
const reader = (get a reader); // from rs

Let me define readers as:

reader   | who allocates the buffer | has read ack?
default  | source                   | N
BYOB     | user                     | N
with-ack | source                   | Y

Let me define writers as:

writer    | returns buffer when write-fulfilled?
default   | N
retaining | Y

Is the following behavior reasonable?

default writer:
  - default reader: pass the buffer from the writer user to the reader user.
  - BYOB reader: buffer the writer-user-provided buffer and copy it into the reader-user-provided buffer.
  - with-ack reader: pass the buffer from the writer user to the reader user.
retaining writer:
  - default reader: copy the writer-user-provided buffer and pass the copy to the reader user. Return the original buffer to the writer user.
  - BYOB reader: buffer the writer-user-provided buffer and copy it into the reader-user-provided buffer. Return the original buffer to the writer user.
  - with-ack reader: pass the buffer from the writer user to the reader user. When the reader user says it's OK to return it, return it to the writer user.
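The sketch below models the proposed byte-stream behavior for one written chunk and one read. It assumes chunks are `Uint8Array`s and models "return the buffer to the writer user" as handing back the original array, or a promise for it in the with-ack case. `handOff` and its result fields are hypothetical.

```javascript
// Hypothetical hand-off of one written chunk to one read, following the
// proposed writer/reader behavior. Returns what the reader receives, any
// uncopied remainder, and what (if anything) goes back to the writer.
function handOff(writerKind, readerKind, written, readerView) {
  // writerKind: "default" | "retaining"
  // readerKind: "default" | "byob" | "with-ack"
  const retaining = writerKind === "retaining";

  if (readerKind === "byob") {
    // Copy into the reader-provided buffer; the rest stays buffered.
    const n = Math.min(written.byteLength, readerView.byteLength);
    readerView.set(written.subarray(0, n));
    return {
      toReader: readerView.subarray(0, n),
      remainder: written.subarray(n),
      // A retaining writer gets its buffer back once it is fully copied.
      toWriter: retaining && n === written.byteLength ? written : null,
    };
  }

  if (readerKind === "default" && retaining) {
    // The writer wants its buffer back and the reader won't ack: copy.
    return { toReader: written.slice(), remainder: null, toWriter: written };
  }

  if (readerKind === "with-ack" && retaining) {
    // Pass the buffer through; hand it back to the writer when the reader acks.
    let ack;
    const toWriter = new Promise((resolve) => { ack = () => resolve(written); });
    return { toReader: written, remainder: null, toWriter, ack };
  }

  // Default writer: ownership simply moves to the reader (any ack is a no-op).
  return { toReader: written, remainder: null, toWriter: null };
}
```

Only the retaining-writer rows ever need a copy or an ack. A default writer's buffer can always be given away.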
domenic commented 9 years ago

It seems reasonable... At least, I can't think of any other good behaviors. It's a bit surprising how BYOB becomes a footgun. I wonder if user-allocated buffer + read ack would be better? If so, maybe we should not have user-allocated buffers without acks at all?

Hmm, but I wonder whether MSE, for example, will never be done with the buffer and so would just never ack. Maybe that's OK?

yutakahirano commented 9 years ago

> It's a bit surprising how BYOB becomes a footgun.

Yeah, both of the writers above provide their own buffers, so they don't work well with BYOB, which provides its own buffer for reading. If there is a writer that takes a function which will be called with a buffer provided via BYOB reader, it will work well with BYOB reader (but I think it is a bad idea).

It may be beneficial for specs to state which kind of reader they read with ("The fetch operation will read from the given stream with a with-ack reader", for example).

tyoshino commented 9 years ago

> If there is a writer that takes a function which will be called with a buffer provided via BYOB reader, it will work well with BYOB reader (but I think it is a bad idea).

I prototyped something similar in the operationStream branch. With some modification, it would work like this:

  1. Obtain a writer. writer.beginWrite() returns a promise that will be fulfilled with an ArrayBufferView representing a memory region to which the producer should write data.
  2. The producer writes bytes into the region.
  3. The producer notifies that it has finished writing by calling writer.done(view), where view is backed by the same ArrayBuffer as the one returned by writer.beginWrite() (possibly ArrayBuffer.transfer()-ed) but points at the region to which the producer has written the generated bytes.

This also works well with a BYOB-style reader on the readable side: the view passed to done() is returned through the BYOB interface.
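The sketch below pairs the beginWrite()/done() steps with a BYOB-style read. `BeginWriteChannel`, `byobRead()`, `beginWrite()` and `done()` are hypothetical names modeled on the steps above, not the actual operationStream prototype. The sketch also requires `done()` to use the very same ArrayBuffer and does not handle the `ArrayBuffer.transfer()` case. Because the reader's view is handed to the producer, the bytes are written in place and never copied.

```javascript
// Hypothetical identity channel connecting a beginWrite()-style writer with a
// BYOB-style reader: the reader's buffer goes to the producer, the producer
// fills it, and the filled view comes back to the reader without copying.
class BeginWriteChannel {
  constructor() {
    this._pendingRead = null;       // { view, resolve } from byobRead()
    this._pendingBeginWrite = null; // resolver of a waiting beginWrite()
  }

  // Reader side: offer a view to be filled.
  byobRead(view) {
    return new Promise((resolve) => {
      this._pendingRead = { view, resolve };
      if (this._pendingBeginWrite) {
        const give = this._pendingBeginWrite;
        this._pendingBeginWrite = null;
        give(view); // step 1: fulfill beginWrite() with the reader's region
      }
    });
  }

  // Writer side, step 1: ask for a region to write into.
  beginWrite() {
    return new Promise((resolve) => {
      if (this._pendingRead) resolve(this._pendingRead.view);
      else this._pendingBeginWrite = resolve;
    });
  }

  // Writer side, step 3: report the sub-view that was actually written.
  done(view) {
    const read = this._pendingRead;
    // This sketch only accepts views over the same (untransferred) buffer.
    if (!read || view.buffer !== read.view.buffer) {
      throw new TypeError("done() must use the buffer from beginWrite()");
    }
    this._pendingRead = null;
    read.resolve({ value: view, done: false });
  }
}
```

Step 2 is left to the producer. It writes into the region returned by `beginWrite()`, for example with `region.set(bytes)`, before calling `done(region.subarray(0, bytes.length))`.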

tyoshino commented 9 years ago

A row describing the ideal behavior of the beginWrite() approach, to add to the table in https://github.com/whatwg/streams/issues/329#issuecomment-95052407.

beginWrite writer (id = the identity transform):
  - default reader: id allocates a buffer and fulfills beginWrite() with it. Once the producer has completed, give the buffer to the consumer.
  - BYOB reader: fulfill beginWrite() with the buffer provided to read(view). Fulfill read(view) with the buffer returned by the producer.
  - with-ack reader: same as the default reader; the ack is ignored.
tyoshino commented 9 years ago

Ownership movement

(id = the identity transform)

w \ r      | default                                        | BYOB                                                       | with-ack
default    | (A) producer -> id -> consumer                 | (B) producer -> id. consumer -> id -> consumer             | (C) producer -> id -> consumer (ack ignored)
retaining  | (D) producer -> id -> producer. id -> consumer | (E) producer -> id -> producer. consumer -> id -> consumer | (F) producer -> id -> consumer -> id -> producer
beginWrite | (G) id -> producer -> id -> consumer           | (H) consumer -> id -> producer -> id -> consumer           | (I) id -> producer -> id -> consumer (ack ignored)
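Transferring ArrayBuffers is one way to make these ownership arrows observable in today's JavaScript, because a transfer detaches the sender's reference. The sketch below uses the standard `structuredClone(value, { transfer })` (browsers and Node.js 17+) to contrast (A) with (D). `transferBuffer`, `idPassThrough` and `idCopyAndReturn` are hypothetical helpers, and "id" stands for the identity transform.

```javascript
// Move ownership of an ArrayBuffer: the source is detached afterwards.
function transferBuffer(buffer) {
  return structuredClone(buffer, { transfer: [buffer] });
}

// (A) default writer + default reader: producer -> id -> consumer.
function idPassThrough(producerBuffer) {
  const held = transferBuffer(producerBuffer);     // producer -> id
  return { consumerBuffer: transferBuffer(held) }; // id -> consumer
}

// (D) retaining writer + default reader:
// producer -> id -> producer, while a copy goes id -> consumer.
function idCopyAndReturn(producerBuffer) {
  const held = transferBuffer(producerBuffer);  // producer -> id
  const copy = held.slice(0);                   // the consumer gets its own bytes
  return {
    consumerBuffer: transferBuffer(copy),       // id -> consumer
    returnedToProducer: transferBuffer(held),   // id -> producer
  };
}
```

In both cases the producer's original reference is detached once ownership moves. In (D), the producer regains usable bytes only through `returnedToProducer`.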
tyoshino commented 9 years ago

Let's also think about what the ideal data transfer algorithms are for various combinations of source and destination.

Consumer styles (columns):
  1. "Give me a buffer. I'll consume it. I'll take it." (WritableStream allows this if we don't give "return" meaning to fulfillment of the writePromise.) (ReadableStreamController interface)
  2. "Write here. I'll consume it." (Corresponds to the RBS BYOB reader consumer and a beginWrite-style writer.)
  3. "Give me a buffer. I'll consume it. I'll return it when done." (Realizes domenic's desire for error propagation. WritableStream's writePromise allows this.)

Producer styles (rows), each with the algorithm for consumer styles 1 / 2 / 3:
  - "I'll give you a buffer containing generated data. Take it." (Corresponds to the RBS normal reader.)
      (a) producer -> algorithm -> consumer
      (b) producer -> algorithm. consumer -> algorithm -> consumer
      (c) producer -> algorithm -> consumer. Throw away the returned one.
  - "I'll give you a buffer containing generated data. Please return it when done." (Corresponds to RBS extended as proposed in https://github.com/whatwg/streams/issues/324.)
      (d) producer -> algorithm -> producer. algorithm -> consumer
      (e) producer -> algorithm -> producer. consumer -> algorithm -> consumer
      (f) producer -> algorithm -> consumer -> algorithm -> producer
  - "Give me a buffer. I'll generate into it." (Corresponds to the RBS BYOB reader.)
      (g) algorithm -> producer -> algorithm -> consumer
      (h) consumer -> algorithm -> producer -> algorithm -> consumer
      (i) algorithm -> producer -> algorithm -> consumer. Throw away the returned one.
  - "I'll show you a buffer containing generated data. Please copy the bytes synchronously. I don't give you its ownership."
tyoshino commented 9 years ago

Note that a beginWrite-style writer cannot work well with a push-style source.

tyoshino commented 9 years ago

Observations

tyoshino commented 9 years ago

I guess we should clearly distinguish the write-with-ack and write-and-forget interfaces by providing two different getters. Otherwise, we cannot determine whether the stream should return the given buffer or not. If the buffer doesn't need to be returned to the producer, the stream can pass it straight to the consumer ((a) in https://github.com/whatwg/streams/issues/329#issuecomment-106719172). We should ask the producer to declare what it expects.

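// Interface sketch only; method bodies are intentionally empty.
class IdentityTransformByteStreamWritableSide {
  getWriter() {}                // write-and-forget: written buffers are given to the stream
  getBufferBorrowingWriter() {} // write-with-ack: each buffer is returned to the producer when done
  getBufferProvidingWriter() {} // beginWrite()-style: the stream provides the buffer to fill
}
class IdentityTransformByteStreamReadableSide {
  getReader() {}                // default: the consumer receives buffers to keep
  getByobReader() {}            // BYOB: the consumer brings its own buffer to be filled
  getBufferLendingReader() {}   // the stream lends buffers; the consumer acks when done
}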
domenic commented 9 years ago

This is a great analysis @tyoshino. Thank you so much for digging into it.

The breakdown of default/borrowing/providing and default/byob/lending is really insightful. I am concerned about the ergonomics for authors, regarding choosing between all of these. But maybe we can hide that so that pipeTo and pipeThrough make the smart decisions automatically? Authors end up using getReader() and getWriter() almost all of the time, except sometimes they use getByobReader() when they want read(n)-type functionality.

It feels like we might be over-engineering... But it's also important to do acknowledgement and piping right... hmm. Hopefully we can do this in layers: ReadableStream -> ReadableStream with BYOB reader (to allow read(n) functionality if nothing else) -> full flexibility.

> getBufferLendingReader()

This works for more than just buffers though, right?


Still lots to think on here. But I am hopeful we are on the right track...

tyoshino commented 9 years ago

> But maybe we can hide that so that pipeTo and pipeThrough make the smart decisions automatically

Yeah. At least for pipeTo, we can just implement the right thing to do.

> Authors end up using getReader() and getWriter() almost all of the time, except sometimes they use getByobReader() when they want read(n)-type functionality.

I agree that people who want to use the most basic one shouldn't be bothered to understand the other extended ones. I need to explore more to realize it.

> Hopefully we can do this in layers: ReadableStream -> ReadableStream with BYOB reader (to allow read(n) functionality if nothing else) -> full flexibility

Interesting. Yeah, we're actually already allowing read(n) via BYOB. We could also think of a variant in the read(n) family that doesn't take a buffer but reads only up to n, and we could easily provide that for ReadableStream as well.
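For comparison, the BYOB reader in today's Streams Standard supports this read(n)-style use directly: you pass a view of n bytes and get back at most n bytes. The sketch uses the current API (`type: "bytes"` underlying sources and `getReader({ mode: "byob" })`), which postdates this discussion. `readUpTo` and `makeByteStream` are hypothetical helper names.

```javascript
// Read at most n bytes from a readable byte stream using a BYOB reader.
// Returns null once the stream is done.
async function readUpTo(reader, n) {
  const { value, done } = await reader.read(new Uint8Array(n));
  // value is a view over the (transferred) buffer we passed in.
  return done ? null : value;
}

// A readable byte stream with five queued bytes, then closed.
function makeByteStream() {
  return new ReadableStream({
    type: "bytes",
    start(controller) {
      controller.enqueue(new Uint8Array([1, 2, 3, 4, 5]));
      controller.close();
    },
  });
}
```

With `const reader = makeByteStream().getReader({ mode: "byob" })`, two calls to `readUpTo(reader, 3)` should yield bytes 1 to 3 and then 4 to 5. A third call should return `null`.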

> getBufferLendingReader()

> This works for more than just buffers though, right?

Ah, yes. The transform stream waits for the ack and, if the producer side is using a buffer-borrowing writer, forwards the ack signal to it. So it's not just about buffers.

tyoshino commented 9 years ago

Merged (c) from #323 into https://github.com/whatwg/streams/issues/329#issuecomment-106719172.

tyoshino commented 9 years ago

> I agree that people who want to use the most basic one shouldn't be bothered to understand the other extended ones. I need to explore more to realize it.

Here's the algorithm.

action happened                    | current state                              | things to do
writer.write(x)                    | no pending read                            | queue it
bufferBorrowingWriter.write(x)     | no pending read                            | leave it pending
bufferProvidingWriter.beginWrite() | no pending read                            | leave it pending
reader.read()                      | no pending write                           | leave it pending
byobReader.read(x)                 | no pending write                           | leave it pending
bufferLendingReader.read()         | no pending write                           | leave it pending
writer.write(x)                    | pending reader.read()                      | fulfill read() with x
reader.read()                      | x in queue                                 | Same
writer.write(x)                    | pending byobReader.read(y)                 | copy bytes from x into y; queue.push(xRemaining)
byobReader.read(y)                 | x in queue                                 | Same
writer.write(x)                    | pending bufferLendingReader.read()         | fulfill read() with x (no-op (GC) on ack)
bufferLendingReader.read()         | x in queue                                 | Same
bufferBorrowingWriter.write(x)     | pending reader.read()                      | clone x and fulfill read() with xClone; return x
reader.read()                      | pending bufferBorrowingWriter.write(x)     | Same
bufferBorrowingWriter.write(x)     | pending byobReader.read(y)                 | copy bytes from x into y; return x or borrowedQueue.push(xRemaining)
byobReader.read(y)                 | pending bufferBorrowingWriter.write(x)     | Same
bufferBorrowingWriter.write(x)     | pending bufferLendingReader.read()         | fulfill read() with x (return x on ack)
bufferLendingReader.read()         | pending bufferBorrowingWriter.write(x)     | Same
bufferProvidingWriter.beginWrite() | pending reader.read()                      | allocate buffer; give buffer to producer; fulfill read() with buffer on ack
reader.read()                      | pending bufferProvidingWriter.beginWrite() | Same
bufferProvidingWriter.beginWrite() | pending byobReader.read(x)                 | give x to producer; fulfill on ack
byobReader.read(x)                 | pending bufferProvidingWriter.beginWrite() | Same
bufferProvidingWriter.beginWrite() | pending bufferLendingReader.read()         | allocate buffer; give buffer to producer; fulfill read() with buffer on ack; no-op (GC) on reader-side ack
bufferLendingReader.read()         | pending bufferProvidingWriter.beginWrite() | Same
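The sketch below implements part of this algorithm: the writer.write, bufferBorrowingWriter.write, reader.read and byobReader.read rows. `IdentityByteChannel` and its method names are hypothetical, and chunks are assumed to be `Uint8Array`s. Queued and pending writes share one FIFO here. The remainder of a partially copied chunk therefore stays at its head instead of being pushed to a separate queue.

```javascript
// Hypothetical identity byte channel covering a subset of the rows above.
// Pending writes and pending reads are FIFOs; _pump() pairs them up.
class IdentityByteChannel {
  constructor() {
    this._writes = []; // { original, bytes, giveBack }; giveBack is null if owned
    this._reads = [];  // { view, resolve }; view is null for a default read
  }

  // writer.write(x): ownership of x moves into the channel.
  write(x) {
    this._writes.push({ original: x, bytes: x, giveBack: null });
    this._pump();
  }

  // bufferBorrowingWriter.write(x): resolves with x once it is no longer needed.
  borrowingWrite(x) {
    return new Promise((resolve) => {
      this._writes.push({ original: x, bytes: x, giveBack: resolve });
      this._pump();
    });
  }

  read() { return this._enqueueRead(null); }       // reader.read()
  byobRead(view) { return this._enqueueRead(view); } // byobReader.read(y)

  _enqueueRead(view) {
    return new Promise((resolve) => {
      this._reads.push({ view, resolve });
      this._pump();
    });
  }

  _pump() {
    while (this._writes.length > 0 && this._reads.length > 0) {
      const w = this._writes[0];
      const r = this._reads.shift();
      let value;
      if (r.view === null) {
        // Default read: hand over an owned chunk as-is; clone a borrowed one
        // so that the original can be returned to the producer.
        value = w.giveBack ? w.bytes.slice() : w.bytes;
        w.bytes = w.bytes.subarray(w.bytes.length);
      } else {
        // BYOB read: copy bytes from x into the reader's view y.
        const n = Math.min(w.bytes.length, r.view.length);
        r.view.set(w.bytes.subarray(0, n));
        value = r.view.subarray(0, n);
        w.bytes = w.bytes.subarray(n); // xRemaining stays at the head
      }
      r.resolve({ value, done: false });
      if (w.bytes.length === 0) {
        this._writes.shift();
        if (w.giveBack) w.giveBack(w.original); // "return x"
      }
    }
  }
}
```

A single FIFO preserves write order across owned and borrowed chunks. The buffer-providing writer and buffer-lending reader rows would add their own branches to `_pump()`.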
tyoshino commented 9 years ago

In PR #361, we can say that: