whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

pipeTo: Read fulfilled after shutdown started #1207

Open evilpie opened 2 years ago

evilpie commented 2 years ago

The spec is quite clear that after shutting down there should be no new reads. However, there is an open question in Firefox's old implementation of pipeTo about what happens when a read is fulfilled after shutdown. I am quoting Waldo's comment in full below:

If |source| becomes errored not during a pending read, it's clear we must react immediately.

But what if |source| becomes errored during a pending read? Should this first error, or the pending-read second error, predominate? Two semantics are possible when |source|/|dest| become closed or errored while there's a pending read:

  1. Wait until the read fulfills or rejects, then respond to the closure/error without regard to the read having fulfilled or rejected. (This will simply not react to the read being rejected, or it will queue up the read chunk to be written during shutdown.)
  2. React to the closure/error immediately per "Error and close states must be propagated". Then when the read fulfills or rejects later, do nothing.

The spec doesn't clearly require either semantics. It requires that already-read chunks be written (at least if |dest| didn't become errored or closed such that no further writes can occur). But it's silent as to not-fully-read chunks. (These semantic differences may only be observable with very carefully constructed readable/writable streams.)

It seems best, generally, to react to the temporally-earliest problem that arises, so we implement option 2. (Blink, in contrast, currently implements option 1.)

All specified reactions to a closure/error invoke either the shutdown, or shutdown with an action, algorithms. Those algorithms each abort if either shutdown algorithm has already been invoked. So we don't need to do anything special here to deal with a pending read.

https://searchfox.org/mozilla-central/rev/3de56eb5f266f523340e739ae1b53258e0a95dfe/js/src/builtin/streams/PipeToState.cpp#454-483

When working on my new implementation, I actually just ended up doing what is described as option 1 (aka Chrome's behavior). My own intuition here is that it would be better not to just drop an already-read chunk on the floor, and to at least try to write it. The write might of course still fail anyway, considering that we might cancel the stream during shutdown or release the writer.

Sorry if this has already been discussed somewhere else.

MattiasBuelens commented 2 years ago

But what if |source| becomes errored during a pending read?

If the source becomes errored, then all pending and future reads must also error immediately, there's no confusion there. So the problem can only happen when dest becomes errored while we have a pending read.
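To make that first point concrete, here's a minimal sketch (assuming Node 18+, where ReadableStream is a global, and a context that allows top-level await) showing that erroring the source rejects a pending read right away:

```javascript
// Erroring the source rejects any pending read with the stored error.
let srcController;
const source = new ReadableStream({
  start(c) { srcController = c; }
});
const reader = source.getReader();
const pending = reader.read();          // no chunk available, so this stays pending
srcController.error(new Error("boom")); // rejects the pending read
const result = await pending.then(
  () => "fulfilled",
  (e) => `rejected: ${e.message}`
);
console.log(result); // "rejected: boom"
```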

When working on my new implementation I actually just ended up doing what is described as option 1 (aka Chrome's behavior). My own intuition here is that it would be better to not just drop an already read chunk onto the floor and at least try to write it.

Interesting! πŸ‘€

Now that we have #1168, I think it would be better to immediately release the reader at the start of the "shutdown" and "shutdown with an action" steps, rather than as part of the "finalize" steps. That gives us the best of both worlds: we can still shut down the pipe immediately (without waiting for a pending read), while still ensuring that any unread chunks are kept in the stream's queue and are not dropped.
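The "unread chunks are kept" half of that can already be seen with the public API (a sketch, assuming Node 18+ globals): releasing a reader never drops chunks sitting in the stream's queue.

```javascript
// Chunks in the queue survive a reader release; a new reader picks them up.
let srcController;
const source = new ReadableStream({
  start(c) { srcController = c; }
});
const first = source.getReader();
srcController.enqueue("a"); // chunk sits in the queue, unread
first.releaseLock();        // releasing the reader does NOT drop it
const second = source.getReader();
const { value, done } = await second.read();
console.log(value, done); // "a" false
```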

I'll give this a try with the reference implementation, and see how it would affect the behavior of the existing tests.

EDIT: Hmm, that does mean that the source becomes unlocked before the pipeTo() promise resolves. You could potentially acquire a new reader while the pipe is still shutting down... Not sure if that's acceptable. πŸ€”

MattiasBuelens commented 2 years ago

I've found at least one way to trigger buggy behavior, by using pipeTo() with an AbortSignal. One way to fix this is by releasing the reader at the start of the shutdown process, see #1208.

I'm not sure if I can trigger a similar bug by erroring the destination. We only wait for pending writes to complete if the destination is still "writable", so we won't do that in this case. πŸ€”

EDIT: Okay, so it is indeed possible to trigger a similar bug. The problem is that it's very sensitive to microtask timings, because the reference implementation uses the writer.closed promise to detect when the destination becomes errored. There are a few microtasks of delay between when you call controller.error() and when writer.closed becomes rejected. Any chunks enqueued between those events will be dropped: the pipe will read them (because it hasn't yet released its reader), but will be unable to write them to the destination (because it's already errored).
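The timing hazard is observable directly (a sketch, assuming Node 18+ where WritableStream is a global): any reaction attached to writer.closed runs at least one microtask after controller.error() is called, leaving a window in which a pipe watching that promise still thinks the destination is healthy.

```javascript
// A reaction on writer.closed cannot fire in the same synchronous run
// as controller.error(); it only fires once microtasks have drained.
let destController;
const dest = new WritableStream({
  start(c) { destController = c; }
});
const writer = dest.getWriter();

let sawClosedRejection = false;
writer.closed.catch(() => { sawClosedRejection = true; });

destController.error(new Error("oops"));
const observedSynchronously = sawClosedRejection;       // still false here
await new Promise((resolve) => setTimeout(resolve, 0)); // drain microtasks
const observedLater = sawClosedRejection;               // true by now
console.log(observedSynchronously, observedLater); // false true
```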

evilpie commented 2 years ago

Thank you for investigating this! I certainly assumed it was possible, but had some trouble coming up with a test case myself. What I came up with used setTimeout and wasn't really reliable.

Any chunks enqueued between those events will be dropped: the pipe will read them (because it hasn't yet released its reader), but will be unable to write them to the destination (because it's already errored).

That is exactly what I was concerned about. I don't really have any ideas on how to fix this myself. I guess we really don't want to wait on the last read as well?

MattiasBuelens commented 2 years ago

The only option I see is to let the pipe synchronously react to the writable becoming errored. So instead of attaching a callback to the writer.closed promise (which always runs async), we would need some sort of "error callbacks" list (which we can run synchronously).
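A toy sketch of the difference (this is not spec machinery, just an illustration; ToyDestination and onErrorSync are made-up names): a synchronous callback list fires inside error() itself, while reactions to a closed-style promise always run a microtask later.

```javascript
// Hypothetical: a destination exposing both a synchronous error-callback
// list and a closed-style promise.
class ToyDestination {
  constructor() {
    this._errorCallbacks = [];
    this.closed = new Promise((_, reject) => { this._rejectClosed = reject; });
  }
  onErrorSync(callback) { this._errorCallbacks.push(callback); }
  error(reason) {
    for (const callback of this._errorCallbacks) callback(reason); // synchronous
    this._rejectClosed(reason); // reactions to this run asynchronously
  }
}

const toyDest = new ToyDestination();
const events = [];
toyDest.onErrorSync(() => events.push("sync callback"));
toyDest.closed.catch(() => events.push("closed rejected"));
toyDest.error(new Error("x"));
events.push("error() returned");
// At this point events is ["sync callback", "error() returned"]:
// a pipe using the callback list could release its reader before
// any further chunk gets read. "closed rejected" arrives a microtask later.
```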

domenic commented 2 years ago

Would it also work to check if dest.[[state]] is "errored" before doing any reads from source?

It's unfortunate that both that solution (if it works) and the error callbacks one would make pipeTo unimplementable using public APIs. It was always a nice story that pipeTo was a higher-level combinator and the fundamental reader/writer APIs had just as much power. But maybe we have already crossed that bridge; I'm not sure...

I can't really see a way around it, though, at least with the current set of public APIs. The problem of those in-between microtasks seems pretty fundamental.

MattiasBuelens commented 2 years ago

Would it also work to check if dest.[[state]] is "errored" before doing any reads from source?

I think that might work, yes. Sounds doable. πŸ‘

It's unfortunate that both that solution (if it works) and the error callbacks one would make pipeTo unimplementable using public APIs. It was always a nice story that pipeTo was a higher-level combinator and the fundamental reader/writer APIs had just as much power. But maybe we have already crossed that bridge; I'm not sure...

You can still get 99% of the way there with only the reader and writer APIs. It's just that for that final 1%, you need the ability to synchronously read the stream's state, which is currently not possible with just those public APIs. (And believe me, I tried! See exhibit 1 and 2. πŸ˜…)

MattiasBuelens commented 2 years ago

Would it also work to check if dest.[[state]] is "errored" before doing any reads from source?

I think that might work, yes. Sounds doable. πŸ‘

Thinking about it some more, I don't think that will work. I think I confused myself while I wrote my previous reply. 😬

The problem is not that we start a new read after the destination has become errored. We're already awaiting writer.ready, which is always rejected immediately in step 8 of WritableStreamStartErroring. It might be possible that the destination becomes errored during the same microtask that writer.ready becomes resolved, so a synchronous check may still be appropriate.

The real problem is when there's already a pending read, and then the destination becomes errored at the same time that a chunk is enqueued onto the source. So:

writableController.error("πŸ’₯");
readableController.enqueue("a");

Erroring the writable will asynchronously reject writer.closed, but enqueuing a chunk will synchronously call the read request's chunk steps. Even if we added a synchronous check here:

ReadableStreamDefaultReaderRead(
  reader,
  {
    chunkSteps: chunk => {
      if (dest._state !== 'writable') {
        // ...now what? πŸ€·β€β™‚οΈ
      }
      currentWrite = transformPromiseWith(
        WritableStreamDefaultWriterWrite(writer, chunk), undefined, () => {}
      );
      resolveRead(false);
    },
    closeSteps: () => resolveRead(true),
    errorSteps: rejectRead
  }
);

...we would still be unable to do anything useful with chunk. Ideally, we'd put it back at the start of the source's queue, but we don't have a way to do that.

The only way we can avoid this is if we can synchronously release the reader as soon as the destination becomes errored, i.e. attach an error callback to the writer.

(Note that doing enqueue() followed by error() can result in the newly enqueued chunk being successfully written to the destination before the destination becomes errored. The read request's chunk steps synchronously call writer.write(), which can synchronously call sink.write() and complete the write. So we don't "accidentally" drop a chunk in this case.)