whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

Do the abort steps of ReadableStreamPipeTo really guarantee the abort callback to be called before cancel? #1229

Open saschanaz opened 2 years ago

saschanaz commented 2 years ago

Step 14 of https://streams.spec.whatwg.org/#readable-stream-pipe-to is basically Promise.all([dest.abort(), source.cancel()]), assuming the states are writable and readable respectively. One WPT test for this asserts that abort() is called before cancel(): https://github.com/web-platform-tests/wpt/blob/285addceabb4443562a9b93d789b17230c3d6e20/streams/piping/abort.any.js#L215-L237
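For reference, the abort algorithm in step 14 can be modeled roughly as below. This is a simplified, hypothetical sketch: the helper name abortActions is made up, and the public abort()/cancel() methods stand in for the spec's internal WritableStreamAbort/ReadableStreamCancel. It only shows that the abort action is invoked before the cancel action; it says nothing about when the underlying abort()/cancel() callbacks actually run.

```javascript
// Hypothetical, simplified model of the abort algorithm in ReadableStreamPipeTo.
// In the spec, each action is only added if dest is "writable" / source is
// "readable", and the actions run inside "shutdown with an action", which may
// first wait for pending writes to finish. Those details are omitted here.
function abortActions(source, dest, reason, { preventAbort = false, preventCancel = false } = {}) {
  const actions = [];
  if (!preventAbort) {
    actions.push(() => dest.abort(reason));
  }
  if (!preventCancel) {
    actions.push(() => source.cancel(reason));
  }
  // Actions are invoked in list order, so abort is *requested* before cancel.
  // Whether the underlying abort() callback runs first is a separate question.
  return Promise.all(actions.map(action => action()));
}
```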

Blink passes this test, but I'm not sure how. A slightly modified test without AbortSignal shows that the abort callback is called after the cancel callback (because the latter is called synchronously while the former is not), so I'd expect the same for the AbortSignal test:

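// error1, error2 and hwm0 are test fixtures; these definitions are assumed to mirror the WPT piping tests.
const error1 = new Error('error1');
error1.name = 'error1';
const error2 = new Error('error2');
error2.name = 'error2';
const hwm0 = { highWaterMark: 0 };

promise_test(t => {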
  const events = [];
  const rs = new ReadableStream({
    pull(controller) {
      controller.error('failed to abort');
    },
    cancel() {
      events.push('cancel');
      return Promise.reject(error1);
    }
  }, hwm0);
  const ws = new WritableStream({
    abort() {
      events.push('abort');
      return Promise.reject(error2);
    }
  });
  return promise_rejects_exactly(t, error1, Promise.all([ws.abort(), rs.cancel()]), 'The cancel rejection happens first in this case')
      .then(() => assert_array_equals(events, ['cancel', 'abort'], 'cancel() is called first in this case'));
}, '');

Am I understanding something wrong?

domenic commented 2 years ago

This seems extremely related to https://github.com/whatwg/streams/pull/1208 and similar things @MattiasBuelens was working on... but the fact that the reference implementation passes all WPTs implies the current spec should be good enough.

Does https://github.com/whatwg/streams/blob/e9355ce79925947e8eb496563d599c329769d315/reference-implementation/lib/abstract-ops/readable-streams.js#L150-L177 help? It doesn't really look like Promise.all([dest.abort(), source.cancel()]) so I am not sure where you are getting that...

MattiasBuelens commented 2 years ago

This happens because the writable stream is not yet started. If you add a delay before you call abort and cancel, you get the events in the expected order:

promise_test(async t => {
  const events = [];
  const rs = new ReadableStream({
    pull(controller) {
      controller.error('failed to abort');
    },
    cancel() {
      events.push('cancel');
      return Promise.reject(error1);
    }
  }, { highWaterMark: 0 });
  const ws = new WritableStream({
    abort() {
      events.push('abort');
      return Promise.reject(error2);
    }
  });
  await flushAsyncEvents(); // <<< the important bit: lets ws finish starting before abort() is called
  await promise_rejects_exactly(t, error2, Promise.all([ws.abort(), rs.cancel()]), 'The abort rejection happens first in this case');
  assert_array_equals(events, ['abort', 'cancel'], 'abort() is called first in this case');
}, '#1229');
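The same effect can be reproduced without testharness.js. The sketch below assumes global ReadableStream/WritableStream (a browser, or Node.js 18+), and the helper name callbackOrder is hypothetical. A single setTimeout stands in for flushAsyncEvents(), since one macrotask is enough for both streams' start() reactions to run.

```javascript
// Records the order in which the underlying cancel() and abort() callbacks run.
async function callbackOrder(waitForStart) {
  const events = [];
  const rs = new ReadableStream({
    cancel() { events.push('cancel'); }
  });
  const ws = new WritableStream({
    abort() { events.push('abort'); }
  });
  if (waitForStart) {
    // Let both streams become "started" before aborting/canceling.
    await new Promise(resolve => setTimeout(resolve, 0));
  }
  // abort() is requested first, but on a not-yet-started writable stream the
  // abort callback is deferred until start() settles; cancel() runs synchronously.
  await Promise.all([ws.abort(), rs.cancel()]);
  return events;
}
```

In a spec-conforming implementation, callbackOrder(false) should resolve to ['cancel', 'abort'] and callbackOrder(true) to ['abort', 'cancel'].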

I agree that this is surprising: we don't wait for the readable stream to be started before we call its cancel method. But there's a reason for this: the start() method may be a long-running async producer.

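const abortController = new AbortController();
const rs = new ReadableStream({
  async start(controller) {
    // Keeps producing until the stream is canceled; start() itself does not
    // settle before that, so cancel() cannot wait for it.
    while (!abortController.signal.aborted) {
      controller.enqueue("a");
      await new Promise(r => setTimeout(r, 1000));
    }
  },
  cancel(reason) {
    // Stop the producer loop in start().
    abortController.abort(reason);
  }
});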
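A quick way to observe that cancel() does not wait for a pending start() is sketched below, assuming a global ReadableStream (a browser, or Node.js 18+). The finishStart handle is a hypothetical stand-in for a producer that eventually finishes.

```javascript
const events = [];
let finishStart; // hypothetical handle used to settle start() manually
const rs = new ReadableStream({
  start() {
    events.push('start called');
    // Stays pending until finishStart() is called, like a long-running producer.
    return new Promise(resolve => { finishStart = resolve; });
  },
  cancel(reason) {
    events.push(`cancel: ${reason}`);
  }
});
const cancelPromise = rs.cancel('stop');
// The cancel callback has already run synchronously, although start() is still pending.
events.push('after cancel()');
finishStart();
```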

See also https://github.com/whatwg/streams/pull/1208#discussion_r793942484.

MattiasBuelens commented 2 years ago

Oh wait, hang on. That doesn't actually explain why pipeTo() works even without the await flushAsyncEvents()... 😅

It looks like the problem is that, even when the pipe is immediately aborted (with an aborted AbortSignal), the reference implementation doesn't execute the abort actions in the first microtask. That's because we enter this if branch: https://github.com/whatwg/streams/blob/e9355ce79925947e8eb496563d599c329769d315/reference-implementation/lib/abstract-ops/readable-streams.js#L293-L294

The implementation of waitForWritesToFinish() takes at least one microtask to transform the currentWrite promise, and then we add another microtask because of the uponFulfillment() itself. By that time, the writable stream in this WPT test has started, so the abort() call is handled synchronously.
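To make the microtask counting concrete, here is a toy model using plain promises and no streams. The names currentWrite and waitForWritesToFinish mirror the reference implementation, but the code is a hypothetical simplification. A reaction queued before the pipe starts shutting down (standing in for the writable stream's start reaction, since ws is constructed before pipeTo() is called) runs before the abort actions, which sit behind two chained microtasks.

```javascript
const log = [];
let currentWrite = Promise.resolve(); // no writes have happened yet

function waitForWritesToFinish() {
  const oldCurrentWrite = currentWrite;
  // First microtask: react to currentWrite, looping if a new write started meanwhile.
  return currentWrite.then(() =>
    oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined);
}

// Stand-in for the writable stream's start() reaction, queued earlier.
Promise.resolve().then(() => log.push('dest started'));

// Second chained microtask: the "upon fulfillment" reaction that runs the abort actions.
waitForWritesToFinish().then(() => log.push('abort actions run'));
```

Running the abort actions synchronously instead (as the current Gecko implementation does, per the comment below) would put them before 'dest started'.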

I've tried fixing this for the case where the pipe didn't start any writes at all (https://github.com/whatwg/streams/pull/1208#discussion_r793162698), but that caused even more problems... 😛

saschanaz commented 2 years ago

Thanks, #1208 does indeed look very much related! The current Gecko implementation does not wait there at all and thus calls the shutdown actions synchronously, while both the reference implementation and Blink do wait there, as the following code shows:

var rs = new ReadableStream({ cancel() { console.log("canceled") } });
var ws = new WritableStream({ abort() { console.log("aborted") } });
var abortController = new AbortController();
var signal = abortController.signal;
abortController.abort();
rs.pipeTo(ws, { signal }).catch(() => {}); // the pipe rejects because the signal is already aborted
console.log("foo");

// Reference: foo aborted canceled
// Blink: foo aborted canceled
// Gecko: canceled foo aborted

Will #1208 change the behavior here as Gecko does?

MattiasBuelens commented 2 years ago

> Will #1208 change the behavior here as Gecko does?

In its current state, no.

Personally, I think we should make pipeTo() do more things synchronously when possible, especially when the timing and results of those actions are observable by author code (i.e. when the readable and/or writable stream is constructed by author code). See https://github.com/whatwg/streams/pull/1208#discussion_r793955072 for Domenic's idea on how to make that happen. But that should probably go into a separate PR, since #1208 is already quite large as it is. 😛