MatrixAI / js-ws

Websocket Transport Library for TypeScript/JavaScript Applications
https://matrixai.github.io/js-ws/
Apache License 2.0

Support stream multiplexing for websockets #2

Closed tegefaulkes closed 1 year ago

tegefaulkes commented 1 year ago

Specification

To avoid the overhead of creating and negotiating separate websocket connections per RPC call, we need to add the ability to multiplex streams over a single websocket connection.

Since websockets are a message-based protocol, we can wrap each message in a header to add data for multiplexing. Worst case, we can serialize and deserialize JSON in a stream, the same way we do for the JSONRPC messages.

This will be abstracted on top of webstreams. We will have a mux transform that implements the multiplexing and a demux transform that does the reverse.

Message structure

A streamed websocket message is constructed in three parts:

+----------------------+----------------+------------------------+
|   QUIC-style Varint  |  Message Type  |       Payload          |
+----------------------+----------------+------------------------+

The QUIC-style Varint represents the monotonic stream ID, up to the limit of a 62-bit unsigned integer (stored big-endian).

The message type is a byte that represents the type of the message received:

enum MessageType {
  DATA = 0x00,
  ACK = 0x01,
  ERROR = 0x02,
  CLOSE = 0x03,
}
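
As a rough illustration, a mux transform might frame an outgoing chunk like this (a sketch only; encodeVarInt is the hypothetical helper sketched in the next section, and none of these names are the library's actual API):

function encodeMessage(
  streamId: bigint,
  type: MessageType,
  payload: Uint8Array,
): Uint8Array {
  // Header is the varint stream ID followed by a single message-type byte.
  const id = encodeVarInt(streamId);
  const out = new Uint8Array(id.length + 1 + payload.length);
  out.set(id, 0);
  out[id.length] = type;
  // Remaining bytes are the payload, interpreted according to the type.
  out.set(payload, id.length + 1);
  return out;
}
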
Variable-Length Integer Encoding

This is to be done similarly to QUIC. A Variable-Length Integer, or VarInt, is an unsigned integer represented by 1, 2, 4, or 8 bytes. The first two bits of the initial byte determine how many bytes the VarInt uses. Hence the usable bits of an encoded VarInt are 2 fewer than those of a u8, u16, u32, or u64.

| VarInt Prefix | Size (bytes) | Usable Bits |
| ------------- | ------------ | ----------- |
| `0b00`        | 1            | 6           |
| `0b01`        | 2            | 14          |
| `0b10`        | 4            | 30          |
| `0b11`        | 8            | 62          |

As the maximum unsigned integer that can be stored is 62 bits, I am using a bigint to represent the decoded varint.
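
A minimal encode/decode sketch of this scheme (function names are illustrative, not the library's API; values are bigint as noted above):

function encodeVarInt(value: bigint): Uint8Array {
  // Pick the smallest of the 4 sizes that can hold the value.
  let prefix: number;
  let size: number;
  if (value < 0x40n) {
    prefix = 0b00; size = 1;
  } else if (value < 0x4000n) {
    prefix = 0b01; size = 2;
  } else if (value < 0x40000000n) {
    prefix = 0b10; size = 4;
  } else if (value < 0x4000000000000000n) {
    prefix = 0b11; size = 8;
  } else {
    throw new RangeError('varint exceeds 62 bits');
  }
  const out = new Uint8Array(size);
  let v = value;
  for (let i = size - 1; i >= 0; i--) {
    out[i] = Number(v & 0xffn);
    v >>= 8n;
  }
  // The two high bits of the first byte encode the size.
  out[0] |= prefix << 6;
  return out;
}

function decodeVarInt(data: Uint8Array): { value: bigint; size: number } {
  const size = 1 << (data[0] >> 6); // 0b00→1, 0b01→2, 0b10→4, 0b11→8 bytes
  let value = BigInt(data[0] & 0b0011_1111);
  for (let i = 1; i < size; i++) {
    value = (value << 8n) | BigInt(data[i]);
  }
  return { value, size };
}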

DATA Payload

In the case where the message is of type DATA, the payload is arbitrary data written through the sender's WritableStream.

ACK Payload

In the case where the message is of type ACK, the payload represents the remaining empty buffer space of the receiver, i.e. the desiredSize. It is represented as a 32-bit big-endian integer (4 bytes).
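
For illustration, the ACK payload could be produced with a DataView (a sketch, not the library's actual code):

function encodeAckPayload(desiredSize: number): Uint8Array {
  const buf = new Uint8Array(4);
  // false = big-endian, per the spec above
  new DataView(buf.buffer).setUint32(0, desiredSize, false);
  return buf;
}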

ERROR and CLOSE Payload

In the case where the message is of type ERROR or CLOSE, the initial byte represents whether the ReadableStream or the WritableStream of the receiver should be closed. This is represented by this enum:

enum StreamShutdown {
  Read = 0,
  Write = 1,
}

The remaining bytes in an ERROR frame are a QUIC-style varint representing an error code. The user of the library converts this code into a useful error by passing a function into the constructor of WebSocketStream.
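
A hypothetical parse of the ERROR payload, assuming the decodeVarInt helper sketched earlier:

function decodeErrorPayload(
  payload: Uint8Array,
): { shutdown: StreamShutdown; code: bigint } {
  // First byte selects which side of the receiver to shut down.
  const shutdown = payload[0] as StreamShutdown;
  // Remaining bytes are the varint error code.
  const { value: code } = decodeVarInt(payload.subarray(1));
  return { shutdown, code };
}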

Payload handling should ensure that closing/erroring the sender's WritableStream closes the receiver's ReadableStream, and vice versa. This is facilitated by the readableEnded and writableEnded properties on the connection. For example, closing a sender's writable stream sets the sender's writableEnded to true and sends an ERROR/CLOSE message with StreamShutdown.Read to the receiver. The receiver then handles the message by erroring the ReadableStream's controller and setting readableEnded to true.

All application level errors produced by the user of the API will be converted by injected codeToReason and reasonToCode functions.
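
The shapes of those injected functions might look roughly like this (the exact signatures are an assumption for illustration, not the actual js-ws types):

// Maps an application-level error to a 62-bit error code, and back.
type StreamReasonToCode = (reason?: any) => bigint | PromiseLike<bigint>;
type StreamCodeToReason = (code: bigint) => any | PromiseLike<any>;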

Lifecycle

Error Codes

The following are the non-application error codes:

const enum StreamErrorCode {
  Unknown = 0,
  // An error parsing the structure of a message
  ErrorReadableStreamParse = 1,
  // The buffer of a ReadableStream exceeded its maximum size
  ErrorReadableStreamBufferOverflow = 2,
}

All non-application errors will not be handled by the injected codeToReason and reasonToCode functions.

Started States

Upon starting a WebSocketStream, it is either created locally or created by a peer:

*[figure: started states diagram]*

A peer-created WebSocketStream can only be created upon receiving an ACK message. This avoids the race condition where data is sent immediately after an ACK message, causing the receiver to have a half-open WebSocketStream. It also prevents a sender from creating a stream and sending DATA to it without knowing the buffer size of the receiver.
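
A demuxer following this rule might dispatch incoming messages roughly as follows (a sketch; createStream is a hypothetical helper):

function handleMessage(
  streams: Map<bigint, { receive(type: MessageType, payload: Uint8Array): void }>,
  streamId: bigint,
  type: MessageType,
  payload: Uint8Array,
): void {
  const stream = streams.get(streamId);
  if (stream != null) {
    stream.receive(type, payload);
  } else if (type === MessageType.ACK) {
    // Peer-created stream: only now do we know the peer's buffer size.
    streams.set(streamId, createStream(streamId, payload));
  } else {
    // Unknown stream ID with a non-ACK message is a protocol error;
    // this also covers re-use of an already-closed stream ID.
    throw new Error(`protocol error: message for unknown stream ${streamId}`);
  }
}

declare function createStream(
  streamId: bigint,
  ackPayload: Uint8Array,
): { receive(type: MessageType, payload: Uint8Array): void };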

Closing States

Upon closing a WritableStream, the WebSocketStream will pass through the following states:

The opposite is true for closing a ReadableStream.

It is important that any operation involving a blocking promise checks whether the stream is closed both before and after the promise resolves. This ensures that the work the promise was gating does not continue once the stream is closed.
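
For example, a guarded write might look like this sketch (StreamLike and its members are illustrative assumptions):

interface StreamLike {
  writableEnded: boolean;
  waitForAck(): Promise<void>;
  send(chunk: Uint8Array): void;
}

async function writeChunk(stream: StreamLike, chunk: Uint8Array): Promise<void> {
  // Check before blocking...
  if (stream.writableEnded) throw new Error('writable already ended');
  await stream.waitForAck(); // the stream may close while we wait
  // ...and check again after the promise resolves.
  if (stream.writableEnded) throw new Error('stream closed while waiting');
  stream.send(chunk);
}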

Backpressure

Backpressure will be achieved by implementing a message type that represents the acknowledgement of a message received.

On initiation of the stream, the receiver will send an ACK with their buffer size to tell how much data the sender can send before blocking.

Hence, a sender will send their DATA and the receiver will send ACK when they have processed the data that the sender has sent.

The ACK message will be resent accordingly as data is processed by the receiver.

After the initial ACK sent by the receiver, the sender sets a byte counter to the size of the receiver's buffer. Each subsequent ACK should include the readBytes rather than the buffer size, so that the sender can add to the counter, which represents how many free bytes are available in the receiver's buffer. The sender should block when the counter reaches 0.

If the sender were to send a message bigger than the receiver's buffer, the message should be sent in chunks. The sending of subsequent chunks after the first should be blocked on awaiting an ACK from the recipient.
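
A sketch of the sender-side counter described above (names and structure are illustrative only):

class SenderFlowControl {
  protected available = 0;
  protected waiters: Array<() => void> = [];

  // The first ACK carries the receiver's total buffer size; later ACKs carry
  // readBytes, i.e. how many bytes the receiver has freed up.
  onAck(bytes: number): void {
    this.available += bytes;
    while (this.available > 0 && this.waiters.length > 0) {
      this.waiters.shift()!();
    }
  }

  // Reserve up to `desired` bytes to send, blocking while the counter is 0.
  async reserve(desired: number): Promise<number> {
    while (this.available <= 0) {
      await new Promise<void>((resolve) => this.waiters.push(resolve));
    }
    const granted = Math.min(desired, this.available);
    this.available -= granted;
    return granted;
  }
}

Sending a message larger than the current grant then becomes a loop of reserve-then-send steps, where each chunk after the first blocks until enough ACKs arrive.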

Additional context

Tasks

  1. [x] Spec this out.
  2. [x] Implement parsing messages and message types
  3. [x] Implement backpressure
  4. [x] Implement error handling

CMCDragonkai commented 1 year ago

This issue needs further specification based on the Q&A notes I'm copying over here.

The original design of our RPC system was based on the assumption that the underlying transport would handle multiplexing and encryption. It was supposed to give us 2 major constructs to work with: "connection" and "stream".

RPC-wise, the only thing it needs to care about is the "stream" concept. When you make an RPC call to somewhere else, you open a stream. If you receive a stream, you are handling an RPC call. That's it. The stream has to be bidirectional and full-duplex.

This is achieved right now by both js-quic and the current WS implementation in PK.

However, there's a key feature that's currently missing in the WS implementation. In QUIC, it is possible for a server that received a connection to open up new streams back to the client on the same connection.

With WS, this is not currently possible. Right now we are treating each WS connection as the stream, meaning a 1-to-1 relationship between "connection" and "stream".

Without this ability, there's no way to achieve a sort of RPC conversation context. We have situations where this is useful: where handling an RPC call involves calling back to the client on a different handler. Think of A.f calling B.g, which calls A.h. We see this with things like node claims, and later with smart token usage.

Now with WS, it could work if both sides could just open new connections to each other. You can do this in QUIC too. However, it's not as efficient as simply opening a new stream on the same connection. Furthermore, this cannot work at all if the client cannot run a server to accept new connections.

When using QUIC, agents are both clients and servers. Thus you can initiate connections from any host.

When using WS for the client service, the PKC is only a client, and the PKA is only a server. It's not possible for the PKA to open a new WS connection to the PKC. And this is further complicated by the fact that PKC may run in a browser, which would not expose the underlying websocket server at all!

So in order to achieve a sort of RPC conversation, it would be necessary to add an additional multiplexing layer on top of the WS connection, thus creating WS streams on top of WS connections. Doing this would make our WS transport very similar to QUIC.

Doing this also means the WS protocol becomes more complicated. We leverage WS's natural message framing (using binary messages) and put a header in front. This header must supply 2 pieces of information: the stream ID and the message type.

The stream ID can be a varint similar to QUIC's varint.

The message type can be 0x00, 0x01 and 0x02 for data, error, close respectively. Or whatever is convenient.

The rest of the payload will then be plugged into the relevant WS stream for further processing.

Note that JSON RPC has an id field. We can just simply copy over the stream ID here, or use it for logging/auditing/identification. It's not really used for tracking our streams. Could be useful for tracing.

When using websocat to test this out, we can do things like:

printf '\x01\x02\x03\x04' | websocat -b ws://your-websocket-server-endpoint

For QUIC-style varints, the encoding is a bit different than Protocol Buffers varints. The size of the varint is determined by the two highest bits of the first byte:

For the numbers 0 through 10, only the 1-byte format is needed since they can all be represented in 6 bits or fewer:

0  = `00 000000` = `00`
1  = `00 000001` = `01`
2  = `00 000010` = `02`
...
10 = `00 001010` = `0A`

Using printf in a shell to display QUIC-style varints for numbers 0 through 10:

printf "\x00"  # 0
printf "\x01"  # 1
printf "\x02"  # 2
# ... and so on
printf "\x0A"  # 10

As you can see, the representation for numbers 0 to 10 is identical in output between Protocol Buffers and QUIC-style varints. The difference arises mainly when you go beyond 6-bit values, as the way the continuation bits work and the total length of the varint changes between the two methods.

CMCDragonkai commented 1 year ago

The benefits of doing this are:

  1. Not having to rely on the underlying transport to do the multiplexing. See MatrixAI/js-ws#3 for discussion on spotty support from ws and browsers for WS multiplexing on HTTP2.
  2. More efficient RPC calls, as streams are now just lightweight messages. This is really just point 1.
  3. The ability to have RPC conversations.
  4. The ability to create a push-oriented architecture for PKE. This means it's possible for PK to push data to the client, even if the client is not a server, because the client just has to establish a connection.

Once you have something like this, any WS connection can emit new streams. If you don't want to handle streams from the other side, you just need to close each stream as soon as it arrives.

Streams will need 3 kinds of messages: the data message is the regular one, the close message is another, and finally there's an error message. We can use mechanisms similar to QUIC streams to achieve this.

Another thing is that the stream ID has to be monotonic. Once a stream is closed or errored, its stream ID is not allowed to be re-used; implementations must just keep increasing the stream ID. In this sense, it is necessary to consider it a protocol error if a used stream ID appears again. See how QUIC deals with this and copy the behaviour.

CMCDragonkai commented 1 year ago

Message framing of the different stream-related messages is easy due to websocket's guarantee that each message is atomically transferred even if the underlying connection does fragmentation.

Therefore, this is all we really need.

+----------------------+----------------+------------------------+
|   QUIC-style Varint  |  Message Type  |       Payload          |
+----------------------+----------------+------------------------+

If, however, message framing were not guaranteed, we would add a Message Length field after the type, and it would probably also be a QUIC-style varint.

Note that QUIC-style varints have potentially different sizes, just like varints in general. However, they are bounded by a maximum size, whereas protobuf's varints are not; they can keep going on forever.

I think the QUIC-style varints are a little more efficient even though they have a max bound.

CMCDragonkai commented 1 year ago

This work should only be done during the js-ws extraction from PK, thus creating a WS transport whose behaviour is aligned with js-quic, which is tracked in MatrixAI/js-ws#3.

CMCDragonkai commented 1 year ago

As an aside, I was also exploring the idea of HTTP2 as the transport. Apparently HTTP2 streams could be used too. Based on this issue, it would also require an additional multiplexing layer built on top to support js-rpc if we want conversational RPC and push architecture. That would create something like js-http2, enabling HTTP2 streams and then HTTP2 stream streams.

The browser apparently supports using fetch API to achieve this. You can pass in a stream as the body, and also retrieve the response body as a stream.

async function* generateDataChunks() {
    for (let i = 0; i < 5; i++) {
        yield new TextEncoder().encode(`Chunk ${i}\n`);
        await new Promise(resolve => setTimeout(resolve, 1000)); // Simulate delay between chunks
    }
}

const stream = new ReadableStream({
    async start(controller) {
        for await (const chunk of generateDataChunks()) {
            controller.enqueue(chunk);
        }
        controller.close();
    }
});

const response = await fetch('https://localhost:8443/echo-endpoint', {
    method: 'POST',
    body: stream,
    // Streaming request bodies require half-duplex mode per the Fetch spec
    duplex: 'half',
    headers: { 'Content-Type': 'text/plain' }
});

// Process the response headers
console.log('Status Code:', response.status);
console.log('Content-Type:', response.headers.get('content-type'));

// Process the data chunks
const reader = response.body.getReader();
while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log('Received from server:', new TextDecoder().decode(value));
}

This, however, is more complex, as an HTTP2 stream doesn't have message framing; you'd have to do that yourself. Each HTTP2 stream itself would be multiplexed on top of a single HTTP2 connection.

The need to layer an additional multiplexed stream on top of HTTP2 is only necessary because the browser doesn't expose the HTTP2 streams directly. If you didn't care about the browser, you could just use the HTTP2 connection and HTTP2 streams, just like QUIC connections and QUIC streams. js-quic doesn't work in the browser either.

For browser communication, I don't think we would bother doing this with HTTP2, as we already have WS to do this. But it's just an interesting idea.

Don't forget that this would be specific to HTTP2; HTTP3 would be different again, with the details varying by HTTP version.

Anyway this means these kinds of transports would be possible.

WebSockets (browser possible):

1 RPC Call - 1 WebSocketStream
N WebSocketStream - 1 WebSocket
1 WebSocket - 1 IncomingMessage
1 IncomingMessage - 1 Socket

QUIC (no browser support):

1 RPC Call - 1 QUICStream
N QUICStream - 1 QUICConnection
N QUICConnection - 1 QUICSocket

HTTP1.1 without KeepAlive (browser possible):

1 RPC Call - 1 Http1Stream
N Http1Stream - 1 IncomingMessage
1 IncomingMessage - 1 Socket

HTTP2 (browser possible):

1 RPC Call - 1 Http2StreamStream
N Http2StreamStream - 1 Http2Stream
N Http2Stream - 1 Http2Session
1 Http2Session - 1 Socket

HTTP2 (no browser support):

1 RPC Call - 1 Http2Stream
N Http2Stream - 1 Http2Session
1 Http2Session - 1 Socket

CMCDragonkai commented 1 year ago

Moving this to js-ws as it would be done there.

amydevs commented 1 year ago

An extra control message needs to be introduced: ACK. This will allow for per-stream backpressure. The receiver for the WS stream will have a set buffer size; upon it being reached, the buffer will be read and processed, and an ACK message will be sent back to the sender. The sender only needs to keep a rolling counter for sent messages, which is reset after every receipt of an ACK. The counter is initialized by a buffer size included in the first ACK message and is decremented after each sent message. Once the counter reaches zero, the sender waits for the next ACK to keep sending messages.

CMCDragonkai commented 1 year ago

Follow this: https://chat.openai.com/share/18a9bec1-adf4-425f-baf1-5645a47c24c3 as well for the conversation regarding ACK.

CMCDragonkai commented 1 year ago

The backpressure mechanism along with the muxing and demuxing of streams are all tightly interconnected. So it has to be designed bit by bit. You'll need to start with just the message protocol first by using a binary header as per the first chatgpt link in https://chat.openai.com/share/18a9bec1-adf4-425f-baf1-5645a47c24c3.

Also @amydevs can you please spec out the tasks above, just think high level tasks first.

tegefaulkes commented 1 year ago

Good work on the spec so far; there is a good amount of information on how the back-pressure and writing work.

Small note about the ACK message: as you described above, I get the impression you're sending the available buffer each time you ACK. Remember that this is a concurrent system. If you, for example, sent 500 bytes and then another 500 bytes quickly, then when the receiving side processed the first 500 bytes and ACKed the 1024 bytes of available space, the sender could immediately send another 1024 bytes. At this point we'd have 1524 bytes in flight, well exceeding the available buffer, very likely resulting in an error.

I think we should only ACK the number of bytes processed and not the number of bytes available. On the sending side we keep a count of available bytes we can send: we subtract after writing and add after receiving ACKs. Addition and subtraction here are commutative and atomic, so we can't butcher the value, whereas overwriting this value concurrently would run into consistency problems.

The spec is also light on details for how the error and close messages will work, and it needs more detail on how the messages are structured as well. How does an error message look? What details are included in it? Will it have a code and message? Same for the close message.

If we consider each stream as a separate state machine, how do the state transitions work? The sending and receiving streams function independently, right? How does that work?

We don't need to go into full implementation detail, but we do need enough of it described so that anyone coming to this cold can understand enough of the structure to start working on it.