whatwg / streams

Streams Standard
1.35k stars 160 forks source link

Transferable streams: the double transfer problem #1063

Open ricea opened 4 years ago

ricea commented 4 years ago

The original transferable streams issue has been closed now that support has been landed, however the discussion of the double transfer problem that started at https://github.com/whatwg/streams/issues/276#issuecomment-482797085 and consumed the rest of the thread is not concluded.

Summarising the issue, the following code works fine:

const worker = new Worker(`data:text/javascript,onmessage = evt => postMessage(evt.data, [evt.data]);`);
const rs = new ReadableStream();
worker.postMessage(rs, [rs]);
let transferred_rs;
worker.onmessage = evt => { transferred_rs = evt.data; };

However, if you subsequently run worker.terminate() then transferred_rs will start returning errors. For other transferable types, no connection remains with any previous realms that the object was passed through, but in the case of streams, data is still being proxied via the worker.

See the linked thread for why this is hard to fix.

jimmywarting commented 4 years ago

left a ball of roller coster and created more issues thats need to be fixed, don't sound like an easy task to write spec and working cross multiple threads and pipes 😅

so here is rough workaround for those who stubble up on this issue and seeks for a solution that works right now

MessageChannel solution ```js // this method pass the readableStream into a messageChannel in order to create a more direct communication var worker = new Worker(`data:text/javascript,onmessage = evt => postMessage(evt.data, [evt.data]);`); var rs = new ReadableStream({ start(c) { c.enqueue('a') c.enqueue('b') c.enqueue('c') c.close() } }) // send the transferable readableStream // to a messageChannel port var mc = new MessageChannel() mc.port1.postMessage(rs, [rs]) mc.port1.close() // post the port instead of the transfered stream worker.postMessage(mc.port2, [mc.port2]) worker.onmessage = evt => { var port = evt.data // now you can terminate the worker since you now got a messageChannel port that you can listen to instead. // the web worker don't have anything to do with the transfered readableStream anymore worker.terminate() port.onmessage = evt => { port.close() port.onmessage = null // sucess evt.data.pipeTo(new WritableStream({ write(x) { console.log(x) }, close() { console.log('closed') } })) } } ```
alvestrand commented 3 years ago

I think the proper model here is that transferring a ReadableStream transfers the ability to read from the stream - the end that writes to the stream is not transferred, and the connection between the writing end and the reading end remains in place.

Isn't this the same model as for message channesl?

MattiasBuelens commented 2 years ago

I think the proper model here is that transferring a ReadableStream transfers the ability to read from the stream - the end that writes to the stream is not transferred, and the connection between the writing end and the reading end remains in place.

That's the idea, yes.

The problem is mostly technical: we have to figure out how to make that work. There's a whole discussion on the original issue about all the peculiarities we have to deal with, such as:

ricea commented 2 years ago

Here's a sketch of an approach which reconciles the atomic nature of transfer with the asynchronous nature of streams.

I'm going to talk about the WritableStream case because I think it is the harder of the two.

  1. We give up the nice abstraction of the "cross-realm identity transform" because it makes the following stuff harder. Instead we have separate transfer logic for ReadableStream and WritableStream.
  2. We have a stream with the underlying sink in the original realm O and the transferred writable stream in realm A.
  3. We want to transfer from realm A to realm B.
  4. The atomic part of the transfer works much the same as before (it's a bit ugly because we can no longer use the piping logic to synchronise the states).
  5. The new WritableStream sends a message back to A asking for a new message pipe. It starts queuing writes rather than sending them to A (this may result in backpressure being applied).
  6. The WritableStream in A waits until its write queue is empty, and then forwards the "new message pipe request" back to O.
  7. O constructs a new message pipe and sends it to A, which passes it on to B.
  8. B starts sending writes down the new message pipe directly to O.
  9. B sends a message to A to close itself.

After step 9, realm A has been "unhooked" and can safely be destroyed.

There can be an arbitrary delay for queued writes to complete before A is "unhooked". Maybe we can force the queued chunks from A into O's queue by ignoring backpressure to make this delay as short as possible?

youennf commented 2 years ago

There can be an arbitrary delay for queued writes to complete before A is "unhooked".

That is an ergonomic issue: one calls postMessage, everything is fine so one navigates away thinking everything is good, but too quickly so that the unhooking does not happen. The current spec approach stays on the safe side so that there is no surprise.

MattiasBuelens commented 2 years ago

Hmm, delaying the transfer of the queued chunks is indeed quite risky.

While the proposed solution could work for WritableStream, I'm not so sure it'd work for a ReadableStream. The current spec tries to avoid sending chunks from the original stream to the transferred stream that are not yet being requested, so ideally the transferred stream's queue is always empty. However, other spec changes might make it possible for that queue to become non-empty. For example, with #1103, you might do this:

const controller = new AbortController();
const reader = readable.getReader({ signal: controller.signal });
reader.read(); // causes the cross-realm readable to send a "pull" message
controller.abort(); // discards the pending read request
// At this point, we have no pending read requests, but we are already pulling...

// After some time, we receive a "chunk" message and put the chunk in the queue.
// Which means that if you now transfer the stream...
worker.postMessage(readable, { transfer: [readable] });
// ...we have to do something with the chunks in the queue first.

I think it's better if we transfer the entire queue synchronously as part of the transfer steps. In the transfer-receiving steps, we would re-enqueue those chunks with controller.enqueue() and writer.write().

  1. We give up the nice abstraction of the "cross-realm identity transform" because it makes the following stuff harder. Instead we have separate transfer logic for ReadableStream and WritableStream.

So the transfer steps for ReadableStream would acquire a reader, and for WritableStream they would acquire a writer? I think I would like that better than the current solution with pipeTo(), actually. 😛

After step 9, realm A has been "unhooked" and can safely be destroyed.

Is step 9 needed? Can't A close itself immediately after step 7?