whatwg / streams

Streams Standard
https://streams.spec.whatwg.org/

Real time source and skipping #383

Open mhofman opened 9 years ago

mhofman commented 9 years ago

Let's suppose we have a source that generates frames. These sources can be of 2 types:

- real-time, they generate frames constantly no matter what, e.g. a webcam. These sources would probably use push semantics.
- on demand, they generate a new frame only when needed.

Streams are a great way to unify the consumption of these sources. It works great when the consumer wants to process every frame produced by the source. However things don't go as well in the case where the consumer is not able to process all the frames of a real-time source.

What a consumer generally wants in that case is to be able to skip frames produced while it's currently processing one. Currently the back-pressure mechanism of readable streams doesn't help, since it's informative only. Frames will keep getting queued into the readable stream for pure push sources, no matter the size of the queue. The source controller could do some skipping by not queuing a new frame if the queue of the stream is full, but this doesn't provide the desired behavior, since we would be skipping new frames while keeping old ones in the queue. It's preferable to discard old frames and keep only new ones. Similarly, the consumer cannot drain the queue of the stream when it's done processing a frame and keep only the last one generated, as there is no way to know the status of the queue as a consumer, or to know whether a read operation will pull from the queue or wait for a new chunk to be available.

Can anyone suggest how to solve this use case? Should we consider adding a method on readable streams to probe the size of the queue or the behavior of the read? Should a caching strategy be added to readable streams that would automatically discard elements from the queue under some conditions?
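For concreteness, the desired "keep the newest, drop the rest" semantics can be sketched outside the streams API as a one-slot latch (all names here are illustrative, not a proposal):

```javascript
// One-slot latch: push() overwrites any unread value; read() returns the
// newest value, or waits for the next push if nothing is buffered.
class LastValueLatch {
  constructor() {
    this.hasValue = false;
    this.value = undefined;
    this.waiter = null; // resolve function of a pending read()
  }

  push(value) {
    if (this.waiter) {
      // A read is already waiting: hand the fresh value straight over.
      const resolve = this.waiter;
      this.waiter = null;
      resolve(value);
    } else {
      // Overwrite: intermediate values are discarded, newest wins.
      this.hasValue = true;
      this.value = value;
    }
  }

  read() {
    if (this.hasValue) {
      this.hasValue = false;
      const value = this.value;
      this.value = undefined;
      return Promise.resolve(value);
    }
    return new Promise(resolve => { this.waiter = resolve; });
  }
}
```

The question in this thread is essentially whether (and how) a ReadableStream can expose this behavior without a side channel.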

wanderview commented 9 years ago

I think this can be implemented via a Transform stream that throws away frames that are behind real-time. So you would do something like

  source.pipeThrough(realTimeFilterTransform).pipeTo(sink);

A transform could also be made to re-encode stale frames to play through them quickly instead of dropping them. This would have to be encoding-specific, though.
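As a sketch of what such a filter might look like using today's TransformStream API (which postdates this thread), assuming each chunk carries a `timestamp` property and using an illustrative freshness window:

```javascript
// Sketch: drop frames older than roughly one 30 fps frame interval.
// `timestamp` on chunks and MAX_AGE_MS are assumptions for illustration.
const MAX_AGE_MS = 33;

const realTimeFilterTransform = new TransformStream({
  transform(chunk, controller) {
    if (Date.now() - chunk.timestamp <= MAX_AGE_MS) {
      controller.enqueue(chunk); // fresh enough: pass it downstream
    }
    // else: the stale frame is silently dropped
  }
});
```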

mhofman commented 9 years ago

I thought about a Transform but I'm having a hard time seeing how that would work:

- I don't see how to reconcile this with the logic in pipeTo and TransformStream: for a real-time push source, the writable stream of the Transform passed to pipeTo should basically apply no back pressure, so that it can consume everything produced and skip intermediate frames / chunks. But this behavior doesn't work for on-demand sources since, well, the back pressure would be gone, negating their nice property of being on demand.

- The consumer of the stream might not know if the underlying source is real-time / push or not.

PS: I'm using the example of video frames as it's relevant to me in this case, and is easy to comprehend, but really this applies to any problem where we want to skip intermediate values in the stream and pass through the "last-in" when pulling. "first-in" is easy as you can implement that by discarding newly read values in a transform if the destination stream's queue is full. Another use case of a real-time push source and skipping intermediate values but the latest, would be a "stream of cursor positions". If I wasn't able to listen to positions for a bit, I don't care where it was during that time, I only care where it is now, but I also don't want to wait to know that until the cursor moves again. Makes sense?

wanderview commented 9 years ago

It seems to me if your Transform resolves its .write() promise at a "real time" rate, then .pipeThrough() will run at the correct rate regardless of the underlying source. No?

No matter what, your Transform is going to have to do timing to know what is "real time" or not.

mhofman commented 9 years ago

No, timing should have nothing to do with consumption from the source. Guesstimating framerate in the case of frame based sources is sloppy at best. It's also downright impossible in the case of variable framerate sources. Finally it doesn't make any sense for the use case of "cursor position stream".

wanderview commented 9 years ago

Ok, let's back up a second and just examine one case.

Another use case of a real-time push source and skipping intermediate values but the latest, would be a "stream of cursor positions". If I wasn't able to listen to positions for a bit, I don't care where it was during that time, I only care where it is now, but I also don't want to wait to know that until the cursor moves again. Makes sense?

This "real-time push source" presumably uses controller.enqueue() to add sensor readings at a pre-determined sample rate. Are you saying you want this thing to ignore backpressure (available in controller.desiredSize) and just queue things up? But then also don't want those values you just queued up?

It seems to me this source should respect back pressure and let the consumer determine the rate of sampling.

Similarly, if you have a push source for video you can throw away frames at the controller if the stream pipeline is applying back pressure.
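To make that concrete, here is a sketch of a push source that consults controller.desiredSize and drops frames under backpressure (frameSource is a stand-in stub, and the modern ReadableStream constructor shape postdates this thread). Note this keeps the oldest queued frame, the "first-in" behavior discussed below:

```javascript
// Stub standing in for a webcam-like push source.
const frameSource = { onFrame: null };

// Push source that drops frames whenever the queue is full,
// instead of queuing without bound.
const rs = new ReadableStream({
  start(controller) {
    frameSource.onFrame = frame => {
      if (controller.desiredSize > 0) {
        controller.enqueue(frame); // consumer is keeping up
      }
      // else: drop the frame at the source, respecting backpressure
    };
  }
}, { highWaterMark: 1 });
```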

Or can you provide some pseudo code describing what you want? It feels like there are some conflicting requirements for ignoring back pressure at some stages, but requiring precise rates at others.

mhofman commented 9 years ago

A cursor position stream would only provide new values when the position has changed, not constantly. When moving, new values are produced and queued in the stream. When not moving, no new values are produced. Now as a consumer, if I don't read while new values are produced, I want to be able to "fast-forward" to the latest value in the stream, without having to wait for a new value to be produced by the source. Since there is no way to know if a read from a stream will just pull from the queue or wait for a new value, this is not possible to accomplish.

Similarly the controller of the readable stream cannot correctly respect the back pressure since it would always have to enqueue at least one value to receive the back pressure signal, and skip new values. At that point, the value that will be read next from the stream will be that "old" value that was queued, not any newer value that might have been generated by the source since then. This is the "first-in" case that I mentioned, which is not the desired behavior.

Regardless, this requires the client to ask for a special stream with "skipping" behavior. This behavior cannot be driven by the consumer of the stream after the stream already exists.

mhofman commented 9 years ago

[Diagram "Flow": the consumer reads value1 and starts processing it; meanwhile the source enqueues value2 and then value3.]

What I'm looking for is a way to get value3 in the second read from the stream, not value2.

I'd be ok having to call .read() once more to drain the stream, but I would need to know that the read wouldn't result in waiting for a new value to be generated by the source.
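Pseudocode for that drain: the `queueSize` probe below does not exist in the spec; it stands in for whatever mechanism would reveal whether a read can complete without pulling from the source.

```
// Pseudocode: drain to the newest queued chunk without risking a blocking read.
async function readLatest(reader) {
  let result = await reader.read();   // may wait for the source
  while (reader.queueSize > 0) {      // hypothetical probe of the queue
    result = await reader.read();     // guaranteed not to wait
  }
  return result;                      // the last-in value
}
```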

wanderview commented 9 years ago

I would try to implement a latch using TransformStream for this use case. I see that the current TransformStream does not permit this, though. It needs a way to let transform.readable function as a pull source controlled by the transform function.

wanderview commented 9 years ago

I wrote up issue #384 describing what I think would need to change to implement a latching mechanism in TransformStream.

mhofman commented 9 years ago

The problem with using a Transform to accomplish this is that the transform's writable stream would have to apply no back pressure to consume new values, which wouldn't work if piping from a stream wrapping a pull source or a push source with back pressure support. Whatever mechanism we come up with needs to be available in ReadableStream.

wanderview commented 9 years ago

The problem with using a Transform to accomplish this is that the transform's writable stream would have to apply no back pressure to consume new values, which wouldn't work if piping from a stream wrapping a pull source or a push source with back pressure support. Whatever mechanism we come up with needs to be available in ReadableStream.

I don't see any back pressure in your diagram. You're explicitly asking for no back pressure there as far as I can see.

mhofman commented 9 years ago

Correct, in this diagram the source has no back-pressure support. The mouse position example is equivalent to a variable frame rate frame push source. In the frame source case, you can have sources that have back-pressure support. But as a consumer of the readable stream, I have no idea if the source has back pressure or not. All I want is to get the latest frame when I'm ready for it.

wanderview commented 9 years ago

Can you draw another diagram? It sounds like you are talking about a completely different use case and source now.

If the source in the previous diagram is pull, then you just get the sensor value when the consumer is ready for it. Works exactly the same using the latch transform. Last available sensor reading is provided to consumer.

mhofman commented 9 years ago

What I don't understand is how you would implement your latch transform without unnecessarily pulling from the source, i.e. keeping the back pressure info through the transform.

The diagram really looks like any other read from a pull-source backed stream. What's important is that the consumer doesn't care about the type of source; its logic should be the same.

[Diagram: pull source]

wanderview commented 9 years ago

We need a mechanism like I describe in #384. Imagine something like this:

var latch = new TransformStream({
  latchedValue: null,
  pendingRead: null,

  transform: function(chunk, enqueue, done) {
    if (this.pendingRead) {
      var pr = this.pendingRead;
      this.pendingRead = null;
      pr(chunk);
      return;
    }

    this.latchedValue = chunk;
  },

  // this hook doesn't exist yet
  pull: function() {
    var self = this;
    return new Promise(function(resolve, reject) {
      if (self.latchedValue) {
        var lv = self.latchedValue;
        self.latchedValue = null;
        resolve(lv);
        return;
      }

      if (self.pendingRead) {
        reject('overlapping reads!');
        return;
      }

      self.pendingRead = resolve;
    });
  }
});

source.pipeThrough(latch).read(); // this triggers pull() function above

Although I see what you mean now. The .pipeThrough() will pull values from the source that wouldn't have to be produced under normal back pressure mechanisms.

Maybe a wrapped ReadableStream would work. I don't have time to work through the pseudo code right now, unfortunately. It seems that should be possible, though.

mhofman commented 9 years ago

I'm curious to see what you mean by wrapped ReadableStream when you get a chance.

wanderview commented 9 years ago

In the meantime, do you have a proposal API change that you think would solve your problem? Can you show some example code that you want to be able to write?

mhofman commented 9 years ago

As a user, I see 2 possible APIs:

- an option for a non-pulling read, i.e. a read that only returns a value already in the stream's queue and never triggers a pull from the source;
- a "caching strategy" option on the stream, which would automatically discard intermediate values so that a read always yields the latest one.

The advantage of the second approach is that there would be no extra buffering if the consumer is not interested in the intermediate values. The first approach requires keeping values in the queues until the consumer comes around and discards them manually.

I need to give some thought to a proper API change proposal, but that's my thinking so far.

wanderview commented 9 years ago

Would it be enough to provide an "opportunistic read()"? This call would return any enqueued chunks, but not trigger a new pull on the underlying stream.

When your downstream consumer needs its next frame, you could then do opportunistic reads until the queue is empty to get the last available value. If no values are available at all, then you do a full read() to trigger a pull.

domenic commented 9 years ago

@mhofman thank you for opening this very interesting thread, and my apologies for not engaging in it earlier (last week was TC39). I agree that in general this is a tricky issue as streams are generally designed for sequences in which every chunk is meaningful and none should be missed. But I am confident we can figure it out with some work.

on demand, they generate a new frame only when needed

Can you give an example of such a scenario? Is the idea e.g. reading a movie file from a disk?

The source controller could do some skipping by not queuing a new frame if the queue of the stream is full, but this doesn't provide the desired behavior since we would be skipping new frames, keeping old ones in the queue

A size-one queue gives very close to correct behavior. But, it would keep a single old frame before skipping to new ones.

Similarly, the consumer cannot drain the queue of the stream when it's done processing a frame

Related: #324, the idea that a consumer should signal to the readable stream when it's actually processed a chunk.

Should we consider adding a method on readable streams to probe the size of the queue or the behavior of the read?

I don't really understand how this would solve the issue.

Should a caching strategy be added to readable streams that would automatically discard elements from the queue under some conditions?

This seems like a more interesting approach. If it weren't for the exclusive reader semantics, I think it could be accomplished already: the stream would simply read from itself if controller.desiredSize <= 0, in order to drop the oldest chunk. I think that is what @wanderview is alluding to with a transform stream or similar wrapper.

I thought about a Transform but I'm having a hard time seeing how that would work:

It seems like you'd need different transforms for each type of source. This comes back to my question of what the use cases for an on-demand stream are. It doesn't seem unreasonable to distinguish between webcams and movie files. I agree it would be nice if you didn't have to, though.

PS: I'm using the example of video frames as it's relevant to me in this case, and is easy to comprehend, but really this applies to any problem where we want to skip intermediate values in the stream and pass through the "last-in" when pulling. ... Another use case of a real-time push source and skipping intermediate values but the latest, would be a "stream of cursor positions".

This kind of talk in general makes me unclear whether streams are really a correctly matched solution to your problem. The primitive here is something more like "changing value," which has a single "latest value". Maybe it also optionally has a stream (or something) of notifications that the value has changed. The consumer can then go check the latest value, either based on the notifications (if they are present) or based on polling (which seems like a more likely architecture). See also are readable streams appropriate for my use case?

Now as a consumer, if I don't read while new values are produced, I want to be able to "fast-forward" to the latest value in the stream, without having to wait for a new value to be produced by the source. Since there is no way to know if a read from a stream will just pull from the queue or wait for a new value, this is not possible to accomplish.

Again, this seems based on flawed architectural assumptions. To render video, you have a frame-budget (24 fps, or 60 fps, or whatever). Whatever the latest frame is that's come in during that time, you want to use that. So I'd expect something like

let currentFrame;

// shorthand for the usual async loop, except we cap it
// to not pull more than 60 times per second.
pumpStream(rs, chunk => currentFrame = chunk);

// elsewhere in the code
requestAnimationFrameLoop(() => render(currentFrame));

You seem to have a very different architectural model in mind, something like

let currentFrame;

requestAnimationFrameLoop(() => {
  if (!rs.wouldPull) {
    currentFrame = rs.read();
  }

  render(currentFrame);
});

rs.rememberToDropFramesIfIDontAskForThemInTime();

But this seems to unnecessarily couple your rendering loop with your consumption loop.


I'll avoid getting into the question of specific API proposals to address your use case until I better understand it. But there are several that sound workable on the table right now. I am curious how you respond to my pseudo-code, though.

domenic commented 9 years ago

One last thing. Perhaps the mismatch comes from

real-time, they generate frames constantly no matter what, e.g. a webcam. These sources would probably use push semantics

It depends on whether the webcam stream is "recording" or "live". If it is recording, then the whole sequence is valuable, and you don't want to throw away any data, so much of this discussion is moot. If it is live, then I would expect its source to be implemented much more like an on-demand stream, like so:

new ReadableStream({
  start(c) {
    rawWebcam.onNewFrame = frame => {
      this.currentFrame = frame;
    };
  },

  pull(c) {
    c.enqueue(this.currentFrame);
  }
});

This will mean that every read() corresponds to a pull which immediately (synchronously) enqueues the current frame, allowing it to be read by the consumer instantly. Frames are skipped, but that's fine.
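A runnable version of that sketch, with rawWebcam stubbed; `highWaterMark: 0` is an addition here so that pull only fires when a read actually happens (otherwise the constructor would pre-fill the queue before any frame exists):

```javascript
// Live source: keep only the current frame, enqueue it on demand.
const rawWebcam = { onNewFrame: null };

const liveStream = new ReadableStream({
  start(c) {
    rawWebcam.onNewFrame = frame => {
      this.currentFrame = frame; // overwrite: only the latest frame is kept
    };
  },

  pull(c) {
    c.enqueue(this.currentFrame); // a read() grabs whatever is current
  }
}, { highWaterMark: 0 });
```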

mhofman commented 9 years ago

@wanderview an opportunistic read would work, and is basically what I called "an option for a non pulling read". However as I mentioned, I believe a "caching strategy" option on the stream would be a better approach as the consumer could just configure the desired behavior for the stream without having to implement the logic of pulling and discarding. The main difference is that intermediate values wouldn't have to be queued and can be discarded immediately with the second approach.

@domenic here are some comments. Sorry as well for the delayed answer.

on demand, they generate a new frame only when needed

Can you give an example of such a scenario? Is the idea e.g. reading a movie file from a disk?

An example of a stream that could be "on-demand" is a MediaStream for a screen capture source. The logic in the screen capture could be to only take a capture of the screen when the consumer actually has a need for a new frame, instead of as fast as possible. The operation can be quite expensive. This is different from a MediaStream attached to a webcam source, which would generate frames no matter what. Note that both streams are "live" streams, and are not tied to a seekable source.

Should we consider adding a method on readable streams to probe the size of the queue or the behavior of the read?

I don't really understand how this would solve the issue.

Being able to know if the read operation would pull from the internal queue of the stream or pull from the source would make it possible to read from the stream up to the latest queued value, without the risk of making a read call that would block until a new value is actually generated by the source. An "opportunistic read" where we ask that the read operation only returns a queued value would work too.

I thought about a Transform but I'm having a hard time seeing how that would work:

It seems like you'd need different transforms for each type of source. This comes back to my question of what use cases an on-demand stream is. It doesn't seem unreasonable to distinguish between webcams and movie files. I agree it would be nice if you didn't have to, though.

In the case of screen sharing and webcam streams, the consumer might not know if the source is on-demand or not, since that would be an internal browser implementation detail. We wouldn't know what type of Transform would work for the source.

This kind of talk in general makes me unclear whether streams are really a correctly matched solution to your problem. The primitive here is something more like "changing value," which has a single "latest value". Maybe it also optionally has a stream (or something) of notifications that the value has changed. The consumer can then go check the latest value, either based on the notifications (if they are present) or based on polling (which seems like a more likely architecture). See also are readable streams appropriate for my use case?

While I agree that streams are not appropriate for all use cases, the part that confuses me is that the stream makes assumptions about what the consumer intends to do with the values, i.e. take and concatenate all of them. While this is probably the most common use case, it's not the only one. For the same source / stream, one consumer might want to process all values, while another consumer might want to process only some values based on some external logic. The producer of the stream might not know ahead of time what type of consumers it will have. What you're suggesting is for the producer to provide 2 different APIs to be used by different types of consumers. It feels like with a little bit of work, we could accommodate both types of consumers with a single stream API abstraction.

real-time, they generate frames constantly no matter what, e.g. a webcam. These sources would probably use push semantics

It depends on whether the webcam stream is "recording" or "live". If it is recording, then the whole sequence is valuable, and you don't want to throw away any data, so much of this discussion is moot. If it is live, then I would expect its source to be implemented much more like an on-demand stream, like so:

Again, the "recording" or "live" aspect you're referring to in this case is what the consumer decides to do with the stream. The source and stream implementation should be the same, i.e. it provides the video frames captured by the webcam sensor.

Regarding the pseudo code, just pulling and caching the latest value is a little too simple to illustrate my problem. The issue really arises when the consumer needs to do some work with the value it pulled, work that potentially takes longer than the time it takes to generate a new value. Here is some pseudo-code I would like to write:

let mediaStreamVideoTrack = ... ; // We got the MediaStreamTrack from somewhere
let stream = createStreamOfFramesFromVideoTrack(mediaStreamVideoTrack);
let reader;

function processFrame(frame) {
  return new Promise((resolve, reject) => {
    // Do some heavy work
  });
}

function processNext () {
  return reader.read().then(({ value }) => processFrame(value)).then(processNext);
}

reader = stream.getReader({highWaterMark: 1, cachingMethod: "keepLast"});
processNext();

Without a "caching method", you would have to manually pull like this:

let latestFrame;

function processNext () {
  if (!latestFrame || reader.queueSize > 0) {
    reader.read().then(({ value }) => {
      latestFrame = value;
      processNext();
    });
  } else {
    let frame = latestFrame;
    latestFrame = null;
    processFrame(frame).then(processNext);
  }
}

reader = stream.getReader();
processNext();

The drawback of that "draining" approach, on top of being more complicated for the consumer, is that the stream caches intermediate values. In the case of video frames, this can be memory consuming. If the processFrame operation is truly non-blocking, the consumer might be able to schedule regular reads from the stream to drain it, but that would make the consumer logic even more complex.

If you want some background on how this issue came to be, have a look at the thread called "Re: Add "MediaStream with worker" for video processing into the new working items of WebRTC WG" on the W3C media-capture list.

domenic commented 9 years ago

An example of a stream that could be "on-demand" is a MediaStream for a screen capture source. The logic in the screen capture could be to only take a capture of the screen when the consumer actually has a need for a new frame, instead of as fast as possible.

:-/ this seems like part of the confusion where MediaStream is used as an opaque handle to anything vaguely media-related, despite actually very different underlying behavior. I don't think it's going to be possible to generically adapt all MediaStreams into a ReadableStream using a single algorithm. For example, my MediaStreamRecorder is focused around an algorithm for recording of real-time MediaStreams, and is probably less appropriate for screen capture or video-file backed MediaStreams.

Honestly, what you describe barely sounds like a stream at all. It sounds like a function call: takeScreenshot(). Repurposing ReadableStream's read() method as your takeScreenshot() seems a bit dodgy.

Being able to know if the read operation would pull from the internal queue of the stream or pull from the source would make it possible to read from the stream up to the latest queued value, without the risk of making a read call that would block until a new value is actually generated by the source. An "opportunistic read" where we ask that the read operation only returns a queued value would work too.

These both seem like they're exposing way too much of the internal implementation details to the consumer. They shouldn't care about whether they're reading from the queue or from the underlying source. The entire point of the queue is to encapsulate the fact that some underlying sources can't keep data around forever, like a stream can. It's not meant to be exposed.

I'm much more interested in solutions that involve either additional hooks to the underlying source, or parameters to the creation of the stream, to allow the producer to decide what it puts into the queue. So again, we'd likely end up with different types of ReadableStream for the different use cases: if the consumer wants no-data-loss recording, they use createMediaStreamRecorder(). If they want sporadic snapshots, they use createMediaStreamSnapshotter(). Both can return ReadableStreams, or they can return appropriate subclasses of ReadableStream.

While I agree that streams are not appropriate for all use cases, the part that confuses me is that the stream makes assumptions about what the consumer intends to do with the values, i.e. take and concatenate all of them. While this is probably the most common use case, it's not the only one.

Well, it's kind of definitional. Saying that there are other use cases for streams is actually saying "there are other use cases for this thing that I think of as a 'stream'". It doesn't really say anything about streams as designed here. At some point we do have to draw a line and say what streams are.

That said, I doubt we're really that far off that your use case falls outside streams entirely... I just wanted to illustrate that the changing-single-value actually is a different type of "thing" than a stream:

For a same source / stream, one consumer might want to process all values, while another consumer might want to process only some values based on some external logic

This is fine and works fine with streams. The second consumer just has to be ready to read from the stream and throw away the values it doesn't care about. Not a problem.

That's a different story than the changing-value paradigm, where logically there isn't even a stream of values, there's a single value and there's updates to that value. Such scenarios do deserve separate APIs, IMO.

Again, the "recording" or "live" aspect you're referring to in this case is what the consumer decides to do with the stream.

Again, I disagree. I think different consumers would create different types of streams for these use cases. new MediaStreamRecorder(mediaStream) (from the above draft) vs. new MediaStreamSnapshotter(mediaStream) (from some other yet-to-be-written draft).

Without a "caching method", you would have to manually pull like this:

Not necessarily. The producer could take care of this for you, using very similar code. Yeah, it's a bit of a bummer you don't end up using the internal queue. But at least this way the internal queue properly reflects the actual stream of data that the stream consumer sees, which is how it was designed to be used.

If you want some background on how this issue came to be, have a look at the thread called "Re: Add "MediaStream with worker" for video processing into the new working items of WebRTC WG" on the W3C media-capture list.

Oh, very cool, I didn't realize this was connected to that proposal! I'm really happy you are looking in to this, and sorry if I come across as uncooperative. I promise, I really do want to make this work, and am just dialoguing to try to work out the apparent impedance mismatch. I appreciate your advocacy for a streams-based solution over there! When I saw the draft, full of workers and such, I was unsure how to integrate streams, but you've given me hope :).