MattiasBuelens / web-streams-polyfill

Web Streams, based on the WHATWG spec reference implementation
MIT License
289 stars 29 forks source link

[Question] TransformStream and Response as specified and implemented #143

Closed guest271314 closed 9 months ago

guest271314 commented 9 months ago

Consider this code

transformstream-response-test.js

(async () => {
  try {
    let { readable, writable } = new TransformStream();
    let writer = writable.getWriter();
    // new Response(readable).text().then(console.log);
    let encoder = new TextEncoder();
    console.log(readable, writable, writer);
    for (const data of "abcdef") {
      console.log({ data }); // {"data": "a"}
      await writer.ready;
      await writer.write(encoder.encode(data));
    }
    await writer.close();
    await writer.closed;
    console.log(
      "We never get here... if we comment line 4 and uncomment line 15",
    );
    console.log(await new Response(readable).text());
  } catch (e) {
    console.error(e);
  }
})();

I was expecting for console.log(await new Response(readable).text()); to log "abcdef" on Chromium 122.

That's not what happens.

Firefox Nightly 123 gives us a hint

(async () => {
  try {
    let { readable, writable } = new TransformStream();
    let writer = writable.getWriter();
    // new Response(readable).text().then(console.log);…
ReadableStream { locked: false }

WritableStream { locked: true }

WritableStreamDefaultWriter { closed: Promise { "pending" }, desiredSize: 1, ready: Promise { "fulfilled" } }
[debugger eval code:7:13](chrome://devtools/content/webconsole/debugger%20eval%20code)
Object { data: "a" }
[debugger eval code:9:15](chrome://devtools/content/webconsole/debugger%20eval%20code)
Promise { <state>: "pending" }

node 22 nightly tells us nothing, just exits after logging { data: 'a' }

node --experimental-default-type=module transformstream-response-test.js
ReadableStream { locked: false, state: 'readable', supportsBYOB: false } WritableStream { locked: true, state: 'writable' } WritableStreamDefaultWriter {
  stream: WritableStream { locked: true, state: 'writable' },
  close: Promise { <pending> },
  ready: Promise { undefined },
  desiredSize: 1
}
{ data: 'a' }

bun 1.0.23 exits, too

bun run transformstream-response-test.js
ReadableStream {
  locked: [Getter],
  cancel: [Function: cancel],
  getReader: [Function: getReader],
  pipeTo: [Function: pipeTo],
  pipeThrough: [Function: pipeThrough],
  tee: [Function: tee],
  values: [Function: values],
  [Symbol(Symbol.asyncIterator)]: [Function: lazyAsyncIterator],
} WritableStream {
  locked: true,
  abort: [Function: abort],
  close: [Function: close],
  getWriter: [Function: getWriter],
} WritableStreamDefaultWriter {
  closed: [Getter],
  desiredSize: [Getter],
  ready: [Getter],
  abort: [Function: abort],
  close: [Function: close],
  releaseLock: [Function: releaseLock],
  write: [Function: write],
}
{
  data: "a",
}

so does txiki.js

 tjs run transformstream-response-test.js
{ _state: 'readable',
  _reader: undefined,
  _storedError: undefined,
  _disturbed: false,
  _readableStreamController: 
   { _controlledReadableStream: [Circular],
     _queue: { _cursor: 0, _size: 0, _front: [Object], _back: [Object] },
     _queueTotalSize: 0,
     _started: false,
     _closeRequested: false,
     _pullAgain: false,
     _pulling: false,
     _strategySizeAlgorithm: [Function],
     _strategyHWM: 0,
     _pullAlgorithm: [Function: H],
     _cancelAlgorithm: [Function: G] } } { _state: 'writable',
  _storedError: undefined,
  _writer: 
   { _ownerWritableStream: [Circular],
     _readyPromise_resolve: undefined,
     _readyPromise_reject: undefined,
     _readyPromise: {},
     _readyPromiseState: 'fulfilled',
     _closedPromise_resolve: [Function],
     _closedPromise_reject: [Function],
     _closedPromiseState: 'pending',
     _closedPromise: {} },
  _writableStreamController: 
   { _controlledWritableStream: [Circular],
     _queue: { _cursor: 0, _size: 0, _front: [Object], _back: [Object] },
     _queueTotalSize: 0,
     _abortReason: undefined,
     _abortController: {},
     _started: false,
     _strategySizeAlgorithm: [Function],
     _strategyHWM: 1,
     _writeAlgorithm: [Function: A],
     _closeAlgorithm: [Function: M],
     _abortAlgorithm: [Function: L] },
  _writeRequests: 
   { _cursor: 0,
     _size: 0,
     _front: { _elements: [], _next: undefined },
     _back: { _elements: [], _next: undefined } },
  _inFlightWriteRequest: undefined,
  _closeRequest: undefined,
  _inFlightCloseRequest: undefined,
  _pendingAbortRequest: undefined,
  _backpressure: false } { _ownerWritableStream: 
   { _state: 'writable',
     _storedError: undefined,
     _writer: [Circular],
     _writableStreamController: 
      { _controlledWritableStream: [Circular],
        _queue: [Object],
        _queueTotalSize: 0,
        _abortReason: undefined,
        _abortController: {},
        _started: false,
        _strategySizeAlgorithm: [Function],
        _strategyHWM: 1,
        _writeAlgorithm: [Function: A],
        _closeAlgorithm: [Function: M],
        _abortAlgorithm: [Function: L] },
     _writeRequests: { _cursor: 0, _size: 0, _front: [Object], _back: [Object] },
     _inFlightWriteRequest: undefined,
     _closeRequest: undefined,
     _inFlightCloseRequest: undefined,
     _pendingAbortRequest: undefined,
     _backpressure: false },
  _readyPromise_resolve: undefined,
  _readyPromise_reject: undefined,
  _readyPromise: {},
  _readyPromiseState: 'fulfilled',
  _closedPromise_resolve: [Function],
  _closedPromise_reject: [Function],
  _closedPromiseState: 'pending',
  _closedPromise: {} }
{ data: 'a' }

deno 1.39.4 describes explicitly what Firefox Nightly logs as Promise { <state>: "pending" } when we comment out the (async() => {})() wrapper we only used for Firefox DevTools which doesn't implement top-level await in console

deno run -A transformstream-response-test.js
ReadableStream { locked: false } WritableStream { locked: true } WritableStreamDefaultWriter {
  closed: Promise { <pending> },
  desiredSize: 1,
  ready: Promise { undefined }
}
{ data: "a" }
error: Top-level await promise never resolved
      await writer.write(encoder.encode(data));
      ^
    at <anonymous> (file:///home/user/bin/transformstream-response-test.js:11:7)

Deno notifies us that await writer.write(encoder.encode(data)); never resolves after the first call to write().

If we comment line 15 (without (async()=>{})() wrapper, line 16 with the IIAF), before we use getWriter(), eventually "abcdef" is logged to console - except for tjs, which still just logs

{ data: 'a' } [object ReadableStream]

which is clearly a bug that I'll file over there.

What's going on here?

MattiasBuelens commented 9 months ago

The problem is backpressure.

The promises returned by writer.ready, writer.write(), writer.close() or writer.closed will only resolve once backpressure is relieved. However, by default, new TransformStream() sets the high water mark to 1 chunk for its writable side and 0 chunks for its readable side. This means that, if nobody is reading from the readable side, then the writable side will apply backpressure as soon as it has 1 chunk in its queue. Which is exactly what you see: chunk "a" is written, but then await writer.ready blocks on the next iteration.

If you uncomment line 4, then new Response(readable).text() will be actively pulling from the readable, and no chunks will remain in the writable's queue. Thus, await writer.ready resolves immediately and your loop can keep chugging along nicely.

One way to solve this would be to give the readable side an unbounded queue size:

let { readable, writable } = new TransformStream({}, {}, { highWaterMark: Infinity });

However, this is a really bad idea. You're not really "streaming" anything: first you're writing all chunks into a queue, and afterwards you're reading them all out again. Effectively, it would behave the same if you first push()ed all chunks into an array, and then looped through the array. πŸ€·β€β™‚οΈ

The best solution is still to set up the consumer as soon as possible, so that it can relieve backpressure as needed. So that means keeping new Response(readable).text() at line 4 instead of line 15. 😁

Alternatively, move your writer code into a separate async function that runs "on the side" of your consumer:

(async () => {
  try {
    let { readable, writable } = new TransformStream();
    writeData(writable).catch(() => {});
    console.log(await new Response(readable).text());
  } catch (e) {
    console.error(e);
  }
})();

async function writeData(writable) {
  let writer = writable.getWriter();
  try {
    let encoder = new TextEncoder();
    for (const data of 'abcdef') {
      console.log({ data }); // {"data": "a"}
      await writer.ready;
      // Purposefully don't await write()
      // See https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
      // and https://streams.spec.whatwg.org/#example-manual-write-dont-await
      writer.write(encoder.encode(data)).catch(() => {});
    }
    await writer.close();
  } catch (e) {
    await writer.abort(e);
  }
}
guest271314 commented 9 months ago

Thanks.

On the specification side I am kind of surprised it is specification compliant to have a hanging Promise that is never going to be fulfilled. Now that I think about it, I encountered this when creating a WebSocketStream: I used pipeThrough() without await, else no data would be read

var {
  readable,
  writable
} = await wss.opened;
var now;
var writer = writable.getWriter();
var abortable = new AbortController();
var controller;
var {
  signal
} = abortable;
writer.closed.then(() => console.log('writer closed')).catch(() => console.log('writer closed error'));
let minutes = 0;
readable.pipeThrough(new TextDecoderStream()).pipeTo(
  new WritableStream({
    async start(c) {
      console.log(c);
      (async () => {
        while (!c.signal.aborted) {
          try {
            await new Promise((resolve, reject) => {
              c.signal.onabort = reject;
              setTimeout(resolve, 60000);
            });
            minutes++;
          } catch {} finally {
            console.log(minutes);
          }
        }
        console.log(c.signal.aborted);
      })();
      return controller = c;
    },
    write(v) {
      console.log(v);
    },
    close() {
      console.log('Socket closed');
    },
    abort(reason) {
      console.log({
        reason
      });
    }
  }), {
    signal
  }).then(() => console.log('pipeThrough, pipeTo Promise')).catch(() => console.log('caught'));

var encoder = new TextEncoder();
var enc = (text) => encoder.encode(text);

On the application side, I think I was looking for the highWaterMark: Infinity pattern for what I was working on a few days ago that prompted me to revisit this; converting a Node.js stream.Readable to a WHATWG ReadableStream. I wound up using this pattern

let controller;

const webReadable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});

I'm going to re-read what you wrote, a few more times.

I just don't see how something as simple as the initial code block I posted is known to hang, and hang and exit, and still be WAI.

guest271314 commented 9 months ago

@MattiasBuelens It looks like txiki.js is using web-streams. When line 4 is un-commented and line 15 is commented we still get only {data: "a"}, so there's a bug in this repository for that case.

MattiasBuelens commented 9 months ago

I just don't see how something as simple as the initial code block I posted is known to hang, and hang and exit, and still be WAI.

There's tons of ways you can cause a program to hang with a single line of code. For example:

await new Promise((resolve) => { /* never actually call resolve() */});
console.log("never reached");

Streams apply backpressure if they have too many chunks in their queue. And yes, by default, they have a fairly low high water mark, so you'll hit that limit almost immediately. It's up to the developer to ensure that chunks can "flow" steadily through the streams, and don't end up "stuck" in a queue somewhere.

@MattiasBuelens It looks like txiki.js is using web-streams. When line 4 is un-commented and line 15 is commented we still get only {data: "a"}, so there's a bug in this repository for that case.

I haven't heard of txiki.js before... and I don't have time to build it from source. πŸ˜… It works fine in Node.js if you load the polyfill, see this live example.

Looking at their source code, they seem to be using whatwg-fetch as their fetch() implementation. That project does not support streaming fetch, see https://github.com/JakeChampion/fetch/issues/198#issuecomment-494361060. So new Response(readable) will not work in txiki.js, the Response constructor will just fall back to calling toString() on its input and you'll end up with the string "[object ReadableStream]" as your response body.

If you need streaming fetch support in txiki.js, you should talk to them instead. πŸ˜‰ Or, you should use a TextDecoderStream (or a polyfill for that) to read the chunks back into text.

guest271314 commented 9 months ago

txiki.js is pretty easy to build from source. Yeah, fetch() polyfills don't really work.

I just find the hanging Promise WAI problematic. Setting highWaterMark to Infinity is what I was looking for for the case I was working on.

Feel free to close this if you think it's not relevant here.

MattiasBuelens commented 9 months ago

Setting highWaterMark to Infinity is what I was looking for for the case I was working on.

I'm happy I could help you find a solution! Have a nice day. 😊