w3c / webrtc-pc

WebRTC 1.0 API
https://w3c.github.io/webrtc-pc/

WHATWG streams for data channel messages #1732

Closed lgrahl closed 5 years ago

lgrahl commented 6 years ago

There is a long-standing issue with large data channel messages in the existing RTCDataChannel API. If I want to send a single message containing 1 GiB of data (for example, a large file), I have to hold this gigantic message in memory at the time of sending. If I receive a 1 GiB message, it is slowly reassembled until it's entirely in memory and only then handed out to the application. That creates memory pressure and leaves the application no way to exert backpressure.

My idea is to resolve this by extending the RTCDataChannel API in the following way:

Sending

Add a .createWritableStream method which returns a WritableStream instance. With the associated WritableStreamDefaultWriter instance, one can add chunks by calling .write on the writer. Once the writer is closed, the message is considered complete.

Receiving

If .binaryType is set to 'stream', the event raised for onmessage contains a ReadableStream instance that is created when the first chunk is received. Once the whole message has been read, a .read call on the reader will signal EOF (as specified by the Streams API).

Edit: What should happen when a string is being received will need to be discussed.


In the meeting, I think there was a slight confusion about which streaming API I meant. Basically, I propose two streaming APIs that use WHATWG streams: one for sending and one for receiving, as outlined above.
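Since none of this exists in any browser yet, here is a minimal sketch of the intended behaviour, using a TransformStream as a stand-in for the channel's internal per-message pipe (sendMessage/receiveMessage are hypothetical helpers, not part of any API):

```javascript
// Stand-in sketch: the hypothetical dc.createWritableStream() would hand out
// the writable side of such a pipe; the onmessage event would deliver the
// readable side.
const { readable, writable } = new TransformStream();

// Sender side: write chunks, then close the writer to mark the message complete.
async function sendMessage(chunks) {
  const writer = writable.getWriter();
  for (const chunk of chunks) {
    await writer.ready;   // respect backpressure before each write
    writer.write(chunk);
  }
  await writer.close();   // EOF: the message is considered complete
}

// Receiver side: read until the stream signals done (EOF = message complete).
async function receiveMessage() {
  const reader = readable.getReader();
  const received = [];
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    received.push(value);
  }
  return received;
}

const sendDone = sendMessage(['chunk-1', 'chunk-2', 'chunk-3']);
const message = await receiveMessage();
await sendDone;
```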

martinthomson commented 6 years ago

This is a good idea.

You could define a data channel as a stream of streams, but that might be a little more difficult than the simple addition suggested here. I would try to find a way to model this more on the fetch API if possible, but I'd need to look carefully at the API to decide whether that is feasible or just more disruptive.

You can use Blobs to avoid the memory commitment for big things, but that's not a great solution.

martinthomson commented 6 years ago

See the fetch Body mixin for a model that might be an improvement, at least for receiving.
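For context, the Body mixin model referred to here exposes the payload as a ReadableStream via .body, so a receiver consumes bytes incrementally instead of buffering the whole message. A locally constructed Response (a stand-in for a real fetch() response, so the sketch runs without a network) shows the same reading pattern:

```javascript
// response.body is a ReadableStream of Uint8Array chunks, just as with a
// response returned by fetch().
const response = new Response('hello world');
const reader = response.body.getReader();
const decoder = new TextDecoder();

let text = '';
for (;;) {
  const { done, value } = await reader.read();
  if (done) break;                          // body fully consumed
  text += decoder.decode(value, { stream: true });
}
```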

lgrahl commented 6 years ago

cc @annevk @ricea @domenic (as you might be interested in commenting on this)

domenic commented 6 years ago

This would be super-great; I know developers who do both Node.js and WebRTC development, and their frustration at not having a streaming model for RTC data channels is part of what originally motivated the streams work.

The hardest part of designing this is the interaction with existing methods of reading/writing. That is, the most developer-friendly interface is (IMO) dataChannel.writable + dataChannel.readable, i.e. properties that always exist. See https://streams.spec.whatwg.org/#example-both. The issue then becomes figuring out what happens to your existing API when someone acquires a reader or writer from the stream. If possible, just making those APIs not function (e.g. never deliver events, or throw errors/reject promises) is the simplest way to do that.
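A minimal sketch of what always-present properties could look like, assuming a hypothetical StreamingChannel class where a TransformStream pair stands in for the transport (a real RTCDataChannel integration would additionally have to neuter send()/onmessage once a reader or writer is acquired, as described above):

```javascript
// Hypothetical channel shape per the suggestion: .readable and .writable
// always exist as properties.
class StreamingChannel {
  constructor() {
    const { readable, writable } = new TransformStream();
    this.readable = readable;   // always-present ReadableStream
    this.writable = writable;   // always-present WritableStream
  }
}

const channel = new StreamingChannel();
const writer = channel.writable.getWriter();
writer.write('ping');
writer.close();

const reader = channel.readable.getReader();
const first = await reader.read();   // { value: 'ping', done: false }
```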

Let me know how I can help; this would be very exciting to get working. I don't know much about RTC data channels at the moment myself, but perhaps I should read up.

lgrahl commented 6 years ago

That is, the most developer-friendly interface is (IMO) dataChannel.writable + dataChannel.readable, i.e. properties that always exist.

This sounds like you'd also like to wrap data channels, so handing out the messages (not the content) could also be done using the streams API? So, basically you'd have a stream that hands out messages where each message itself is a byte stream? I think this is what @martinthomson mentioned as well and it's a cool idea but I think we should start with using the streams API for the messages' content for now. (Wrapping data channels in such a way would make a neat JS lib, though.)

The issue then becomes figuring out what happens to your existing API when someone acquires a reader or writer from the stream.

Good point. This is what I think would be feasible:

@domenic Maybe you can also comment on @martinthomson's idea regarding the use of fetch Body mixin as I'm not really familiar with it. If answering that would require understanding data channels a little more, send me a mail.

lgrahl commented 6 years ago

Also, to clarify if it's not clear:

The example you've mentioned (https://streams.spec.whatwg.org/#example-both) sends a WebSocket message for each chunk, and when the stream reaches EOF the WS connection is closed. So, if I had one stream associated with a file, I could send that file, and then the connection is closed.

Whereas I'm proposing an API where an RTCDataChannel message remains just a message but each message is a stream. When the stream for a specific message reaches EOF, the message is complete. This makes it possible to send more than one message. So, if I had one stream associated with a file, I could send that file as one message.

martinthomson commented 6 years ago

@lgrahl, a stream of streams only really makes sense if we can resolve the issue of whether messages on data channels are strictly ordered.

martinthomson commented 6 years ago

Actually, belay that, lack of sleep. I assume that we would only be able to produce a stream of streams if the data channel were ordered. Though perhaps it's OK if there is no requirement for messages to be complete before the next message is delivered. There might be some caveats regarding back pressure in that case though.

If A is sent before B, but the first octets of B are delivered first, we would manifest a new message on the top-level stream. But A might be completed prior to completing B. For instance, the stream might deliver B2, B1, A1, A2, A3, B3. B1 and B2 would have to be reordered by the browser for delivery (messages themselves are always ordered internally), but it has no obligation to hold B until A arrives. In an ordered stream, the browser would have to hold all of B until A arrives.

(It seems like there's a memory exhaustion attack somewhere in there, but I'm sure our SCTP friends will point out the natural defense that applies.)

lgrahl commented 6 years ago

I'm getting confused by the term stream of streams. Maybe we can call this a sequence of data channel messages where each message is a stream of bytes?

SCTP (with the ndata extension) can interleave messages on different streams but not on the same stream. That means messages on a data channel are strictly ordered (even though messages can be abandoned when using partially reliable delivery - this doesn't affect order).

lgrahl commented 6 years ago

I'm adding some examples here, so it's clearer what my idea would look like from a user's perspective:

Today's API

Peer A's code:

const dc = pc.createDataChannel(...);
dc.send(largeFile);

largeFile may be a Blob in which case the data doesn't have to be in memory at the time of sending (IIRC, please correct me if I'm wrong) or an ArrayBuffer in which case the data will need to be in memory.

Peer B's code:

const dc = pc.createDataChannel(...);
dc.binaryType = 'blob'; // ... or 'arraybuffer', it doesn't really matter here
dc.onmessage = (event) => {
    // Note: Ignoring the string case here.
    // ... write 'event.data' to disk
};

When the event has been raised, the large file's data has been transmitted entirely which can take a very long time. Also, the data will be in memory at that point.

Streamed (assuming the file is not a stream)

Peer A's code:

const dc = pc.createDataChannel(...);
const writableStream = dc.createWritableStream();
const writer = writableStream.getWriter();

while (true) {
    await writer.ready;
    // Note: Once a WritableStreamBYOBWriter exists, we could reuse the same buffer
    // over and over again.
    const length = Math.min(
        myPreferredChunkSize, remainingLength, writer.desiredSize);
    const buffer = new Uint8Array(length);
    // ... copy the file's chunk here into 'buffer'
    //    (or create a view if you already have the chunk in memory)
    writer.write(buffer);
    // ... continue until all chunks have been written and then break
}
await writer.close();

(Edit: I fixed the example above to not await the call to .write(...) but rather await the .ready attribute on the writable stream writer.)

If something goes wrong, you'd call writer.abort() which would abort sending the message. (We would have to discuss what .abort triggers but it should probably close the channel.)

Peer B's code:

const dc = pc.createDataChannel(...);
dc.binaryType = 'stream';
// We only need one small buffer per channel
const buffer = new Uint8Array(myPreferredChunkSize);
let view = new DataView(buffer.buffer, 0);

// Note: The function is declared async!
dc.onmessage = async (event) => {
    // Note: Ignoring the string case here.
    const readableStream = event.data;
    // You could also use the default reader but then the reader would not
    // copy directly into our buffer.
    const reader = readableStream.getReader({ mode: 'byob' });
    while (true) {
        const chunk = await reader.read(view);
        if (chunk.done) {
            break;
        } else {
            view = chunk.value; // We reclaim the buffer here
            // ... write the view's data to disk
        }
    }
};

The reader can call reader.cancel() to abort receiving data. (We would have to discuss what .cancel triggers but it should probably close the channel.)

This of course has the advantage that the file's data doesn't need to be in memory at once for both peers and that backpressure is being taken care of. Furthermore, writing to disk can be started with the first chunk received.

Streamed (assuming the file is also a stream)

So, the last example looked a lot more complicated than what we have now. But if the file could be read from and written to in form of a stream, it becomes pretty easy.

Peer A's code:

const file = ...;
const dc = pc.createDataChannel(...);
const writableStream = dc.createWritableStream();
await file.readableStream.pipeTo(writableStream);

Peer B's code:

const file = ...;
const dc = pc.createDataChannel(...);
dc.binaryType = 'stream';

// Note: The function is declared async!
dc.onmessage = async (event) => {
    // Note: Ignoring the string case here.
    const readableStream = event.data;
    await readableStream.pipeTo(file.writableStream);
};

Combination

Of course, peer A and B can just as well use the existing API on one side. It's entirely up to them.


There are several other ways to use WHATWG stream reader/writer instances. I've only shown two of them.

Comments? :)

ricea commented 6 years ago

@lgrahl This looks good. I think in general it's better to have reusable wrappers for things like file APIs so that the simpler and easier pipeTo() style can be used.

aboba commented 6 years ago

@lgrahl The Streams API provides an example of creating readable streams by wrapping a WebSocket. Yet it appears that no text was required in the WebSockets API to enable this.

So is this something we actually need to change the WebRTC 1.0 document to enable?
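For reference, the wrapping pattern that example uses can indeed be reproduced without spec changes; the sketch below (with a stub object in place of a real WebSocket, so it runs standalone) shows the mapping it creates: one whole message becomes one chunk, and the stream only ends when the connection itself closes.

```javascript
// Wrap a socket-like object in a ReadableStream purely from the outside,
// roughly as in the Streams spec example for WebSocket.
function wrapSocketAsReadable(ws) {
  return new ReadableStream({
    start(controller) {
      ws.onmessage = (event) => controller.enqueue(event.data);
      ws.onclose = () => controller.close();
      ws.onerror = () => controller.error(new Error('socket error'));
    },
    cancel() {
      ws.close();
    },
  });
}

// A stub socket lets the wrapper run without a network connection.
const stub = { close() {} };
const stream = wrapSocketAsReadable(stub);
stub.onmessage({ data: 'message-1' });   // each whole message = one chunk
stub.onmessage({ data: 'message-2' });
stub.onclose();                          // only connection close ends the stream

const reader = stream.getReader();
const chunks = [];
for (;;) {
  const { done, value } = await reader.read();
  if (done) break;
  chunks.push(value);
}
```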

lgrahl commented 6 years ago

The mentioned example for WebSocket has several requirements not applicable to data channels and some problems (which the authors of the streams API couldn't address because the WebSocket API is just how it is):

(And yeah, I think the API proposed here would also make sense for WebSocket if feasible.)

Edit: I forgot to answer your question.

So is this something we actually need to change the WebRTC 1.0 document to enable?

Yes, the first two points aren't fixable without changes to the spec. (One could argue about the third point.)

domenic commented 6 years ago

I never understood the second point, but multiple people have brought it up, so I am probably the one in the wrong. But from what I can see, the wrapper creates a 1:1 mapping from WebSocket message to ReadableStream chunk, so there is no loss of expressiveness.

lgrahl commented 6 years ago

I will try to explain why the second point is important (even for WebSocket) because I believe you're not alone with that perception. Consider the following scenario: I want to receive two (or more) files.

Let's take this example as a starting point...

const file = <some stream>;
const readableStream = makeReadableWebSocketStream('wss://example.com:443/', 'protocol');
readableStream.pipeTo(file.writableStream);

Let's assume the sender will send each file as a separate stream. So, this is going to store the first file but after that the WS connection is closed. What can we do? Well, we could persuade the sender to send both files inside the same stream...

const file = <some stream>;
const readableStream = makeReadableWebSocketStream('wss://example.com:443/', 'protocol');
readableStream.pipeTo(file.writableStream);

Meh, now we've stored both files as one file. Unfortunate. What can we do? Well, we could add a header and include the file's size... or we add a header (or footer) to each chunk that tells us whether the file is EOF or not. Usually, we will prefer the latter because the first idea requires us to know the file's size. Something we may not always know (at the latest when we want to pipe another stream that doesn't follow our framing protocol). So, let's go for the latter...

const readableStream = makeReadableWebSocketStream('wss://example.com:443/', 'protocol');
const reader = readableStream.getReader();
let file = <some stream>;
let writer = file.writableStream.getWriter();

while (true) {
    await writer.ready;
    const chunk = await reader.read();
    if (chunk.done) {
        break;
    }
    const header = new DataView(chunk.value.buffer);
    const eof = header.getUint8(0) !== 0;
    writer.write(new Uint8Array(chunk.value.buffer, 1));
    if (eof) {
        await writer.close();
        file = <some stream>;
        writer = file.writableStream.getWriter();
    }
}
await writer.close();

(Edit: I fixed the example above to not await the call to .write(...) but rather await the .ready attribute on the writable stream writer.)

To recap what we've done right now: We have added a proprietary framing protocol on the application layer which essentially encapsulated multiple streams (one for each file and some metadata which is why we couldn't use .pipeTo anymore). This added complexity and required us to inspect the chunks because it is yet another protocol. It also required the sender to copy the header (or footer) into the buffer. It will have quite an impact on throughput (ping me if you want to know more, I've written some tests a while ago) and it was also unnecessary because the underlying transport already provided such a framing protocol we could have used for this exact purpose. It just wasn't doable with the existing API.

(I have completely ignored that this of course wouldn't work for data channels that aren't reliable and ordered. For this, you will need to add a more complicated framing protocol such as this.)

With the approach suggested in this issue carried over to the WebSocket API, it would be as simple as this...

const ws = new WebSocket('wss://example.com:443/', 'protocol');
ws.binaryType = 'stream';
ws.onmessage = async (event) => {
    // Note: Ignoring the string case here once again.
    const file = <some stream>;
    const readableStream = event.data;
    await readableStream.pipeTo(file.writableStream);
};

Was that helpful? :smiley:

aboba commented 6 years ago

@lgrahl What you are proposing is that a stream be encapsulated in an RTCDataChannel message. That would work for implementations that support large messages, but unless we were to require that all browsers support a large or infinite maxMessageSize, you'd still need to support mapping from an RTCDataChannel message to ReadableStream chunk.

lgrahl commented 6 years ago

@aboba I can't see why this wouldn't work with a message size limitation. Once the stream reaches a total size that would go above that limit, the stream could be aborted (which will probably mean the data channel will be closed). Alternatively, an exception could be raised for the writer's .write call, so the user is theoretically able to choose what to do (close early or abort the stream, really depends on whether this would break .pipeTo - maybe @domenic can comment on that).

So, I don't see maximum message size being a blocker here. But IIRC, the maximum message size limitation exists because people were concerned about stream monopolisation by large data channel messages. AFAIK, Randell's suggestion of using PPID-based fragmentation/reassembly has not been accepted by other parties, and so we ended up with this limitation in the SDP, which bubbled up into this spec. With the SCTP ndata extension, stream monopolisation is not an issue anymore. With this API change, backpressure would not be a concern anymore. So, as far as I'm concerned, maxMessageSize can go (well, it should stay in the API for legacy reasons but just report infinity) as was planned, see this IETF discussion. If there is another reason why we have this, let me know.

alvestrand commented 6 years ago

It seems that blobs contain the magic needed to connect the APIs, so we don't have to change anything in the WebRTC API - we're already able to send blobs. Now we need a demo.

annevk commented 6 years ago

Well, that is not entirely true. A blob can only be allocated once you have received the entire message (since you might encounter an error at some point). With a stream you could start processing bytes right away and error the stream the moment you encounter an error in the message. So streams would be a lower-level primitive on which you could do blobs.

lgrahl commented 6 years ago

@alvestrand What @annevk said.

Blob is too high level and will create backpressure on the receiver side because the data is gathered in RAM until it's complete. One can argue about the sender side but it very likely will also create backpressure there.

Streams on the other hand are able to handle backpressure, so the underlying transport can be paused and resumed. They are high-level when possible (using .pipeTo) and low-level when required (using reader/writer). Data can be handed out with the first bytes received. For example it would be easy to upload a large file efficiently using the fetch API to make a HTTP request coming from a data channel's message directly without having to store the data in RAM at once.
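A sketch of that fetch hand-off, with a locally constructed ReadableStream standing in for the event.data of the proposed 'stream' binaryType (so the sketch runs standalone); note that streaming request bodies additionally require the duplex: 'half' option in current engines:

```javascript
// Stand-in for the ReadableStream a message would deliver under the proposal.
const messageStream = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('file contents'));
    controller.close();
  },
});

// The stream is used directly as the HTTP request body, so the payload never
// has to sit in RAM as one piece.
const request = new Request('https://example.com/upload', {
  method: 'POST',
  body: messageStream,
  duplex: 'half',
});

// The body is consumed lazily; reading it back here just proves the wiring.
const echoed = await new Response(request.body).text();
```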

alvestrand commented 6 years ago

ack. So it seems that we need a send() call on RTCDataChannel that takes a WritableStream argument, and a form of the message event (which sends a MessageEvent whose data attribute is of type any) that, probably via a new binaryType value, delivers a ReadableStream object when a message comes in.

These are new API surfaces. We can't add them by shim, although we can emulate them in a shim to some degree for small messages (by buffering the whole message before we hand it off to its recipient). I think I understand what we need to do now.

lgrahl commented 6 years ago

A little nit: The WritableStream instance needs to be created by RTCDataChannel, so using .send(myWritableStream) wouldn't work. But take a look at the initial posting - that API proposal should work.

Okay, cool. So, I interpret this as positive feedback and will happily create a PR/draft for it.

ricea commented 6 years ago

send() will actually need to take a ReadableStream argument, the same way Request does.

send() could return a WritableStream, but I assume that would break the API.

If your code wants a WritableStream you can use a TransformStream as an adaptor as this example demonstrates.
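A sketch of that adaptor pattern, with a hypothetical dcSend() standing in for the proposed send(readableStream): the app writes into the writable side of a TransformStream while the channel consumes the readable side.

```javascript
const { readable, writable } = new TransformStream();

// Stand-in for dc.send(readable): the channel would drain the readable side.
async function dcSend(readableStream) {
  const reader = readableStream.getReader();
  const sent = [];
  for (;;) {
    const { done, value } = await reader.read();
    if (done) return sent;
    sent.push(value);
  }
}

const sending = dcSend(readable);      // channel consumes the readable side
const writer = writable.getWriter();   // app writes into the writable side
writer.write('part-1');
writer.write('part-2');
writer.close();
const sent = await sending;
```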

stefhak commented 6 years ago

Regarding https://github.com/w3c/webrtc-pc/issues/1732#issuecomment-361654117: would we be able to support ReadableStreams without any API change (as this example does for WebSocket)?

annevk commented 6 years ago

No, the idea is to support streams for individual messages. As such they're strictly lower-level than anything the API currently supports as per https://github.com/w3c/webrtc-pc/issues/1732#issuecomment-360736519 and also OP.

lgrahl commented 6 years ago

@ricea Okay, if that works - even better because it's even more elegant. I'll join the WHATWG IRC so we can work out the details as you're the experts when it comes to streams.

@stefhak Sadly not. I explained why in https://github.com/w3c/webrtc-pc/issues/1732#issuecomment-358418156. (Edit: @annevk was faster 😄 )

stefhak commented 6 years ago

@lgrahl sorry I'm having a bad day. Exactly what can WebSocket do in https://github.com/w3c/webrtc-pc/issues/1732#issuecomment-361654117 that the DataChannel cannot (you can configure a data channel to be reliable and ordered, AFAICU)? There must be something, as this example shows a WebSocket-sourced ReadableStream, yet that is supposedly not possible using a WebRTC DataChannel.

lgrahl commented 6 years ago

@stefhak I'm sorry but that comment is confusing the hell out of me. 😅 Can you rephrase it?

annevk commented 6 years ago

@stefhak you're focusing on representing a sequence of messages as a stream, whereas this proposal is about representing a single message as a stream.

stefhak commented 6 years ago

@lgrahl sure :) This example shows a WebSocket sourcing a ReadableStream. I thought you should be able to do the same with a DataChannel, but understand from https://github.com/w3c/webrtc-pc/issues/1732#issuecomment-361890393 that it's not possible. Why?

lgrahl commented 6 years ago

@stefhak Thanks! The main point is what @annevk just said. There are some other issues I've described in my answer towards Bernard (who brought up the same example), see https://github.com/w3c/webrtc-pc/issues/1732#issuecomment-358418156 (the links look all the same but point to different comments - maybe that's why you haven't seen it from my previous answer).

aboba commented 6 years ago

@annevk @lgrahl Is the goal to represent a single message as a stream or to allow a (large) stream to be carried in a single message? The latter would require browsers to support a much larger maxMessageSize than they do today.

lgrahl commented 6 years ago

@aboba The former - represent a single message as a stream.

Edit: Actually, I don't see the difference between

represent a single message as a stream

and

allow a (large) stream to be carried in a single message

Can you elaborate what would be the distinction for you?

I've commented on maxMessageSize earlier: https://github.com/w3c/webrtc-pc/issues/1732#issuecomment-360493556 (2nd paragraph).

alvestrand commented 6 years ago

Reminding everyone of the -rtcweb-transports draft:

For data transport over the WebRTC data channel [I-D.ietf-rtcweb-data-channel], WebRTC endpoints MUST support SCTP over DTLS over ICE. This encapsulation is specified in [I-D.ietf-tsvwg-sctp-dtls-encaps]. Negotiation of this transport in SDP is defined in [I-D.ietf-mmusic-sctp-sdp]. The SCTP extension for NDATA, [I-D.ietf-tsvwg-sctp-ndata], MUST be supported.

lgrahl commented 6 years ago

FYI I'm planning to create a PR for this but it will take some time. :)

jan-ivar commented 6 years ago

I've also been meaning to put together an extension spec for this, but have been occupied with other matters thus far. Hope to get to it at some point.

murillo128 commented 5 years ago

How would this API apply to unreliable data channels?

My wild guess is that when the SCTP transport reaches the maxPacketLifeTime or maxRetransmits limit, an error will be raised on the sender via the writable stream's ready/write promises, and on the receiver's readable stream when the abandon SACK is received? Would it be possible to abort/abandon an ongoing message?

Also, I assume that write/recv chunk boundaries are not preserved (which is fine), right?

lgrahl commented 5 years ago

Your wild guess is correct. See the class definition for WritableStreamDefaultWriter.

Also, I assume that write/recv chunk boundaries are not preserved (which is fine), right?

Yep.
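To illustrate how such a failure would surface per the WritableStreamDefaultWriter contract: once the underlying sink errors (e.g. the maxRetransmits case above), pending and subsequent write()/ready/closed promises reject with that error. An explicitly errored WritableStream stands in for the failing transport in this sketch:

```javascript
// failSink is a hypothetical hook standing in for the transport giving up on
// a partially reliable message.
let failSink;
const stream = new WritableStream({
  start(controller) {
    failSink = (reason) => controller.error(reason);
  },
});

const writer = stream.getWriter();
failSink(new Error('message abandoned'));  // transport abandons the message

let caught = null;
try {
  await writer.write('late chunk');        // rejects: the stream is errored
} catch (err) {
  caught = err.message;
}
```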

murillo128 commented 5 years ago

Async iteration has landed to the stream specs: https://streams.spec.whatwg.org/#rs-asynciterator

So it could be possible to write the following code:

const dc = pc.createDataChannel(...);
dc.binaryType = 'stream';

// Note: The function is declared async!
dc.onmessage = async (event) => {
    // Async iteration uses the default reader, so each iteration yields the
    // chunk itself (no { done, value } wrapper and no BYOB buffer to reclaim).
    for await (const chunk of event.data) {
        // ... write the chunk's data to disk
    }
};

aboba commented 5 years ago

Here is a recent proposal along these lines for WebSockets: https://groups.google.com/a/chromium.org/forum/#!topic/blink-dev/X7rWpAkMCyg

aboba commented 5 years ago

Moved to the WebRTC-NV Use Case repo: https://github.com/w3c/webrtc-nv-use-cases/issues/44