RPCClient.ts
@ready(new rpcErrors.ErrorMissingCaller())
public async unaryCaller<I extends JSONValue, O extends JSONValue>(
  method: string,
  parameters: I,
  ctx: Partial<ContextTimedInput> = {},
): Promise<O> {
  const callerInterface = await this.duplexStreamCaller<I, O>(method, ctx);
  const reader = callerInterface.readable.getReader();
  const writer = callerInterface.writable.getWriter();
  try {
    await writer.write(parameters);
    const output = await reader.read();
    if (output.done) {
      throw new rpcErrors.ErrorMissingCaller('Missing response', {
        cause: ctx.signal?.reason,
      });
    }
    await reader.cancel();
    await writer.close();
    return output.value;
  } finally {
    // Attempt clean up, ignore errors if already cleaned up
    await reader.cancel().catch(() => {});
    await writer.close().catch(() => {});
  }
}
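For this first instance, a minimal sketch of the same clean-up with an explicit reason passed to cancel, so the underlying source never just receives undefined (the error value here is illustrative, not an actual js-rpc error class):

} finally {
  // Attempt clean up with an explicit reason, ignore errors if already cleaned up
  await reader
    .cancel(new Error('unaryCaller clean up'))
    .catch(() => {});
  // close() is the graceful path and takes no reason, so it can stay as-is
  await writer.close().catch(() => {});
}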
Instance 2
// Hooking up agnostic stream side
let rpcStream: RPCStream<Uint8Array, Uint8Array>;
const streamFactoryProm = this.streamFactory({ signal, timer });
try {
  rpcStream = await Promise.race([streamFactoryProm, abortRaceProm.p]);
} catch (e) {
  cleanUp();
  void streamFactoryProm.then((stream) =>
    stream.cancel(ErrorRPCStreamEnded),
  );
  throw e;
}
void timer.then(
  () => {
    rpcStream.cancel(
      new rpcErrors.ErrorRPCTimedOut('RPC has timed out', {
        cause: ctx.signal?.reason,
      }),
    );
  },
  () => {}, // Ignore cancellation error
);
// Deciding if we want to allow refreshing
// We want to refresh timer if none was provided
const refreshingTimer: Timer | undefined =
  ctx.timer == null ? timer : undefined;
// Composing stream transforms and middleware
const metadata = {
  ...(rpcStream.meta ?? {}),
  command: method,
};
const outputMessageTransformStream =
  rpcUtils.clientOutputTransformStream<O>(metadata, refreshingTimer);
const inputMessageTransformStream = rpcUtils.clientInputTransformStream<I>(
  method,
  refreshingTimer,
);
const middleware = this.middlewareFactory(
  { signal, timer },
  rpcStream.cancel,
  metadata,
);
RPCServer.ts
@ready(new rpcErrors.ErrorRPCHandlerFailed())
public handleStream(rpcStream: RPCStream<Uint8Array, Uint8Array>) {
  // This will take a buffer stream of json messages and set up service
  // handling for it.
  // Constructing the PromiseCancellable for tracking the active stream
  const abortController = new AbortController();
  // Setting up timeout timer logic
  const timer = new Timer({
    delay: this.handlerTimeoutTime,
    handler: () => {
      abortController.abort(new rpcErrors.ErrorRPCTimedOut());
      if (this.onTimeoutCallback) {
        this.onTimeoutCallback();
      }
    },
  });
  const prom = (async () => {
    const id = await this.idGen();
    const headTransformStream = rpcUtilsMiddleware.binaryToJsonMessageStream(
      rpcUtils.parseJSONRPCRequest,
    );
    // Transparent transform used as a point to cancel the input stream from
    const passthroughTransform = new TransformStream<
      Uint8Array,
      Uint8Array
    >();
    const inputStream = passthroughTransform.readable;
    const inputStreamEndProm = rpcStream.readable
      .pipeTo(passthroughTransform.writable)
      // Ignore any errors here, we only care that it ended
      .catch(() => {});
    void inputStream
      // Allow us to re-use the readable after reading the first message
      .pipeTo(headTransformStream.writable, {
        preventClose: true,
        preventCancel: true,
      })
      // Ignore any errors here, we only care that it ended
      .catch(() => {});
    const cleanUp = async (reason: any) => {
      await inputStream.cancel(reason);
      await rpcStream.writable.abort(reason);
      await inputStreamEndProm;
      timer.cancel(cleanupReason);
      await timer.catch(() => {});
    };
    // Read a single empty value to consume the first message
    const reader = headTransformStream.readable.getReader();
    // Allows timing out when waiting for the first message
    let headerMessage:
      | ReadableStreamDefaultReadResult<JSONRPCRequest>
      | undefined
      | void;
    try {
      headerMessage = await Promise.race([
        reader.read(),
        timer.then(
          () => undefined,
          () => {},
        ),
      ]);
    } catch (e) {
      const newErr = new rpcErrors.ErrorRPCHandlerFailed(
        'Stream failed waiting for header',
        { cause: e },
      );
      await inputStreamEndProm;
      timer.cancel(cleanupReason);
      await timer.catch(() => {});
      this.dispatchEvent(
        new rpcEvents.RPCErrorEvent({
          detail: new rpcErrors.ErrorRPCOutputStreamError(
            'Stream failed waiting for header',
            {
              cause: newErr,
            },
          ),
        }),
      );
      return;
    }
    // Downgrade back to the raw stream
    await reader.cancel();
    // There are 2 conditions where we just end here
    // 1. The timeout timer resolves before the first message
    // 2. the stream ends before the first message
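The Promise.race against the timer above is what allows timing out while waiting for the first message. As a standalone sketch of that pattern, here it is with a plain setTimeout promise standing in for the Timer from @matrixai/timer (the helper name is made up for illustration):

async function readWithTimeout<T>(
  reader: ReadableStreamDefaultReader<T>,
  timeoutMs: number,
) {
  // Resolves to undefined if the timeout wins the race
  const timeout = new Promise<undefined>((resolve) =>
    setTimeout(() => resolve(undefined), timeoutMs),
  );
  // Whichever settles first wins: a read result, or undefined on timeout.
  // The real code still has to cancel the reader and the timer afterwards.
  return Promise.race([reader.read(), timeout]);
}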
Discussion about RPCServer and RPCClient lifecycles:
https://github.com/MatrixAI/Polykey/pull/552#issuecomment-1742092717
Brian — Yesterday at 12:15 PM
Moving the RPCServer into NCM is what we discussed before. I think it's fine so long as the server manifest is generic and passed in just to simplify testing.
Seems like the RPCServer could use the StartStop pattern. It could just be a normal class where handleStream is ready-decorated. While the RPCServer doesn't really manage the streams it's handling, it is managing the handlers and is fully allowed to cancel any active streams directly.
I think the main reason the RPCServer has a life cycle is to clearly track the existence of active streams. If we're ending the RPCServer, for simplicity we want any running handlers to end before the RPCServer has ended. This makes it clear that it's safe to stop any domains because we can be sure no handlers are actively using them.
That said, stopping the transport layer stuff serves the same function, but since the RPC is agnostic to transport we don't want to depend on that. Then again, the RPC can only advise abort unless we want to keep a force destroy functionality around.
CMCDragonkai — Yesterday at 12:17 PM
Yes, that's why I was thinking that stopping the RPCServer is more appropriate, and that stopping should only result in a signal abortion, not cancellation of streams
Because RPCServer is agnostic to the stream, and the lifecycle of the handlers is controlled by an advisory signal abortion instead of cancelling streams
Brian — Yesterday at 12:17 PM
Fair. In any case, the RPCServer should only be fully stopped once all handlers have completed.
CMCDragonkai — Yesterday at 12:18 PM
Yes after sending the signal abortion it can wait for all handlers to have completely resolved
Brian — Yesterday at 12:24 PM
Mmmk, so as far as changes go, based on that we're just removing the force: boolean option from rpcServer.destroy(), making sure it awaits all active streams. And possibly converting it to StartStop.
This will need to be changed on js-rpc.
CMCDragonkai — Yesterday at 12:33 PM
Including unary, cs, SS, and duplex
Brian — Yesterday at 12:33 PM
I'm not sure what you mean?
CMCDragonkai — Yesterday at 12:58 PM
I'm not sure we have force at all
It's just signal abortion
For all 4 different kinds of handlers
And signal abortion is always advisory
So you just wait until handlers have finished
Brian — Yesterday at 12:59 PM
Logic-wise, setting force: true for RPCServer.destroy() just triggers abortion for the handlers before waiting for them to complete; force: false just waits for them to complete.
Conclusion is:
Stop has force, and force: true means sending a special abortion signal to all the handler types (not cancelling streams).
Right now this says this.activeStreams. If this is the handler promises, that is a very bad name for it; it should be called this.handlerPs. This tells us that these are the active handler promises that are still resolving. And it needs to send a special reason for force: true.
If the RPC handlers reject upon being aborted by stop, and a handler uses throwIfAborted, that will result in RPCServer.stop throwing.
Internal signal reasons are usually symbols, not exception objects.
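A minimal sketch of what that stop could look like, assuming a server-wide AbortController whose signal is handed to handlers and a handlerPs set of in-flight handler promises (the class, field names, and symbol reason are illustrative, not the actual js-rpc API):

// Illustrative only: a StartStop-style stop() along the lines discussed above.
// A symbol is used as the internal signal reason, per the note that internal
// reasons are usually symbols rather than exception objects.
const forceStopReason = Symbol('RPCServer force stop');

class RPCServerSketch {
  protected abortController = new AbortController();
  // Active handler promises that are still resolving (the handlerPs rename)
  protected handlerPs: Set<Promise<void>> = new Set();

  public async stop({ force = true }: { force?: boolean } = {}): Promise<void> {
    if (force) {
      // Advisory only: abort the signal that handlers observe; no streams are
      // cancelled here
      this.abortController.abort(forceStopReason);
    }
    // Stopping only completes once every active handler has settled
    await Promise.allSettled(this.handlerPs);
  }
}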
Ignore everything except the OP. The OP has been rewritten to focus on what we need.
@amydevs is this also being done?
This is still pending, as there are still cancellations occurring without reasons in RPCServer.ts.
I don't think there are any places where cancel or abort are used without a reason, or where the stream is closed ungracefully when it is unneeded.
There is one place where it is done:
const reader = headTransformStream.readable.getReader();
// Allows timing out when waiting for the first message
let headerMessage:
  | ReadableStreamDefaultReadResult<JSONRPCRequest>
  | undefined
  | void;
try {
  headerMessage = await Promise.race([
    reader.read(),
    timer.then(
      () => undefined,
      () => {},
    ),
  ]);
} catch (e) {
  const newErr = new errors.ErrorRPCHandlerFailed(
    'Stream failed waiting for header',
    { cause: e },
  );
  await inputStreamEndProm;
  timer.cancel(cleanupReason);
  await timer.catch(() => {});
  this.dispatchEvent(
    new events.RPCErrorEvent({
      detail: new errors.ErrorRPCOutputStreamError(
        'Stream failed waiting for header',
        { cause: newErr },
      ),
    }),
  );
  return;
}
// Downgrade back to the raw stream
await reader.cancel();
But this does not matter, as the headTransformStream is a tee of the actual underlying stream that is meant to only parse the header of each RPC call. Hence, the cancel does not propagate to the parent stream.
Therefore, this issue is considered no longer applicable.
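For reference, a standalone sketch of why that cancel stays local (assuming Node >= 18, where the WHATWG stream classes are globals): the header branch is filled through a pipe with preventClose and preventCancel set, so tearing the branch down never reaches the source stream.

async function demo(): Promise<void> {
  let sourceCancelled = false;
  const source = new ReadableStream<string>({
    start(controller) {
      controller.enqueue('header');
    },
    cancel() {
      // With preventCancel set on the pipe below, this never runs
      sourceCancelled = true;
    },
  });
  const headBranch = new TransformStream<string, string>();
  // Fill the branch, but never close or cancel the source from this pipe
  const pipeDone = source
    .pipeTo(headBranch.writable, { preventClose: true, preventCancel: true })
    .catch(() => {});
  const reader = headBranch.readable.getReader();
  console.log(await reader.read()); // { value: 'header', done: false }
  // Tear down only the branch, with an explicit reason
  await reader.cancel(new Error('header consumed'));
  await pipeDone;
  console.log(sourceCancelled); // false: the cancel never propagated upstream
  console.log(source.locked); // false: the source is released and reusable
}
void demo();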
When you say it doesn't matter, should we even keep it?
Specification
We cannot call cancel or abort without reasons. This can cause undefined errors, which are very difficult to debug. These are being used to ungracefully close streams, possibly in RPCClient and RPCServer. This shouldn't be used anymore anyway because we can rely on timeouts instead, rather than ungracefully closing streams.
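A tiny illustration of the failure mode (assuming Node >= 18, where ReadableStream is a global): whatever reason the consumer passes to cancel is exactly what the underlying source and any downstream error handling get to see, so a reason-less cancel surfaces as a bare undefined.

void (async () => {
  const source = new ReadableStream<Uint8Array>({
    cancel(reason) {
      // Whatever the consumer passed to cancel() lands here
      console.log('cancelled with:', reason);
    },
  });
  // Reason-less cancel logs "cancelled with: undefined": nothing to trace
  await source.cancel();
  // A reasoned cancel hands downstream something actionable instead, e.g.:
  // await source.cancel(new Error('client no longer needs this stream'));
})();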
Additional Context
cancel and abort calls