whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

Is when pull() is called on an underlying source deterministic? #1155

Closed jan-ivar closed 3 years ago

jan-ivar commented 3 years ago

I tried writing a proof-of-concept fiddle in Canary of an underlying source that manages the lifetimes of its chunks in JS (see https://github.com/w3c/mediacapture-transform/issues/56), and am seeing some surprising behavior. Before filing a bug, I'm wondering whether it's to spec.

The only signal an underlying source has to work off of here seems to be the pull method on it.

This underlying source keeps track of the N chunks it has enqueued and calls .close() on the oldest chunk when it receives pull number N+1. The goal is for "closed A" to appear after "done processing A" in the immediate sink.
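For readers without access to the fiddle, here is a minimal sketch of the kind of source described above. It is not the original fiddle: the Chunk class, createLifetimeSource() and the letter names are hypothetical stand-ins, and the stream is read directly with no transforms downstream, using N = highWaterMark + 1.

```javascript
// Hypothetical stand-in for a closeable chunk such as a VideoFrame.
class Chunk {
  constructor(name, log) {
    this.name = name;
    this.log = log;
  }
  close() {
    this.log.push(`closed ${this.name}`);
  }
}

// Underlying source that keeps at most `n` chunks alive: once `n` chunks are
// alive, each further pull closes the oldest chunk before enqueuing a new one.
function createLifetimeSource(n, log) {
  const alive = [];
  let count = 0;
  return {
    pull(controller) {
      if (alive.length === n) alive.shift().close();
      const chunk = new Chunk(String.fromCharCode(65 + count++), log); // A, B, C, ...
      alive.push(chunk);
      log.push(`pull ${chunk.name}`);
      controller.enqueue(chunk);
    },
  };
}

const tick = () => new Promise((resolve) => setTimeout(resolve, 10));

async function demo() {
  const log = [];
  const highWaterMark = 1;
  const rs = new ReadableStream(
    createLifetimeSource(highWaterMark + 1, log), // assumption: N = HWM + 1
    { highWaterMark }
  );
  const reader = rs.getReader();
  await tick();        // let the stream fill its queue (pull A)
  await reader.read(); // consumer takes A, which triggers pull B
  await reader.read(); // consumer takes B, which triggers pull C and closes A
  await tick();        // let the resulting pulls settle
  return log;
}

demo().then((log) => console.log(log.join(", ")));
// pull A, pull B, closed A, pull C
```

In this direct-read setup, A is closed only once the consumer has moved on to B. The sketch says nothing about what happens once transforms with their own queues sit downstream.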

The trouble is determining N. It appears to be highWaterMark + 1 for any value of highWaterMark on the producer, provided all subsequent transforms have a highWaterMark = 1.

But if subsequent transforms have highWaterMark > 1, then the number of pulls on the source increases from the get-go, throwing off any assumptions about what's going on.

I'd appreciate any pointers or analysis of the spec algorithms that support what's going on in Chrome here.

MattiasBuelens commented 3 years ago

Each individual stream fills up to its own highWaterMark, pulling in chunks as necessary. So if you have a pipe chain like this:

const rs1 = new ReadableStream({
  pull(controller) {
    console.log("pull");
    controller.enqueue("chunk");
  }
}, { highWaterMark: 5 });
const rs2 = rs1
  .pipeThrough(new TransformStream({}, { highWaterMark: 2 }))
  .pipeThrough(new TransformStream({}, { highWaterMark: 1 }));

then this will log "pull" 8 times (5 + 2 + 1) to fill up all queues. If you then start reading from rs2:

const reader = rs2.getReader();
await reader.read();

then you'll get a chunk out of the last stream's queue. Since that causes its queue to dip below its high water mark (1), it'll pull a chunk from the previous stream. The same thing happens with this stream: a chunk is pulled from its queue, it dips below its high water mark (2), and pulls a chunk from rs1. rs1 immediately provides a chunk from its queue, also dips below its high water mark, and pulls from its underlying source.
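The two snippets above can be combined into one runnable check that counts pulls instead of logging them. This is only a sketch: countPulls() and settle() are hypothetical helpers, and the 20 ms timeout is an assumed "long enough" wait for the pipe chain to come to rest.

```javascript
// Hypothetical helper: wait for pending stream work to settle.
const settle = () => new Promise((resolve) => setTimeout(resolve, 20));

async function countPulls() {
  let pulls = 0;
  const rs1 = new ReadableStream({
    pull(controller) {
      pulls++;
      controller.enqueue("chunk");
    },
  }, { highWaterMark: 5 });
  const rs2 = rs1
    .pipeThrough(new TransformStream({}, { highWaterMark: 2 }))
    .pipeThrough(new TransformStream({}, { highWaterMark: 1 }));

  // Nobody is reading from rs2 yet: every queue fills up to its own HWM.
  await settle();
  const beforeRead = pulls;

  // One read at the end of the chain ripples back to the underlying source.
  const reader = rs2.getReader();
  await reader.read();
  await settle();
  const afterRead = pulls;

  return { beforeRead, afterRead };
}

countPulls().then((result) => console.log(result));
```

Following the explanation above, beforeRead should be 8 (5 + 2 + 1) and afterRead should be 9, because a single read frees one slot in each queue and the source is asked for exactly one replacement chunk.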

In general, I don't think it's possible for a ReadableStream to know when "the final consumer" has received a previously enqueued chunk. That chunk can still be in the queue of a transform stream further down the pipe chain (as shown above), or inside the internal state of a transform() method that is currently processing the chunk. I think the only way to do this is to actually make the final consumer (a WritableStream acting as a sink, or a destructive TransformStream) manually close() the chunk when it (eventually) receives it.

On a slightly related note: what would you expect to happen when you enqueue a couple of VideoFrames and then the stream is canceled? The Streams standard will discard all queued chunks, but then it's up to the garbage collector to clean up any resources associated with these VideoFrames. Can that cause problems if these are not cleaned up soon enough? 🤔
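The discard-on-cancel behavior can be observed with a small sketch. FakeFrame is a hypothetical stand-in for VideoFrame (it only records whether close() was called), and settle() is an assumed helper that waits for the queue to fill.

```javascript
// Hypothetical stand-in for a closeable frame such as a VideoFrame.
class FakeFrame {
  constructor(id) {
    this.id = id;
    this.closed = false;
  }
  close() {
    this.closed = true;
  }
}

const settle = () => new Promise((resolve) => setTimeout(resolve, 20));

async function demo() {
  const frames = [];
  let cancelReason;
  const rs = new ReadableStream({
    pull(controller) {
      const frame = new FakeFrame(frames.length);
      frames.push(frame);
      controller.enqueue(frame);
    },
    // The underlying source is told about the cancelation here.
    cancel(reason) {
      cancelReason = reason;
    },
  }, { highWaterMark: 3 });

  await settle();                       // the queue fills with 3 frames
  await rs.cancel("no longer needed");  // queued frames are discarded

  return {
    queued: frames.length,
    closed: frames.filter((frame) => frame.closed).length,
    cancelReason,
  };
}

demo().then((result) => console.log(result));
// { queued: 3, closed: 0, cancelReason: 'no longer needed' }
```

None of the three queued frames is closed by the stream itself. The only notification the source receives is its cancel() callback.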

jan-ivar commented 3 years ago

@MattiasBuelens thanks for the great explanation! While this matches what I see over time, my issue is how it gets there:

Each individual stream fills up to its own highWaterMark, pulling in chunks as necessary.

That would make sense (chunks shifted downstream without exceeding highWaterMarks of individual streams).

But that's not what I'm seeing. Instead I see rs1 pull in 7 chunks off the bat, shifting only one of them downstream. (I've added a 1 second delay in the transforms to help examine order).

If I change the first transform's highWaterMark from 2 to 10, rs1 pulls in 15 chunks off the bat, almost 200% over its highWaterMark.

10 of them seem in limbo, since rs1's desiredSize is 0. But things stay in this state for a whole second. Where are they?

In general, I don't think it's possible for a ReadableStream to know when "the final consumer" has received a previously enqueued chunk

To clarify https://github.com/w3c/mediacapture-transform/issues/56, that proposal is not to extend lifetime to "the final consumer", only to the immediate consumer(s). Those would need to frame.clone() to forward downstream.

make the final consumer ... manually close() the chunk when it (eventually) receives it.

https://github.com/w3c/mediacapture-transform/issues/56 is about that not working with tee().

MattiasBuelens commented 3 years ago

I noticed a bug in ChunkProcessor. You're passing the same strategy (options) as both writableStrategy and readableStrategy to the new TransformStream() constructor. This effectively doubles the queue size, since both the writable end and the readable end will keep requesting chunks until each of them has filled up to its HWM.

Usually, you want to keep the readableStrategy at its default (HWM = 0). So I suggest you change the code to:

return new TransformStream({
  // ...
}, options);

(The rest of the analysis assumes that this change is in place.)
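To see the doubling in isolation, the sketch below counts how many chunks an idle TransformStream requests from its source under each constructor call. The helper names (pullsFor, compare, settle) are hypothetical, and the source is given highWaterMark 0 on purpose so that it buffers nothing itself and every pull corresponds to a chunk requested by the transform.

```javascript
const settle = () => new Promise((resolve) => setTimeout(resolve, 20));

// Counts pulls on a source piped through the given transform while nobody
// reads from the transform's readable end.
async function pullsFor(makeTransform) {
  let pulls = 0;
  const source = new ReadableStream({
    pull(controller) {
      pulls++;
      controller.enqueue("chunk");
    },
  }, { highWaterMark: 0 }); // assumption: no buffering in the source itself
  source.pipeThrough(makeTransform());
  await settle();
  return pulls;
}

const options = { highWaterMark: 3 };

async function compare() {
  // Strategy applied to the writable end only (readable end stays at HWM = 0).
  const writableOnly = await pullsFor(() => new TransformStream({}, options));
  // Same strategy applied to both ends, as in the bug described above.
  const bothEnds = await pullsFor(() => new TransformStream({}, options, options));
  return { writableOnly, bothEnds };
}

compare().then((result) => console.log(result));
```

With highWaterMark 3, this should report 3 pulls for the writable-only case and 6 when the same strategy is passed for both ends: three chunks sit in the readable end's queue and three more in the writable end's queue.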

That would make sense (chunks shifted downstream without exceeding highWaterMarks of individual streams).

But that's not what I'm seeing. Instead I see rs1 pull in 7 chunks off the bat, shifting only one of them downstream. (I've added a 1 second delay in the transforms to help examine order).

If I change the first transform's highWaterMark from 2 to 10, rs1 pulls in 15 chunks off the bat, almost 200% over its highWaterMark.

10 of them seem in limbo, since rs1's desiredSize is 0. But things stay in this state for a whole second. Where are they?

They're in the writable end of the first transform stream.

The first processor has HWM = 2, so it wants to pull two chunks from the producer. After it receives those two chunks, the producer will keep pulling until it has filled its own queue up to its own HWM = 5.

The second processor has HWM = 1, so it wants to pull one chunk from the first processor. To do that, the first processor must take a chunk from its writable end's queue and transform it. That's why you see this log at the start:

0: Processing A to AA (desiredSize=0)...

After one second, the chunk is transformed. The second processor pulls it from the first processor's readable end and puts it into its own writable end's queue. The second processor has now filled up to its HWM, but now the first processor has one less chunk in its queue. So it pulls a chunk from the producer's queue, and the producer has to pull from its underlying source again:

1008: ...done processing A to AA
1008: Pulling H (desiredSize=1, enqueued=7)

So after the initial setup, this is the state of the pipe chain:

If I change the first transform's highWaterMark from 2 to 10, rs1 pulls in 15 chunks off the bat, almost 200% over its highWaterMark.

There seems to be some confusion about the meaning of the "high water mark". A stream's HWM tells it how many chunks to keep in its queue if no one else is reading from it. If the stream is being piped into another stream, then that other stream will be taking chunks out of the stream's queue and either process them immediately or put them in its own queue. This means our original stream's queue becomes smaller, and the stream needs to produce extra chunks to fill up to its own HWM again.

So indeed, to fill both queues, you need to produce 15 (= 5 + 10) chunks.
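The 5 + 10 arithmetic can be reproduced without any transforms. In this sketch a source with HWM = 5 is piped into a sink with HWM = 10 whose write() never completes, so the sink's queue fills once and stays full. The function name and the stalled sink are assumptions made for illustration, not part of the fiddle.

```javascript
const settle = () => new Promise((resolve) => setTimeout(resolve, 20));

async function countPullsIntoStalledSink(sourceHwm, sinkHwm) {
  let pulls = 0;
  const source = new ReadableStream({
    pull(controller) {
      pulls++;
      controller.enqueue("chunk");
    },
  }, { highWaterMark: sourceHwm });

  const sink = new WritableStream({
    // Never resolves: the first chunk stays in flight, the rest stay queued.
    write() {
      return new Promise(() => {});
    },
  }, { highWaterMark: sinkHwm });

  source.pipeTo(sink).catch(() => {});
  await settle();
  return pulls;
}

countPullsIntoStalledSink(5, 10).then((pulls) => console.log(pulls));
```

The count should come out as 15: ten chunks move into the sink's queue, and the source then produces five more to refill its own queue up to its HWM.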

MattiasBuelens commented 3 years ago

make the final consumer ... manually close() the chunk when it (eventually) receives it.

w3c/mediacapture-transform#56 is about that not working with tee().

I see... For Fetch, we use "tee a readable stream" which calls ReadableStreamTee with cloneForBranch2 = true. Thus, all chunks are structurally cloned before enqueuing them to the second branch.

Unfortunately, this mechanism is not exposed to web authors. ReadableStream.prototype.tee() always uses cloneForBranch2 = false. A naive solution would be to pipe branch2 through a TransformStream that manually clone()s each frame:

function teeWithClone(readable) {
  let [branch1, branch2] = readable.tee();
  branch2 = branch2.pipeThrough(new TransformStream({
    transform(frame, controller) {
      controller.enqueue(frame.clone());
    }
  }));
  return [branch1, branch2];
}

But this has a nasty problem. If the consumer of branch1 modifies or closes the frame before branch2 has had a chance to clone it, then branch2 will receive modified or even unusable frames. So to do this "correctly", you'd have to clone first:

function teeWithClone(readable) {
  readable = readable.pipeThrough(new TransformStream({
    transform(frame, controller) {
      controller.enqueue([frame, frame.clone()]);
    }
  }));
  let [branch1, branch2] = readable.tee();
  branch1 = branch1.pipeThrough(new TransformStream({
    transform([frame, clonedFrame], controller) {
      controller.enqueue(frame);
    }
  }));
  branch2 = branch2.pipeThrough(new TransformStream({
    transform([frame, clonedFrame], controller) {
      controller.enqueue(clonedFrame);
    }
  }));
  return [branch1, branch2];
}

...which looks very, very weird. 😛
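A quick way to exercise the clone-first version is with a fake frame type. In the sketch below, FakeFrame and frameSource() are hypothetical stand-ins (a real VideoFrame is not needed to see the effect), and teeWithClone() is the clone-first function from above, repeated so the example is self-contained.

```javascript
// Hypothetical stand-in for a closeable frame such as a VideoFrame.
class FakeFrame {
  constructor(id) {
    this.id = id;
    this.closed = false;
  }
  clone() {
    if (this.closed) throw new Error(`frame ${this.id} is already closed`);
    return new FakeFrame(this.id);
  }
  close() {
    this.closed = true;
  }
}

// Hypothetical source producing `count` frames and then closing.
function frameSource(count) {
  let id = 0;
  return new ReadableStream({
    pull(controller) {
      controller.enqueue(new FakeFrame(id++));
      if (id === count) controller.close();
    },
  });
}

// Clone-first tee: each frame is cloned before tee() hands it to any branch.
function teeWithClone(readable) {
  readable = readable.pipeThrough(new TransformStream({
    transform(frame, controller) {
      controller.enqueue([frame, frame.clone()]);
    }
  }));
  let [branch1, branch2] = readable.tee();
  branch1 = branch1.pipeThrough(new TransformStream({
    transform([frame, clonedFrame], controller) {
      controller.enqueue(frame);
    }
  }));
  branch2 = branch2.pipeThrough(new TransformStream({
    transform([frame, clonedFrame], controller) {
      controller.enqueue(clonedFrame);
    }
  }));
  return [branch1, branch2];
}

async function demo() {
  // Plain tee(): both branches receive the very same object.
  const [plain1, plain2] = frameSource(1).tee();
  const a = (await plain1.getReader().read()).value;
  const b = (await plain2.getReader().read()).value;

  // Clone-first tee: closing branch1's frame leaves branch2's copy usable.
  const [cloned1, cloned2] = teeWithClone(frameSource(1));
  const first = (await cloned1.getReader().read()).value;
  first.close();
  const second = (await cloned2.getReader().read()).value;

  return { sameObject: a === b, firstClosed: first.closed, secondClosed: second.closed };
}

demo().then((result) => console.log(result));
// { sameObject: true, firstClosed: true, secondClosed: false }
```

The first result shows why plain tee() is a problem for closeable chunks: both consumers share one object. The second shows that the clone handed to branch2 is unaffected when branch1's consumer closes its frame early.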

domenic commented 3 years ago

This last post might be worth spinning off into a separate thread to discuss improving the tee() API with some cloning sugar. E.g. tee({ structuredClone: true }) or tee({ cloneCallback: x => y }).

jan-ivar commented 3 years ago

I'm closing this since new issues have been filed, and thanks to @MattiasBuelens's explanation of pull behavior. It's deterministic, but highly dependent on what's downstream, which sadly makes it hard to leverage to find out when chunks are dequeued for any kind of lifetime management.