Open addievo opened 1 year ago
Ok after going through some of the API changes and decisions that were made back in the first prototype https://github.com/MatrixAI/Polykey/pull/498#issuecomment-1426294890, it turns out that the reason why server and client can only transmit JSON errors from server to client and not client to server is because we never built the "high-level API" on the client side due to usability issues.
So to clear the air, we have 3 levels of abstraction with respect to streams and RPC calls.
QUICStream
and WebSocketStream
- these streams send and receive raw bytes, and cancel
plus abort
directly translate to stream-layer error codes. @amydevs recently refactored js-ws so that internal stream protocol errors results in a connection error in order to align with js-quic behaviour. https://github.com/MatrixAI/js-ws/pull/17/commits/956def21fa4039c116086ecfc34833b8dcd0da5fcancel
and abort
still directly translate to stream-layer error codes.Here are the TS interfaces for AsyncIterator
and AsyncGenerator
.
interface AsyncIterator<T, TReturn = any, TNext = undefined> {
// NOTE: 'next' is defined using a tuple to ensure we report the correct assignability errors in all places.
next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
return?(value?: TReturn | PromiseLike<TReturn>): Promise<IteratorResult<T, TReturn>>;
throw?(e?: any): Promise<IteratorResult<T, TReturn>>;
}
interface AsyncIterable<T> {
[Symbol.asyncIterator](): AsyncIterator<T>;
}
interface AsyncIterableIterator<T> extends AsyncIterator<T> {
[Symbol.asyncIterator](): AsyncIterableIterator<T>;
}
interface AsyncGenerator<T = unknown, TReturn = any, TNext = unknown> extends AsyncIterator<T, TReturn, TNext> {
// NOTE: 'next' is defined using a tuple to ensure we report the correct assignability errors in all places.
next(...args: [] | [TNext]): Promise<IteratorResult<T, TReturn>>;
return(value: TReturn | PromiseLike<TReturn>): Promise<IteratorResult<T, TReturn>>;
throw(e: any): Promise<IteratorResult<T, TReturn>>;
[Symbol.asyncIterator](): AsyncGenerator<T, TReturn, TNext>;
}
To reach abstraction layer 3, we basically interpret streams as generator-like things.
The next()
can be used to both read and write things.
The return()
can be used to "close" things, but also write something in before hand.
The throw()
can be used to throw a serialised error into the JSON RPC stream.
So let's imagine a consistent API between client side and server side.
Let's take the example of the duplex stream.
// client side
const { readable, writable } = client.duplexStream();
// server side
const handler = async function*(input) {
yield 'hello world';
};
On the client side we get access to just the readable
and writable
sides of the RPC stream. Both streams are of JSON RPC objects. Thus:
const reader = readable.getReader();
const writer = writable.getWriter();
const helloWorld = await reader.read();
await writer.write({ hello: 'world' });
And both expose the relative cancellation methods:
await reader.cancel(reason);
await writer.abort(reason);
Both of which is the level 2 abstraction above that ends up becoming a stream error code.
So in order to enable serialisation of errors onto the RPC stream, you want to get to level 3 abstraction.
This means creating an interface that looks like the AsyncIterator
or AsyncGenerator
like above.
The first decision is whether one should wrap the entire duplex stream as an AsyncIterator
or `AsyncGenerator. I think experience from implementing the handling shows us that these should independently wrapped. The handler of course sometimes provides an "input async iterator" that can be iterated over.
The next is to realise the different functionality.
For a "readable async iterator" one doesn't have to do much, ReadableStream
already exposes for await...of
interface.
For the "writable async iterator" interface, it has to actually provide a few functions.
Let's imagine this thing:
const writableIterator = wrap(writable);
await writableIterator.next(value); // equivalent to writer.write(value)
await writableIterator.return(value); // equivalent to writer.write(value); writer.close();
await writableIterator.throw(error); // this is the thing we want, it automatically uses `fromError` to serialise the error into a JSON RPC error message, and then proceeds to do writer.write(errorMsg); writer.close();
Assuming we can create 2 interface types that satisfy the above... we should end up the level 3 abstraction that we want. Then errors can also be sent from client to server for client streaming and duplex streaming.
I imagine these interfaces could be called something like this:
First by referencing this:
/**
* This interface extends the `ReadableWritablePair` with a method to cancel
* the connection. It also includes some optional generic metadata. This is
* mainly used as the return type for the `StreamFactory`. But the interface
* can be propagated across the RPC system.
*/
interface RPCStream<R, W, M extends POJO = POJO>
extends ReadableWritablePair<R, W> {
cancel: (reason?: any) => void;
meta?: M;
}
Then creating:
interface RPCStreamIterator - derives from the `RPCStream` to present `readable: ReadableStreamIterator` and `writable: WritableStreamIterator`
interface ReadableStreamIterator - this is the "AsyncIterator" version of `ReadableStream` (while also exposing `for await...of`)
interface WritableStreamIterator - this is the "AsyncGenerator" version of `WritableStream` - this will not have `for await...of`
These interfaces could then be used on both client and server.
In fact if you look at these existing types:
type DuplexHandlerImplementation<
I extends JSONValue = JSONValue,
O extends JSONValue = JSONValue,
> = HandlerImplementation<AsyncIterable<I>, AsyncIterable<O>>;
type ServerHandlerImplementation<
I extends JSONValue = JSONValue,
O extends JSONValue = JSONValue,
> = HandlerImplementation<I, AsyncIterable<O>>;
type ClientHandlerImplementation<
I extends JSONValue = JSONValue,
O extends JSONValue = JSONValue,
> = HandlerImplementation<AsyncIterable<I>, Promise<O>>;
You can see that it already uses AsyncIterable
for the readable stream on the server side. Which simply allows on to get the AsyncIterator
from it, that is exposing the for await...of
usage.
You could replace AsyncIterable
above with ReadableStreamIterator
.
So what happens with the js-rpc RPC protocol errors?
Well in that case, we won't bother with sending a RPC error message. We could simply elevate a stream-level error code.
As we talked about last time...
Summary
The client-side implementation of duplex streams currently doesn't support the transmission of error messages to the server. This functionality is crucial for both the transport and application layers to handle errors effectively and maintain a symmetrical communication model with the server. The issue outlines the rationale behind enabling client-side error transmission and presents an architectural overview.
Background and Current State
Transport Layer: Stream Error Codes
At the transport layer, we manage what we term as "transport-level stream errors." These errors are captured and transformed using
codeToReason
andreasonToCode
functions. Both the server and client use these functions to translate a stream error code into areason
ofany
type. Thisreason
is then accepted byreadable.cancel(reason)
andwritable.abort(reason)
.Application Layer: JSON RPC Errors
On the application layer, we use JSON RPC errors that are encoded as per the JSON RPC specification. A typical JSON RPC error message might look like:
Error Handling:
toError
andfromError
Both on the client and server sides,
toError
andfromError
functions are used to serialize and deserialize errors, respectively. For example, on the server side:In this case,
fromError
would convertSpecialError
into a JSON RPC compatible error message. The client, upon receiving this, would usetoError
to convert it back into an exception object.Issue: Asymmetry in Error Handling
While the server has the capability to transmit errors back to the client, the client is currently unable to reciprocate, even when using duplex streams. This creates an asymmetry in error handling that can cause issues in various scenarios, especially when the client needs to notify the server of specific conditions or errors that should halt or modify server behavior.
Proposed Solution
Duplex Streams: Error Transmission
Duplex streams should be enhanced to allow the client to send JSON RPC error messages back to the server. Stream handlers should be prepared to handle errors thrown into the async iterator/generator on both ends.
Implementation Steps:
toError
andfromError
into the client's duplex stream setup.codeToReason
andreasonToCode
functionalities for handling transport-level errors.