MatrixAI / Polykey

Polykey Core Library
https://polykey.com
GNU General Public License v3.0

Transport Agnostic RPC #249

Closed CMCDragonkai closed 1 year ago

CMCDragonkai commented 2 years ago

We have reviewed the gRPC API, and worked with it extensively. It's time to work out a better RPC layer for PK. There are several problems to consider here:

We have 2 main proto files:

  1. proto/schemas/Client.proto
  2. proto/schemas/Agent.proto

And a Test.proto as well, this will need to be used to generate the marshaling code.

Additional context

Tasks

  1. [ ] - Review the proto3 guide, naming standard with reference to the proto files, look for ways to abstract or clean up the data
    • 279

    • [x] - Version the Client.proto and Agent.proto with version names and test out multiple-version services, and find out what the client does when the version wanted is not available
    • [ ] - Test out grpc reflection for versioning if necessary
    • [ ] - Update all the primitive types, and any usage of 64 bit int should have the string hint
    • [ ] - Add some benchmarks to the grpc, and check if MAX_CONCURRENT_CONNECTIONS is used
    • [ ] - Clean up types #200 or gRPC abstractions based on new grpc libraries
    • [ ] - Consider how to shutdown the grpc server and terminate all client connections as well
  2. [ ] - Figure out CQRS, pagination and streaming
    • [ ] - Clean up js-pagination client and server side utilities
    • [ ] - Pagination on unary calls, and streaming calls that can be initialised with a pagination parameters (which would enable some sort of CQRS and event sourcing concepts)
  3. [ ] - Review in reference to authentication and sessions
  4. [ ] - Check the HTTP API and web gateway compatibility
  5. [ ] - Deal with the fact that the new grpc-js 1.4.1 version has some incompatibility: TypeError: http2Server.on is not a function.

CMCDragonkai commented 2 years ago

The 1.4.5 version has been released: https://github.com/grpc/grpc-node/releases/tag/%40grpc%2Fgrpc-js%401.4.5. Apparently the deadlines problem in https://github.com/MatrixAI/js-polykey/issues/249#issuecomment-991999605 would be resolved by it. Not sure about the asynchronous interceptors.

Note the question here: https://github.com/grpc/grpc-node/issues/1984#issuecomment-996417452

CMCDragonkai commented 2 years ago

Since splitting up the handlers for client and agent service https://github.com/MatrixAI/js-polykey/pull/292#issuecomment-997542398, I noticed that vault handlers were using their own custom code for representing "not found" errors https://github.com/MatrixAI/js-polykey/pull/292#issuecomment-997343182. This is not consistent with how we are using domain exceptions in other areas. However when working with @emmacasolin on the development of this guideline: https://github.com/MatrixAI/js-polykey/wiki/api-design and the older article https://github.com/MatrixAI/js-polykey/wiki/errors, we realised that we do need a "general" set of service exceptions to be used by agent or client service.

In general, client service can provide proper exception information to the client. However agent service should reclassify ALL exceptions under src/agent/errors.ts exceptions in order to hide exception information from untrusted agents that are talking to it. Think of it like agent service is a public API, while client service is a private API.

However even under these conditions, there are use cases for having client service specific exceptions as well. So this means a refactoring of how errors are propagated on agent service, and how errors are propagated on client service, especially on the vault/secrets handlers, will be required. This could be done in #266.

CMCDragonkai commented 2 years ago

This article https://www.kabisa.nl/tech/sending-data-to-the-other-side-of-the-world-json-protocol-buffers-rest-grpc/ points out that if json-rpc is used with compression, it achieves similar performance and size to using protocol buffers.

Size:

image

Speed:

image

GRPC's query & stream multiplexing is ultimately reliant on HTTP2. However json-rpc can also use HTTP2 and take advantage of the same query & stream multiplexing.

Furthermore by using json-rpc, we can swap out implementations of http2/http3 depending on what platform we are in, and even nodejs's http2 implementation allows us to pass a direct socket to be used, which means we can then run http2 on any kind of reliable stream such as our UTP connection without needing to go over a proxy. (If we were to do this, we should extract out our proxy implementation and keep it as a separate library since it is still generically useful as a peer-to-peer tunnel).

A lot of grpc features were designed for internal microservice architectures. But the json-rpc model just seems more useful for open, decentralised, peer to peer architectures simply through its flexibility and compatibility.

CMCDragonkai commented 2 years ago

I've added all the issues involving grpc woes under this one single epic. Retargeted this epic to focus on working towards replacing gRPC with something that is transport agnostic to give us greater protocol flexibility. This means we should freeze any effort to work on those sub-issues, as they would become irrelevant once we move to a non-gRPC stack. Meanwhile, the creation of this new RPC layer needs to consider all of the requirements that we have derived from our experiments with gRPC.

CMCDragonkai commented 2 years ago

Investigated graphql, it's still not sufficient for what we need. Our RPC mechanism needs to support arbitrary streaming, and graphql just doesn't have any client side streaming or bidirectional streaming capabilities.

I was thinking that at the end of all of it, we could just have a plain old stream connection that we send and receive JSON messages over (or protobuf messages), and just make use of any stream based connection.

At the bottom webrtc data channel, wireguard or web transport can then be used to provide that reliable p2p stream, and whatever it does, must be capable of datagram punch packets. It should also provide some sort of muxing and demuxing, so that it's possible to create independent streams over the same connection. (As we do right now with grpc streams).

Another thought I had is that a lot of these connections appear to be client-server. Then any bidirectionality has to be done on top again. If we get rid of all the complexity, and start from a foundation of a bidirectional connection, then it should be possible for both sides to create arbitrary streams on top that enable bidirectional RPC.

That would simplify the network architecture as it means any connection from Node 1 to Node 2 would enable Node 1 to call Node 2 and Node 2 to call Node 1.

CMCDragonkai commented 2 years ago

Our own RPC can still work, all we need is a defined routing system, and then an encoding of messages like JSON RPC.

Stream manipulation needs to be multiplexed on top of a connection.

I'd define 3 layers of APIs:

So the final layer RPC is built on top of the streams. It will be 1 to 1 from RPC to stream. A unary call is a function that creates a stream, sends 1 message, and gets back 1 message.

A streaming call is a function that creates a stream, sends N messages, and receives N messages.

In a way this is what GRPC was supposed to do, but it's specifying too much about the connection API, and it should be "connection" agnostic. At the same time a connection is always client to server, so the server has to open its own connection back to the client to send calls, but this should be unnecessary...

Maybe a better grpc https://github.com/deeplay-io/nice-grpc - then we swap out the underlying socket for a reliable socket provided by webtransport or webrtc.

On the topic of nodejs, originally electron process could call nodejs APIs directly without needing to send data to the nodejs process. This is now considered insecure. But I believe that this can be used securely, one just needs to be aware about cross site scripting and not use the "web browser", or the secure bridge. This means grpc can be used in nodejs without a problem. At the same time... it makes sense to eventually reduce the reliance on node-specific APIs too.

CMCDragonkai commented 2 years ago

It's possible that the underlying transport layer can already do both connection and streaming. It then has to expose the ability to create new streams for usage.

On the receiving side, there's both a connection handler, and a stream handler. New streams result in new response streams. Within the stream itself, it is expected that an RPC message is sent that indicates which handler it intends to call. That would be the "header message" essentially.
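A minimal sketch of the "header message" idea, assuming JSON-like messages and using an async iterable as a stand-in for whatever stream the transport provides. The names `HeaderMessage`, `StreamHandler` and `dispatchStream` are hypothetical and are not taken from any existing library.

```typescript
// Hypothetical shapes: the first message on a stream names the handler.
type HeaderMessage = { method: string };
type StreamHandler = (input: AsyncIterable<unknown>) => AsyncIterable<unknown>;

// Reads the header message, then hands the rest of the stream to the handler.
async function* dispatchStream(
  handlers: Record<string, StreamHandler>,
  incoming: AsyncIterable<unknown>,
): AsyncGenerator<unknown> {
  const iterator = incoming[Symbol.asyncIterator]();
  const first = await iterator.next();
  if (first.done) throw new Error('stream ended before header message');
  const header = first.value as HeaderMessage;
  const handler = handlers[header.method];
  if (handler == null) throw new Error(`unknown method: ${header.method}`);
  // Re-wrap the partially consumed iterator so the handler only sees the body.
  const body: AsyncIterable<unknown> = {
    [Symbol.asyncIterator]: () => iterator,
  };
  yield* handler(body);
}
```

Every message after the header belongs to the selected handler, so unary and streaming calls can share the same routing step.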

CMCDragonkai commented 2 years ago

So we are going back to investigating JSON RPC. And I think I found the right libraries for this:

There's also another implementation of this concept here: https://github.com/bigstepinc/jsonrpc-bidirectional.

CMCDragonkai commented 2 years ago

Another example is scuttlebutt's RPC system. It seems quite simple, and something we can take inspiration from: https://ssbc.github.io/scuttlebutt-protocol-guide/#rpc-protocol. According to https://github.com/ssbc/muxrpc it appears to be transport agnostic. Scuttlebutt appears to have server streaming, duplex streaming and unary calls. No client streaming, but that's just the same as duplex streaming.

I'm thinking that a P2P RPC system should have the ability for clients to provide their own handlers to the server. Which is to say upon receiving a new connection on the server, the server should be able to make calls back to the client as well (and not just respond). This can allow quite complex protocols to form.

However https://viewer.scuttlebot.io/%25xcRxdizHSOvF7uV9kVd%2FSKjrt3Fij37eiQFhMuId29M%3D.sha256 it does seem they are facing similar problems and are hoping for a better multiplexer like QUIC.

CMCDragonkai commented 1 year ago

Worth exploring this: https://github.com/deepkit/deepkit-framework/issues/113.

See these comments: https://github.com/deepkit/deepkit-framework/issues/113#issuecomment-1276933581

I've been using GRPC quite a bit with JS, but it's a very clunky library (it still uses callbacks!), and there are many things in it that make it quite inflexible. We've been using it in quite advanced ways (peer to peer, proxied over UDP network, custom TLS verification logic, conversion of streams to async iterators, in-band error management).

I'm evaluating deepkit rpc. And I have some questions.

  1. What kind of streaming does this support? We're using client streaming, duplex streaming and server streaming. In particular duplex streaming is useful for creating sub-protocols on top of a general RPC system. We sometimes have "back and forth interactions" between peers that require statefulness for a short amount of time. That is "encapsulated" within a single "transaction".
  2. Is the general model transport agnostic? We need the RPC system to work on top of a reliable UDP stream. And if it is transport agnostic, how does it handle concurrency, specifically muxing and demuxing, and how does it identify streams in flight?
  3. JSON/binary serialisation/deserialisation. In some cases we have structured objects that are like JSON. In other cases we have actual binary content. If we used JSON solely, we would have to use a base64 encoding which blows up sizes by 33% (unless we use a custom base encoding to reduce this). Does this support a flexible serialisation/deserialisation for binary data?
  4. Error handling - how do errors propagate from client to server, or server to client? GRPC only supports server to client errors, but not client to server errors. Are the errors managed through "leading" or "trailing" headers?
  5. AsyncIterables vs Observables vs Promises - how do you foresee backpressure being managed and push vs pull APIs here when dealing with streams?
  6. What do you think of JSON-RPC?

@CMCDragonkai

  1. RxJS streaming via Observables/Subjects which can be from server->client (server controller) or vice-versa (client controller). Since connections are stateful you can control the stream with an arbitrary RPC action.
  2. yes, there is websocket and tcp implemented. as long as you make it connection-aware it should work fine. https://github.com/deepkit/deepkit-framework/blob/master/packages/rpc-tcp/src/server.ts#L91
  3. it uses a binary protocol and uses BSON for message serialization so works perfectly fine with binary data (typed arrays or ArrayBuffer work out of the box)
  4. when using a server-controller on the client side, thrown errors on the server will be redirected to the client. this can be disabled https://docs.deepkit.io/english/rpc.html#_transform_error
  5. back-pressure is checked on per-connection basis so no matter how many streams you have open. there is no measure in place to prevent back-pressure, so you either have to trust the client or implement it on your own on the transport layer (e.g. kick the client when bufferedAmount is too big).
  6. don't like it since I prefer fast binary protocols

@marcj regarding peer to peer usage. I noticed you have created concepts like "client" and "server".

If I am writing a client, and I connect to the server, can the server simultaneously call back methods on the client without having to establish a separate connection back to the client?

In our current P2P architecture, all nodes are both simultaneously clients and servers. In most RPC systems, this means each side has to set up a client connection to each other. However it should be possible that within a single connection, that connection could expose the client's API to the server at the same time as exposing the server's API to the client.

How difficult would it be to adapt your protocol to achieve such a thing?

Client to server, server to client, peer to peer, everything happens through the same connection. So yes this is already possible.

CMCDragonkai commented 1 year ago

I reckon that even if QUIC is used for P2P between agent and agent, for client to agent, and to be browser compatible, the RPC layer only has 1 requirement: the ability to create bidirectional streams that are reliable. Thus between client and agent, it would be sufficient to use websockets if on browsers, or TCP (with TLS). This means the underlying RPC service can be contactable from agent to agent QUIC connections, or client-server websocket/TCP protocols that browsers can do (because the browser isn't exporting the P2P side).

If we would want browsers to be capable of calling the agent in a P2P way, it would have to use webrtc, or at some point QUIC must expose the capabilities (via web transport or something else).

CMCDragonkai commented 1 year ago

Ok I've been trying deepkit rpc, and none of the examples work. I think the RPC code is pretty alpha level software atm. And the docs are just misleading. But I have found, in a way, what it is trying to do.

On the server side, there are decorators that enable the creation of "controllers".

Controllers are namespaced methods, each method inside also gets their own decorator. The decorators act as a way of "globally" registering the methods into particular namespace.

The decorators by themselves don't do anything. The methods are still callable.

I think at the same time, there's a way to mark the input and output types and figure out how to serialise and deserialise these types using BSON. As well as serialising and deserialising exceptions. We already do this with gRPC but it is all bolted on top of the base gRPC code.

Therefore, pseudocode-wise (because none of this works yet), we would see something like:

@rpc.controller('Controller')
class Controller {
  @rpc.action()
  hello() {
    return 'hello world';
  }
}

const kernel = new RpcKernel();
kernel.registerController(Controller, 'Controller');

const server = new RpcWebSocketServer(kernel, 'ws://localhost:8081');

server.start();

Think of the kernel as the IoC container. When registering these "controllers" (think services) with the kernel, the kernel is actually what instantiates the controllers... just like an IoC container.

The name/reference to these services is the registered name 'Controller'.

If we think of the kernel as an IoC, that would imply that the kernel is capable of automatic dependency injection. Take note that the registerController takes the class and a name of the class 'Controller'.

I wonder if that is even useful in the context of RPC?

The primary reason for an IoC is to avoid doing manual object wiring. (Note that this ioc container concept I'm referring to is sometimes called the https://en.wikipedia.org/wiki/Service_locator_pattern).

Without an IoC container, if you create a bunch of classes using DI, it means the construction of classes doesn't result in the classes instantiating their dependencies (especially their non-encapsulated dependencies), instead there needs to be a central "bootstrap" point where it constructs all the low level objects and compose them to the high level objects. Currently this occurs in our src/PolykeyAgent.ts, so we are not using an "IoC" container to do "automatic wiring". We are in fact doing the wiring up manually.

However this is not true in the case of our GRPC services. A single GRPC handler is written like this:

function agentStatus({
  keyRing
}: {
  keyRing: KeyRing
}) {
  return async (call: ... , callback: ...) => {

  };
}

In this case the keyRing is a dependency of the handler. Thus when testing this handler, it is sufficient to just inject it into the agentStatus function.

Then in our src/client/service/index.ts we are aggregating all of these handler functions together and constructing a container which is just a POJO containing all of those dependencies.

function createService({
  keyRing
}: {
  keyRing: KeyRing
}) {
  const container = {
    keyRing
  };
  const service = {
    agentStatus: agentStatus(container),
  };
  return service;
}

You can see here that I'm relying on using record parameters for "resolving" the dependency. If I had instead just asked for the keyRing as a parameter, this would be more difficult. Since it would require some meta-programming to introspect the function's parameter names, and then to "service locate" the appropriate dependency to inject. Whereas using record parameters this way is lightweight and idiomatic with no meta-programming required.

Compare this to some PHP code like here: https://laravel.com/docs/4.2/ioc which uses reflection utilities to find out what exactly needs to be injected. This is actually more advanced since it is capable of "injecting" dependencies with respect to the type of the parameter, not the parameter name. This also enables it to inject dependencies with respect to abstract types like interfaces.

In our example, we can still do this, the type of the parameter can still be an interface, but the injector still needs to know what exactly to inject. And the type system of TS will catch any errors in this situation.
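As a sketch of what this looks like when testing, assume a reduced interface `KeyRingLike` with a single hypothetical method `getNodeId` (the real KeyRing is much larger, and the handler body here is invented for illustration). A stub that satisfies the interface can be injected through the record parameter, and the compiler rejects a stub with the wrong shape.

```typescript
// Hypothetical minimal interface; the real KeyRing has a much larger surface.
interface KeyRingLike {
  getNodeId(): string;
}

// Handler factory: dependencies arrive as one record, matched by property name.
function agentStatus({ keyRing }: { keyRing: KeyRingLike }) {
  return async (): Promise<{ nodeId: string }> => {
    return { nodeId: keyRing.getNodeId() };
  };
}

// In a test, a stub that satisfies the interface is injected directly.
const stubKeyRing: KeyRingLike = { getNodeId: () => 'stub-node-id' };
const statusHandler = agentStatus({ keyRing: stubKeyRing });
```

Because the factory destructures only the properties it needs, the same container object can be passed to every handler factory without any reflection.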

Ok so I've digressed on IoC enough. The main point here is that an RPC system does not require some sort of injector system, in this case an IoC container that can locate the correct handler to execute while also injecting all the appropriate dependencies for doing the execution.

It also makes sense that the injection of the dependencies should occur before the handler is routed to by the RPC server. It would be extremely slow if every request to a handler involved an injection execution.

This is why our GRPC handlers are closures that close over the dependencies. In a similar way, the Controller in deepkit RPC would need to be instantiated with the dependencies before any of its methods could be called, and once instantiated, it can stay instantiated.

If we can do the IoC and DI before registering RPC handlers, then we can separate these concerns. The end result would be an RPC server that simply registers handlers directly, something like this:

const server = new RPCServer({
  'agentStatus': agentStatus,
});
server.register('agentStatus', agentStatus);

The expectation being that agentStatus is just any JS function, with the inputs expected and an output expected:

function agentStatus(i1, i2): o1 {
  // do stuff
}

And any dependencies would have already been fulfilled by some external system.

And because there would need to be some level of reflection, we would expect to use a decorator to help do this, or a HOF since decorators don't work on simple functions.

const agentStatusHandler = rpcHandler(agentStatus);

The rpcHandler would be a HOF that takes a function and returns a function. The returned function would be the actual handler that is registered with the RPC server.

This wrapper, whether it is a decorator or not is what is necessary to maintain some context over the function.

The context is the type of the inputs and outputs. This is necessary for serialisation and deserialisation. This is also necessary for the RPC server to know what to do with the inputs and outputs.

This may also end up including other things like deadlines... and more.
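A minimal sketch of such a wrapper, assuming JSON as the wire encoding. TypeScript types are erased at runtime, so the "context" about the input type has to be supplied as a runtime parser. The names `RpcHandler`, `parseAddInput` and `add` are hypothetical, and this `rpcHandler` is only one possible shape for the HOF described above.

```typescript
// Hypothetical wrapper: pairs a plain function with the parsing and encoding
// needed to call it with serialised data.
type RpcHandler = (rawInput: string) => Promise<string>;

function rpcHandler<I, O>(
  parseInput: (value: unknown) => I,
  fn: (input: I) => O | Promise<O>,
): RpcHandler {
  return async (rawInput) => {
    const input = parseInput(JSON.parse(rawInput));
    const output = await fn(input);
    return JSON.stringify(output);
  };
}

// A plain function whose dependencies were already fulfilled elsewhere.
function add(input: { a: number; b: number }): number {
  return input.a + input.b;
}

// Runtime check standing in for type information that is erased at runtime.
function parseAddInput(value: unknown): { a: number; b: number } {
  const v = value as { a?: unknown; b?: unknown } | null;
  if (v == null || typeof v.a !== 'number' || typeof v.b !== 'number') {
    throw new TypeError('expected { a: number, b: number }');
  }
  return { a: v.a, b: v.b };
}

// The wrapped function is what would be registered with the RPC server.
const addHandler = rpcHandler(parseAddInput, add);
```

Deadlines and other per-call metadata could be threaded through the same wrapper, which is one reason to keep it separate from the plain function.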

Alternatively, we could just expect that the wrapper to be part of the RPC server construct, it would just take each function and do the necessary reflection.

I have a feeling that decorators however enable some meta-programming that is easier to do than purely reflection.

I have some more topics to cover here:

  1. Client controllers - it is possible for the client side to also register a controller for the server to execute, but how does the server get to acknowledge this? Does the handler have a way of calling back to the client? Or is the client-handler something that can keep track of client connections
  2. Error forwarding (and filtering)
  3. Serialisation/deserialisation - BSON or otherwise
  4. Streaming - how do we deal with handlers that return AsyncGenerator or Generator? Can a handler also take an AsyncGenerator as well?

Regarding client controllers, it seems to be available on the on connection handler:

kernel.onConnection((conn: RpcKernelConnection) => {
  conn.controller('some controller') // <- this is the client's supplied controller
});

Note that the controller() returns a RemoteController type which is a higher order type that is meant to transform a "controller type" which is meant to be a Class with methods.

So I guess it may be useful to make use of class constructs as that does enable some level of grouping. But it does add additional overhead compared to just using raw functions. If we dispensed with controllers... all we would get is the entire type of the client's handlers.

conn.handlers.someHandler();

CMCDragonkai commented 1 year ago

It's time to have a look at https://github.com/ssbc/muxrpc and compare this.

CMCDragonkai commented 1 year ago

As for streaming... deepkit RPC supports the handlers returning Subject and its variants from rxjs. This essentially enables the caller to subscribe to a handler just by doing something like:

const ctrl = client.controller(MyControllerInterface);

const sensorData = await ctrl.sensorData();
const sub = sensorData.subscribe((next) => {
    console.log('sensor says', next);
});

//when done watching for data, but keeps Subject
//on server alive, for other subscribers.
sub.unsubscribe();

const chat = await ctrl.getChatChannel('general');
chat.subscribe((next) => {
    console.log('sensor says', next);
});

//completes the whole Subject, triggering its TearDown
chat.unsubscribe();

It would probably ensure that upon the connection/stream disconnecting the subject is also unsubscribed.

This is essentially a push-based stream. The client is acquiring a stream where the server pushes the data.

Using iterators/async iterators is a pull-based stream. Where the client triggers the next item repeatedly and can stop at any time.

Therefore if the RPC system supports both, then it'd support both pull streams and push streams. https://github.com/pull-stream/pull-stream/blob/master/docs/glossary.md

CMCDragonkai commented 1 year ago

I wonder how backpressure works in the case of rxjs streams. Normally it would depend on the underlying network protocol. But if the RPC client is not subscribed to the rxjs stream, it is possible the rxjs stream is reading it into an internal buffer. Or maybe not... it sort of depends on how the data from the socket/stream is being read to local rxjs stream. More info here: https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/backpressure.html

But pull streams backpressure is driven by the client.

CMCDragonkai commented 1 year ago

I've tried out MuxRPC and it's revealed some interesting ideas, but it also has some problems.

Similar to deepkit, there's this notion that each side can have a manifest. So upon creating a "MRPC" instance, you pass a "remote manifest" and a "local manifest":

const rpc = MRPC(remoteManifest, localManifest);

The idea being that servers would only have a localManifest, while clients would only have a remoteManifest, while peers would have both manifests.

These manifests correspond to the controllers in deepkit RPC.

However again, it isn't clear in the docs nor the examples how the handlers on each side's manifest are supposed to make use of the other's manifest.

There's no mention of any "remote side" in any of the handlers. I checked the arguments to the handlers, and there's still nothing. It appears you'd have to somehow acquire this information when a connection is established, but it seems very underbaked here.

It seems that it would be easy if each handler just received a ctx parameter like we do with our context decorated functions, and inside this ctx, every handler could be supplied with the remote information. We could combine this with our timedCancellable decorator that means every handler would have access to deadlines, abortion signals, client information and potentially the client's remote handlers too.
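A minimal sketch of what such a `ctx` parameter could look like. The field names (`signal`, `remoteNodeId`) and the helper `callWithDeadline` are hypothetical and are not the actual timedCancellable API. The deadline only aborts the signal, so a handler has to observe `ctx.signal` itself in order to stop early.

```typescript
// Hypothetical context shape; the field names are illustrative only.
type HandlerContext = {
  signal: AbortSignal; // aborted when the deadline passes
  remoteNodeId: string; // identifies the caller
};

type ContextHandler<I, O> = (input: I, ctx: HandlerContext) => Promise<O>;

// Runs a handler with a deadline; the timer aborts the signal when it fires.
async function callWithDeadline<I, O>(
  handler: ContextHandler<I, O>,
  input: I,
  remoteNodeId: string,
  timeoutMs: number,
): Promise<O> {
  const abortController = new AbortController();
  const timer = setTimeout(() => abortController.abort(), timeoutMs);
  try {
    return await handler(input, { signal: abortController.signal, remoteNodeId });
  } finally {
    clearTimeout(timer);
  }
}

// Example handler that reads the remote information from the context.
const whoAmI: ContextHandler<void, string> = async (_input, ctx) => {
  if (ctx.signal.aborted) throw new Error('aborted');
  return `hello ${ctx.remoteNodeId}`;
};
```

The client's remote handlers could be exposed as one more field on the same context object.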

One good thing is how the underlying socket is definitely not part of the MuxRPC. It's ultimately handled by yourself. All MuxRPC really needs is a "duplex reliable stream".

It makes use of an abstraction called "pull-stream" as the duplex reliable stream that it consumes. Any NodeJS stream can be converted to a "pull stream" using their utility packages such as stream-to-pull-stream.

More info to be written.

CMCDragonkai commented 1 year ago

I wrote a comparison of some multiplexed transport protocols here: https://github.com/MatrixAI/Polykey/issues/234#issuecomment-1318113855.

The conclusion is that based on the transport protocols that we are going to use, we do not require muxing/demuxing to be built into the RPC system. The RPC system should instead just rely on the underlying transport to provide the muxing/demuxing system.

Here are some examples:

  1. For the client service, when using websockets, just create a new websocket connection for every single RPC call. If this is HTTP1, then this is a new TCP connection each time. If this is HTTP2, then this is a new HTTP2 stream within 1 TCP connection.
  2. For the client service, when using HTTP, just do a new HTTP request/response for each RPC call. See: https://github.com/davedoesdev/browser-http2-duplex for hacking together "streaming" support with HTTP.
  3. For the agent service, when using QUIC, just create a new QUIC stream for every single RPC call. QUIC will multiplex every single QUIC stream within 1 QUIC "connection".

This is great news, since this means our RPC system is significantly simpler, and no custom muxing/demuxing is required and third party integration is simpler as they don't need a custom muxer/demuxer either.
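To illustrate how little the RPC layer needs from the transport under this design, here is a sketch with hypothetical interfaces `RpcStream` and `RpcTransport`. A websocket, an HTTP request/response or a QUIC stream would each be adapted to them, and the message shape `{ method, params }` is only an assumption for the example.

```typescript
// Hypothetical minimal stream shape that each transport would be adapted to.
interface RpcStream {
  send(message: string): Promise<void>;
  receive(): Promise<string>;
  end(): Promise<void>;
}

interface RpcTransport {
  // Multiplexing streams over a connection is the transport's job.
  openStream(): Promise<RpcStream>;
}

// A unary call: one new stream, one message out, one message back.
async function unaryCall(
  transport: RpcTransport,
  method: string,
  params: unknown,
): Promise<unknown> {
  const stream = await transport.openStream();
  try {
    await stream.send(JSON.stringify({ method, params }));
    return JSON.parse(await stream.receive());
  } finally {
    await stream.end();
  }
}
```

The RPC code never sees stream identifiers, because each call owns its stream for its whole lifetime.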

CMCDragonkai commented 1 year ago

Now regarding the streaming calls in MuxRPC, there are some interesting ideas here.

Unlike deepkit RPC, the RPC system in MuxRPC requires runtime specification about how the handlers are supposed to be called. Deepkit seems to rely on TS types and decorators and maybe some meta-programming.

The manifest basically specifies 4 types of handlers:

  1. async - unary
  2. source - server stream
  3. sink - client stream
  4. duplex - duplex stream

For the client calling a handler, even if the server states that it is synchronous, the handler will automatically be converted to asynchronous because it goes over the network.

For async, this is just a standard unary function call. There's a bit of confusion here because RPC makes network calls seem like normal function calls, and this is actually considered a disadvantage of RPC, because network calls are not like normal function calls, and that's why message passing always provides an explicit API. However RPC is more convenient. So a middle ground might be to provide a function call but in a specific "pattern"/"structure" that ensures that users understand that this is an RPC call, and not a normal function. This means for example, parameters and return values must be serialisable/deserialisable, they cannot just be pointers to rich objects in-memory. We do not have distributed memory here. And that parameters and return values are always "copied" thus there is no reference counting of any objects inside the handler scopes.

So in mux rpc, the handler is just a regular function (with any number of parameters). I recommend that we change this so that the function always takes a single "input" and a single context object, and returns a single output. This enables explicit parsing of the input, and also explicit plumbing of exceptions. I believe this is better than letting the RPC system decide how to parse the input, and how to plumb exceptions. Functional composition is better than implicit magic. However if a local exception occurs in the handler, this could prevent returning anything. In that case, the RPC system should catch this exception and provide a particular default error response to the caller (a sort of 500 error in HTTP).

The source handler returns a "source" stream. The sink handler returns a "sink" stream. The duplex handler returns a compound source and sink stream object.
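A minimal sketch of that recommendation, assuming a JSON-RPC style response envelope. The names `RpcResult` and `runHandler` are hypothetical. The error code -32603 is the one JSON-RPC 2.0 reserves for "Internal error", which plays the role of the HTTP 500 mentioned above.

```typescript
type RpcResult<O> =
  | { result: O }
  | { error: { code: number; message: string } };

// JSON-RPC 2.0 reserves -32603 for "Internal error".
const INTERNAL_ERROR = -32603;

// Calls a single-input, single-context handler and converts any exception it
// throws into a generic error response.
async function runHandler<I, C, O>(
  handler: (input: I, ctx: C) => Promise<O>,
  input: I,
  ctx: C,
): Promise<RpcResult<O>> {
  try {
    return { result: await handler(input, ctx) };
  } catch {
    // Hide the local exception; the caller only sees a generic failure.
    return { error: { code: INTERNAL_ERROR, message: 'Internal error' } };
  }
}
```

Errors that the handler wants the caller to see would be returned as part of the output, so only unexpected exceptions reach the catch block.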

Notice that in all 3 cases, the stream is the returned object. There is no processing of the stream via input parameters. In fact in all 3 cases, the handler can still be called with arbitrary serialisable/deserialisable parameters. The stream object that it returns is a custom "pull-stream" concept. What exactly is a pull stream?

A pull stream is an abstraction that enables sinks to pull from sources. The sources are not allowed to emit any data, until the sink reads from the source. I looked into the code for pull-streams, and both sinks and sources are functional closures that use event emitters. I think the original reason to develop this was to provide some sort of backpressure.

NodeJS streams also have native backpressure. They force backpressure by pausing the source until the sink reads again, and then pausing again every single time. This is why the examples in muxrpc show how to convert NodeJS streams to pull streams.

Web streams also do the same thing. They were optimised for IO patterns and are used internally by the fetch API.

This article compares streams (specifically web streams) to RxJS observables: https://surma.dev/things/streams-for-reactive-programming/

This leads to the conclusion that RxJS observables are not the right kind of abstraction to use for streams in RPC. Due to the need to have backpressure, pull-oriented streams are better suited for RPC. It's always possible to build push-flow on top of pull-flow if you need it. Pull-flow is also designed around single-consumer, which is the case for RPC, while push-flow is designed around multi-consumer. I think pull-flow as the default is better for RPC design. If the application requires push-flow, they can design that around the RPC's pull-flow and transform the RPC's pull-stream into an rxjs observable subsequently.

To build push on top of pull, constantly be pulling from the pull API, and then push out the chunks to any consumers.
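A minimal sketch of push on top of pull, assuming a hypothetical `pushFromPull` helper (the name and the `subscribe` shape are made up for illustration). It keeps pulling from a single-consumer async iterable and pushes every chunk to whoever is subscribed at that moment:

```typescript
type Listener<T> = (chunk: T) => void;

// Continuously pulls from a single-consumer source and pushes each chunk
// to every listener registered at that moment
function pushFromPull<T>(source: AsyncIterable<T>) {
  const listeners = new Set<Listener<T>>();
  // The pump starts immediately; `done` settles when the source ends or throws
  const done = (async () => {
    for await (const chunk of source) {
      listeners.forEach((listener) => listener(chunk));
    }
  })();
  return {
    // Returns an unsubscribe function
    subscribe(listener: Listener<T>): () => void {
      listeners.add(listener);
      return () => {
        listeners.delete(listener);
      };
    },
    done,
  };
}
```

Note that the pump does not wait for listeners, so backpressure ends at this boundary. That is the usual trade-off when moving from pull-flow to push-flow.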

The opposite of building pull on top of push is much more code.
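For comparison, a sketch of pull on top of push, assuming a hypothetical `pullFromPush` helper (not from any library). The adapter needs its own buffer and a wake-up mechanism, and the buffer grows without bound if the producer pushes faster than the consumer pulls:

```typescript
// Adapts a push source into a pull-based async generator.
// Chunks pushed while nobody is pulling are buffered without bound.
function pullFromPush<T>() {
  const buffer: T[] = [];
  let closed = false;
  let wake: (() => void) | undefined;
  // Wakes the generator if it is currently waiting for data
  const notify = () => {
    wake?.();
    wake = undefined;
  };
  async function* pull(): AsyncGenerator<T, void, void> {
    while (true) {
      // Drain whatever has been pushed so far
      while (buffer.length > 0) yield buffer.shift() as T;
      if (closed) return;
      // Nothing buffered: wait until push() or close() is called
      await new Promise<void>((resolve) => {
        wake = resolve;
      });
    }
  }
  return {
    push(chunk: T) {
      buffer.push(chunk);
      notify();
    },
    close() {
      closed = true;
      notify();
    },
    pull,
  };
}
```

The sketch supports a single consumer only; calling `pull()` twice would make the two generators compete for the same buffer.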

This sort of means there's no need to use pull-streams anymore. We can just use web streams which are more portable and more modern.

But hold on... there's another more "language-level" abstraction that is also a pull-based dataflow that we already use all over the PK codebase. That's the iterators and async iterators provided by the async generator functions.

At the same time, node and web streams satisfy the iterable interface, so they can be easily worked with as if they were async iterators.

https://nodejs.org/api/stream.html#streams-compatibility-with-async-generators-and-async-iterators

// Readable Stream to async iterator
for await (const chunk of readableStream) {
  // use the chunks
}
// Async iterator to Readable Stream (source)
// Using https://nodejs.org/api/stream.html#streamreadablefromiterable-options
import { Readable } from 'stream';

const abortController = new AbortController();

async function *generate () {
  let n = 0;
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}

const readableStream = Readable.from(generate());

// Stop generating once the stream is closed
readableStream.on('close', () => {
  abortController.abort();
});
// Piping async iterator to writable stream

import { pipeline }  from 'stream/promises';

const abortController = new AbortController();

async function *generate() {
  let n = 0;
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}

// `pipeline` accepts an async iterable as its source
const iterator = generate();

try {
  // The returned promise settles once the pipeline has finished or failed
  await pipeline(iterator, writableStream);
} catch (e) {
  abortController.abort();
  throw e;
}

The above examples are for node streams. But web streams also support the same concepts.

// Readable Stream to async iterator

// If iterator exits, then readable stream is closed
for await (const chunk of readableStream) {
  // do something
}

// If iterator exits, then readable stream stays open
for await (const chunk of readableStream.values({ preventCancel: true })) {
  // do something
}
// Async iterator to Readable Stream (source)
function iteratorToStream(iterator) {
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}
// Piping async iterator to writable web stream

import { WritableStream } from 'web-streams-polyfill/ponyfill';

const abortController = new AbortController();

async function *generate() {
  let n = 0;
  while (!abortController.signal.aborted) {
    yield n;
    n++;
  }
}

const iterator = generate();

const writableStream = new WritableStream({
  close() {
    abortController.abort();
  },
});

const writer = writableStream.getWriter();
for await (const chunk of iterator) {
  // Wait for it to be ready
  await writer.ready;
  // Write a chunk
  await writer.write(chunk);
}
// Wait for it to be ready (flushed)
await writer.ready;
// Close the writable stream
await writer.close();

Note that NodeJS streams support both objects and buffers, while web streams used for IO typically carry Uint8Array chunks. For network communication, assuming that we want to make parsing/serialisation/deserialisation explicit, it makes sense to focus specifically on buffers or Uint8Array rather than objects. This also ensures that the data is in fact serialisable, since rich objects cannot just be passed around over the network.

Let's suppose our RPC handlers looked like this:

// Stream oriented

const rpc = new RPC({
  async handler(input: ReadableStream): Promise<WritableStream> {
    // Parse the input data (assuming this produces some structure)
    const inputParsed = parse(input);
    const writableStream = new WritableStream();
    const writer = writableStream.getWriter();
    // The writing must be done asynchronously
    // Not within here, but you must return this prior

    // `iterator` stands for whatever async iterable produces the output chunks
    void (async () => {
      for await (const chunk of iterator) {
        await writer.ready;
        await writer.write(chunk);
      }
      await writer.ready;
      await writer.close();
    })();
    return writableStream;
  }
});

You can see here it's a bit awkward to write because you have to return the WritableStream first before asynchronously writing to the stream.

Alternatively we can use first-class features of the language: instead of taking streams and returning streams, we can take async iterators and return async iterators.

const rpc = new RPC({
  async *handler(input: AsyncGenerator<Buffer>): AsyncGenerator<Buffer> {
    // Now that you have 2 generators
    // you can independently read data from the input generator
    // while yielding data to the output generator
    for await (const chunk of input) {
      yield chunk;
    }
    // Or more simply `yield* input;`
    // Which is simply an "echo" handler
  }
});

Notice here that the AsyncGenerator type, like Generator, is a composition of the iterable and iterator interfaces.

Why use a generator instead of Iterable or Iterator?

Well the Iterator type is flexible but not convenient. You have to use next.

The Iterable type is more convenient with the for..of syntax, but you have to call the [Symbol.iterator] to get access to the iterator, if you need to do more complex interactions.

2 independent generators are necessary to have concurrent sources and sinks.

We naturally have a form of backpressure here. We only acquire data from the input generator when we pull from it by asking for the next piece of data. At the same time, the yield will be frozen unless the caller pulls from us.
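A small in-process sketch of that backpressure, with no RPC layer involved. The `events` log and the function names are only for illustration. Each chunk is produced only when the consumer asks for it, so production and consumption strictly interleave:

```typescript
const events: string[] = [];

// The input generator records when it actually produces a chunk
async function* input(): AsyncGenerator<number, void, void> {
  for (let n = 0; n < 3; n++) {
    events.push(`produce ${n}`);
    yield n;
  }
}

// An "echo" handler: pulls from its input only when it is pulled from
async function* echo(
  source: AsyncIterable<number>,
): AsyncGenerator<number, void, void> {
  for await (const chunk of source) {
    yield chunk;
  }
}

// The caller drives everything by pulling from the handler's output
async function run(): Promise<string[]> {
  for await (const chunk of echo(input())) {
    events.push(`consume ${chunk}`);
  }
  return events;
}
// run() resolves to:
// produce 0, consume 0, produce 1, consume 1, produce 2, consume 2
```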

The client call then has to do something like this:

// And we want to pass in a generator here

async function* input() {
  yield Buffer.from('hello');
  yield Buffer.from('world');
}

// Call the generator function so that a generator object is passed in
const g = client.handler(input());

for await (const chunk of g) {
  console.log(chunk);
}

Combining this with ixjs increases the power and flexibility of the generators. We can perform transformations easily on the generators.

This design starts with first-class language features. It also focuses on the most powerful communication paradigm first, that is duplex communication, and then it's possible to abstract over duplex to constrain it to server streaming, client streaming or unary calls.

The key point here is that server streaming just means no input generator is provided... or more appropriately, the input generator immediately returns.

Client streaming means the output generator immediately returns.

Unary calls are where both the input generator and the output generator immediately return.

Behind the scenes, the RPC system has to plumb the generator data into streams offered by the underlying transport layer. Ideally these would be web streams, so we would use the appropriate transformations. Serialisation/deserialisation is not actually part of the RPC system, but instead used inside each handler. This allows a significant amount of flexibility since each individual handler can pick how they want to serialise/deserialise their data. The generators are limited to working with buffer chunks (Uint8Array).

We can prototype this first without any RPC serialisation/deserialisation in a single process.

Further prototypes can be done to demonstrate how to convert such pull-dataflow into push data flow. I'm starting to see how network communication should be pull-based which can be converted to push-flows either in-memory through rxjs or other kinds of protocols that are non-rpc, but instead messaging based.... this might end up being a very elegant design!

CMCDragonkai commented 1 year ago

Still need to address how "metadata" gets passed, like "header" information or session token information (for authentication). And how exceptions are passed around, how this interacts with the ctx and the ability to call back on the client.

CMCDragonkai commented 1 year ago

Here's an idea for communicating stream exceptions to the handler's input async iterator.

// Small helper so the example is self-contained
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function *gf() {
  while (true) {
    await sleep(100);
    try {
      yield 'string';
    } catch (e) {
      yield; // <-- this is the trick
      throw e;
    }
  }
}

async function main() {
  const g = gf();
  setTimeout(() => {
    void g.throw(new Error('Stream Failure'));
  }, 250);

  for (let i = 0; i < 10; i++) {
    try {
      console.log(await g.next());
    } catch (e) {
      console.log('Consumed an exception!', e.message);
      break;
    }
  }
}

void main();

It was a bit tricky to realise how this works. The g.throw function actually returns a promise. When you throw an exception into the generator, the generator rethrows it out if it is not caught inside the generator function. Now when it rethrows it out, that is actually sent to the promise rejection of the g.throw() call. If you don't catch it here, it would then become an unhandled promise rejection (and thus kill the node process).

What we want to do is to essentially "signal" to the handler that is consuming the input iterator that there is a problem. But because the input iterator is a "pull-based" data flow, this error can only be signalled when the handler is calling next() on the iterator.

So when I first tried this, I just thought that rethrowing the exception is sufficient inside the generator. But it didn't work because the throw() gives back a promise rejection.

So the trick to fix this problem is, inside the generator function, to catch the exception on the yield 'string';, then call yield; which will give a promise fulfilled on the throw() call, and then throw the exception afterwards, which will throw the exception to the next g.next() call.

In this way, only when the handler wants to next() on the generator will they then receive the relevant exception.

CMCDragonkai commented 1 year ago

Alternatively if the async generator function itself is what is pulling data from the stream, then it can just throw the exception there, and that will affect the next() call.
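A minimal sketch of that alternative, assuming Node's `stream/web` implementation of web streams and a hypothetical `fromStream` helper. The generator itself reads from the stream, so a stream failure rejects `reader.read()`, the generator throws, and the consumer sees the exception on its next `next()` call:

```typescript
import { ReadableStream } from 'node:stream/web';

// Pulls chunks from a web ReadableStream inside the generator itself
async function* fromStream<T>(
  stream: ReadableStream<T>,
): AsyncGenerator<T, void, void> {
  const reader = stream.getReader();
  try {
    while (true) {
      // Rejects if the stream has errored, which throws out of the generator
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Runs on normal completion, on error, and on early exit by the consumer
    reader.releaseLock();
  }
}
```

No `g.throw()` call is needed here, so there is no separate promise rejection to handle.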

The web stream API is actually promise-oriented, not event oriented. So technically it fits well with the async iterator API too.

We could also just use web streams directly too. See: https://github.com/ReactiveX/IxJS/blob/master/src/asynciterable/fromdomstream.ts

CMCDragonkai commented 1 year ago

Just a note about GRPC metadata.

All GRPC calls can have 1 leading metadata and 1 trailing metadata.

For a given handler, for example unary call, you're given a call object.

For server to send the leading metadata, this is done with call.sendMetadata(). This must be done before any other data is sent.

To send trailing metadata, this has to be done as part of the callback. Which is sent along with the response value.

In a duplex stream handler, one can again do call.sendMetadata for the leading metadata.

But when it ends the stream with call.end(trailingMetadata) it can pass the trailing metadata.

Therefore the important thing to remember here is that the leading and trailing metadata both only occurs once at the beginning and at the end.

Each side is capable of receiving and processing the metadata sent by the other side.

In the case of unary handlers, they can get the leading metadata with call.metadata. To get the call's trailing metadata, I think they need to do call.on('metadata').

On the unary caller side, they can get the leading metadata from the server with call.on('metadata'), I believe this may actually be called twice, with the second time being the trailing metadata.

You can see here I've completely forgotten how the API works, it's too complex in grpc.

For our RPC, we need to simplify this metadata situation. I think it does make sense that there can be a leading and trailing metadata, but this needs to be made simpler. In fact, assuming we have 2 abstraction levels: web streams and async generators, we can understand that the web streams will basically emit the raw data. While the async generators can provide processed data (we can also have raw async generators).

If the data is processed, we would naturally expect that the initial message be the leading metadata, and the last message to be the trailing metadata.

But this depends on our "protocol abstraction" above the raw binary stream. For RPC purposes, we will need at least:

  1. Leading and trailing metadata
  2. Forwarding of exceptions (through the metadata system)
  3. Structured messages (where unary call simply means 1 structured message)

If a handler is only expecting 1 structured message, we could parse to completion of the structured message, but if there is more than 1 structured message, or data that we don't expect, this can then be considered a protocol error.
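A minimal sketch of such a protocol check, assuming a hypothetical `Frame` envelope (the field names and the `readUnary` helper are made up, nothing here is a settled wire format). It expects leading metadata, exactly 1 structured message, then trailing metadata, and treats anything else as a protocol error:

```typescript
// Hypothetical envelope; the field names are illustrative only
type Metadata = Record<string, string>;
type Frame =
  | { type: 'leading'; metadata: Metadata }
  | { type: 'message'; data: unknown }
  | { type: 'trailing'; metadata: Metadata; error?: string };

class ProtocolError extends Error {}

// Reads a unary call: leading metadata, exactly 1 message, trailing metadata
async function readUnary(frames: AsyncIterable<Frame>) {
  let leading: Metadata | undefined;
  let trailing: Metadata | undefined;
  const messages: unknown[] = [];
  for await (const frame of frames) {
    if (trailing !== undefined) {
      throw new ProtocolError('frame after trailing metadata');
    }
    if (leading === undefined) {
      if (frame.type !== 'leading') {
        throw new ProtocolError('expected leading metadata first');
      }
      leading = frame.metadata;
    } else if (frame.type === 'message') {
      if (messages.length === 1) {
        throw new ProtocolError('unary call received more than 1 message');
      }
      messages.push(frame.data);
    } else if (frame.type === 'trailing') {
      // Exceptions are forwarded through the metadata system
      if (frame.error !== undefined) throw new Error(frame.error);
      trailing = frame.metadata;
    } else {
      throw new ProtocolError('duplicate leading metadata');
    }
  }
  if (leading === undefined || trailing === undefined || messages.length !== 1) {
    throw new ProtocolError('incomplete unary call');
  }
  return { leading, message: messages[0], trailing };
}
```

A streaming variant would drop the "exactly 1 message" rule and yield each message as it arrives, while keeping the same leading and trailing checks.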

So unlike other RPC systems, we want to be able to expose the lower level in case we need to do something special, and then specialise to higher-order abstractions like JSON RPC or BSON RPC.

CMCDragonkai commented 1 year ago

Moving this to todo, will be the next priority for @tegefaulkes after merging the crypto feature upgrade.

CMCDragonkai commented 1 year ago

Once we build higher-level RPC protocols like JSONRPC or BSONRPC with defined error behaviour and metadata structure, that's also when we can add in rules for versioning/backwards-compatibility and pagination.

I'm considering supporting JSON RPC first as it's the easiest to consume for third party APIs.

Then we can support an additional, more efficient interface for binary data. Choices are:

Key point is that JSON should be supported first. Then we also need to define a consistent way of representing binary data. Right now that's always been { type: 'Buffer', data: [123, ... ] } which is obviously very inefficient. Instead we should define something that can take base64 encoded data, that is agreed upon spec from client and server side.
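A minimal sketch of one possible base64 convention, assuming Node's `Buffer` for the encoding. The `$base64` tag and the `encodeJSON`/`decodeJSON` names are assumptions made up for illustration, not an agreed spec. Binary values are replaced by a small tagged object on the way out and restored to `Uint8Array` on the way in:

```typescript
import { Buffer } from 'node:buffer';

// Hypothetical tag; client and server would have to agree on it
const TAG = '$base64';

function encodeJSON(value: unknown): string {
  return JSON.stringify(value, function (this: any, key: string, v: unknown) {
    // Buffer#toJSON runs before the replacer, so inspect the raw value
    const raw = this[key];
    if (raw instanceof Uint8Array) {
      return { [TAG]: Buffer.from(raw).toString('base64') };
    }
    return v;
  });
}

function decodeJSON(text: string): unknown {
  return JSON.parse(text, (_key, v) => {
    const isTagged =
      v !== null &&
      typeof v === 'object' &&
      typeof v[TAG] === 'string' &&
      Object.keys(v).length === 1;
    // Restore tagged objects to plain Uint8Array values
    return isTagged ? new Uint8Array(Buffer.from(v[TAG], 'base64')) : v;
  });
}

// encodeJSON({ data: new Uint8Array([1, 2, 3]) }) gives
// {"data":{"$base64":"AQID"}}
```

Base64 costs roughly 4 characters per 3 bytes, whereas the `{ type: 'Buffer', data: [...] }` form spends up to 4 characters on every single byte.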

We can review Ethereum's API style to see how ours fit in. https://ethereum.org/da/developers/docs/apis/json-rpc/

As well as https://github.com/microsoft/vs-streamjsonrpc/blob/main/doc/index.md which discusses why JSON RPC is suitable for peer to peer communication.

CMCDragonkai commented 1 year ago

@tegefaulkes my initial experiments with generator based API is located in https://github.com/MatrixAI/Polykey/tree/feature-rpc.

Please take over that branch and get a new PR for it. The experiments can be copied over after rebasing. The only relevant code is in the test-* files at the project root.

CMCDragonkai commented 1 year ago

I think based on our discussions last time, it's worthwhile to spec out the details of how this will work in #495. This issue is more about just generally transport agnostic RPC. #495 is more specific to how to use JSON RPC for us. A combination of our generator ideas and decorators should be used to enable JSON RPC based handlers. We're still at a prototyping stage so nothing is set in stone.

We will need to meet again Friday to discuss.

marcj commented 1 year ago

Not sure why I got so many emails from this thread, but let me say something I noticed before I unsubscribe: In the hundreds of hours you spent looking at all these implementations, trying them out, and especially writing so much text, you could have instead written your own RPC implementation that is perfectly tailored to your needs.

CMCDragonkai commented 1 year ago

That's very strange... how did you get so many emails from this thread...?

We are working on our own RPC implementation now! The text is just examining all the trade-offs.

CMCDragonkai commented 1 year ago

All the tasks above regarding GRPC should be crossed out @tegefaulkes, and replaced with tasks related to JSONRPC.

tegefaulkes commented 1 year ago

In the future we shouldn't re-purpose issues like this. It makes it a little confusing what's going on in review. For something like this where we change track on an issue, I think we should close the old one, outline the change of approach (such as creating our own RPC to solve the problems), and create new issues for the problem.

To a degree I think issues should be immutable. Normal changes like updating the spec is fine. But a complete revamp of the topic isn't good.

CMCDragonkai commented 1 year ago

While this has been closed, this work has now been broken down into smaller issues:

CMCDragonkai commented 1 year ago

@tegefaulkes CQRS is still a pending problem but only when we start tackling the GUI app again.