Pines-Cheng / blog

技术博客
https://pines-cheng.github.io/blog/
546 stars 42 forks source link

Theia 技术揭秘之 JSON-RPC 通信 #85

Open Pines-Cheng opened 3 years ago

Pines-Cheng commented 3 years ago

image

Theia 框架前端 UI 布局和 Services 一样,具备灵活可拓展的特点。VSCode 是内置了一套基本的组件系统,而 Theia 框架的 UI 布局基于 PhosphorJS 框架。 PhosphorJS 提供了包含 widgets、layouts、事件和数据结构的丰富工具包。这使得开发人员能够构建可扩展的、高性能的、类桌面的 Web 应用程序,比如 JupyterLab。

PhosphorJS 作者退休,项目已归档,该项目现在被 Jupyter 团队重命名为 jupyterlab/lumino 继续维护。见 issue:https://github.com/jupyterlab/team-compass/issues/28

写在前面

前置条件:

  1. 了解 Theia 的简单原理及前后端模块加载的方式
  2. 了解 InversifyJS 的依赖注入的原理和使用

Theia JSON RPC 实现的缺点:

  1. 概念多,什么 factory,proxy 等,server 和 client 概念有点混淆。
  2. 每次添加接口都需要实现 IServer/IClient/IWatcher,然后按照规范注入,工作量并不少
  3. 和 Inversify 、Theia 源码、后端服务耦合严重,没有独立成包

Theia JSON-RPC 协议示例

image

通信场景

  1. Server 与 Browser

通过 Websocket 信道进行通信。

  1. 插件(Web Worker)
this.postMessage(m);
  1. iframe 与 Browser (Webview)
        postMessage(channel, data) {
            window.parent.postMessage({ target: id, channel, data }, '*');
        }

添加日志调试 JSON RPC 服务

在启动后,Theia 会启动一个 Express 服务。前后端的 JSON-RPC 通信,正是基于 Express 上的 Websocket 连接。

接下来将创建调试日志系统服务,然后通过 JSON RPC 连接到它。

注册服务

因此,你要做的第一件事是暴露服务,以便前端可以连接到它。

你需要创建类似于下面这个(logger-server-module. ts)的后端服务器模块文件:

import { ContainerModule } from 'inversify';
import { ConnectionHandler, JsonRpcConnectionHandler } from "../../messaging/common";
import { ILoggerServer, ILoggerClient } from '../../application/common/logger-protocol';

export const loggerServerModule = new ContainerModule(bind => {
    bind(ConnectionHandler).toDynamicValue(ctx =>
        new JsonRpcConnectionHandler<ILoggerClient>("/services/logger", client => {
            const loggerServer = ctx.container.get<ILoggerServer>(ILoggerServer);
            loggerServer.setClient(client);
            return loggerServer;
        })
    ).inSingletonScope()
});

核心在于 ConnectionHandlerJsonRpcConnectionHandler

ConnectionHandler

ConnectionHandler 类型绑定到 messaging-module.ts 中的 ContributionProvider

MessagingContribution 启动(调用 onStart)时,它为所有绑定 ConnectionHandlers 创建一个 Websocket 连接。

即依次在 Server 注册 path,并绑定 onConnection 事件。

// packages/core/src/node/messaging/messaging-contribution.ts
export class MessagingContribution implements BackendApplicationContribution, MessagingService {
  constructor( @inject(ContributionProvider) @named(ConnectionHandler) protected readonly handlers: 
  ContributionProvider<ConnectionHandler>) {
      }

    // 服务启动时调用
    onStart(server: http.Server): void {
        // 遍历
        for (const handler of this.handlers.getContributions()) {
            const path = handler.path;
            try {
                createServerWebSocketConnection({
                    server,
                    path
                }, connection => handler.onConnection(connection));
            } catch (error) {
                console.error(error)
            }
        }
    }
}

JsonRpcConnectionHandler

我们看看一下 JsonRpcConnectionHandler 的实现,就会发现 onConnection 做了三件事:

  1. 基于 JsonRpcProxyFactory 和传入的 path 创建 factory 实例
  2. 通过 createProxy 方法创建代理 proxy
  3. 从 factory 创建一个代理对象:factory.target = this.targetFactory(proxy);
  4. 将 factory 和 connection 连接起来

第三步将调用 new JsonRpcConnectionHandler( ) 传入的函数:

        client => {
            const loggerServer = ctx.container.get<ILoggerServer>(ILoggerServer);
            loggerServer.setClient(client);
            return loggerServer;
        }

这将在 loggerServer 上设置 Client,在这种情况下,用于向前端发送 onLogLevelChanged 通知。

// packages/core/src/common/messaging/proxy-factory.ts
export class JsonRpcConnectionHandler<T extends object> implements ConnectionHandler {
    constructor(
        readonly path: string,
        readonly targetFactory: (proxy: JsonRpcProxy<T>) => any,
        readonly factoryConstructor: new () => JsonRpcProxyFactory<T> = JsonRpcProxyFactory
    ) { }
        onConnection(connection: MessageConnection): void {
            // 1. 在 path “logger” 上创建了一个 JsonRpcProxy
            const factory = new JsonRpcProxyFactory(this.path);

            // 2. 在 factory 类上创建了一个代理对象
            // 这个对象可以使用 ILoggerClient 定义的接口调用 JSON-RPC 的另一端。
            const proxy = factory.createProxy();

            // 3. 这里调用了 new JsonRpcConnectionHandler 传入的函数 client=>{},用于 loggerServer.setClient
            factory.target = this.targetFactory(proxy);

          // 4. 这将 factory 与 connection 连接了起来
            factory.listen(connection);
        }
    }
}

这样,services/* 的请求由 Webpack dev server 处理,请参阅 webpack.config.js

'/services/*': {
    target: 'ws://localhost:3000',
    ws: true
},

Server 实现

Server 定义通过 JSON-RPC 调用的后端对象,ILoggerServer 接口如下,这里定义了 4 个方法。

// packages/core/src/common/logger-protocol.ts
export interface ILoggerServer extends JsonRpcServer<ILoggerClient> {
    setLogLevel(name: string, logLevel: number): Promise<void>;
    getLogLevel(name: string): Promise<number>;
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    log(name: string, logLevel: number, message: any, params: any[]): Promise<void>;
    child(name: string): Promise<void>;
}

继承自 JsonRpcServer

// packages/core/src/common/messaging/proxy-factory.ts
export type JsonRpcServer<Client> = Disposable & {
    /**
     * If this server is a proxy to a remote server then
     * a client is used as a local object
     * to handle JSON-RPC messages from the remote server.
     */
    setClient(client: Client | undefined): void;
    getClient?(): Client | undefined;
};

当前,源码中仅有 ConsoleLoggerServer 的实现: export class ConsoleLoggerServer implements ILoggerServer {}

Client 实现

Client 用于定义接收来自后端对象的通知,DispatchingLoggerClient 实现如下:

// packages/core/src/common/logger-protocol.ts
@injectable()
export class DispatchingLoggerClient implements ILoggerClient {

    readonly clients = new Set<ILoggerClient>();

    onLogLevelChanged(event: ILogLevelChangedEvent): void {
        this.clients.forEach(client => client.onLogLevelChanged(event));
    }

}

前端连接服务

上面我们创建了后端服务,接下来我们需要从前端连接它。

分为以下三步:

  1. 创建了一个 watcher,使用 loggerWatcher Client 从后端获取事件通知
  2. 获得了 Websocket 连接
  3. 通过loggerWatcher.getLoggerClient()获得本地对象,用来来处理来自远程对象的 JSON-RPC 消息,通过传入 createProxy 创建一个代理
// logger-frontend-module. ts
import { ContainerModule, Container } from 'inversify';
import { WebSocketConnectionProvider } from '../../messaging/browser/connection';
import { ILogger, LoggerFactory, LoggerOptions, Logger } from '../common/logger';
import { ILoggerServer } from '../common/logger-protocol';
import { LoggerWatcher } from '../common/logger-watcher';

export const loggerFrontendModule = new ContainerModule(bind => {
    bind(ILogger).to(Logger).inSingletonScope();
          // 1. 这里创建了一个 watcher,使用 loggerWatcher Client从后端获取事件通知
    bind(LoggerWatcher).toSelf().inSingletonScope();
    bind(ILoggerServer).toDynamicValue(ctx => {
        const loggerWatcher = ctx.container.get(LoggerWatcher);
    // 2. 这里获得了一个 Websocket 连接
        const connection = ctx.container.get(WebSocketConnectionProvider);
        // 3. 这里,我们传入了一个用于处理 JSON-RPC 的对象。
        return connection.createProxy<ILoggerServer>("/services/logger", loggerWatcher.getLoggerClient());
    }).inSingletonScope();
});

WebSocketConnectionProvider 的 connection.createProxy 实际执行以下代码:

// packages/core/src/common/messaging/abstract-connection-provider.ts
export abstract class AbstractConnectionProvider<AbstractOptions extends object> {
    /**
     * Create a proxy object to remote interface of T type
     * over a web socket connection for the given path.
     */
    createProxy<T extends object>(path: string, arg?: object): JsonRpcProxy<T> {
        const factory = arg instanceof JsonRpcProxyFactory ? arg : new JsonRpcProxyFactory<T>(arg);
        this.listen({
            path,
            onConnection: c => factory.listen(c)
        });
        return factory.createProxy();
    }
    /**
     * Install a connection handler for the given path.
     */
    listen(handler: ConnectionHandler, options?: AbstractOptions): void {
        this.openChannel(handler.path, channel => {
            const connection = createWebSocketConnection(channel, this.createLogger());
            connection.onDispose(() => channel.close());
            handler.onConnection(connection);
        }, options);
    }
}

接下来,即可使用 ILoggerService 获取对象进行 RPC 调用。

LoggerWatcher

LoggerWatcher 定义了 onLogLevelChanged 的消息响应。

@injectable()
export class LoggerWatcher {

    getLoggerClient(): ILoggerClient {
        const emitter = this.onLogLevelChangedEmitter;
        return {
            onLogLevelChanged(event: ILogLevelChangedEvent): void {
                emitter.fire(event);
            }
        };
    }

    private onLogLevelChangedEmitter = new Emitter<ILogLevelChangedEvent>();

    get onLogLevelChanged(): Event<ILogLevelChangedEvent> {
        return this.onLogLevelChangedEmitter.event;
    }

    // FIXME: get rid of it, backend services should as well set a client on the server
    fireLogLevelChanged(event: ILogLevelChangedEvent): void {
        this.onLogLevelChangedEmitter.fire(event);
    }
}

加载模块

需要导入模块和加载进主容器两步。

// 导入模块
import { loggerServerModule } from 'theia-core/lib/application/node/logger-server-module';

// 加载进容器
container.load(loggerServerModule);

完整的通信例子可以看:

Add debug logging support · eclipse-theia/theia@99d191f

源码

核心的接口和类有:ConnectionHandler,JsonRpcConnectionHandler 以及 JsonRpcProxyFactory,搞清楚他们的作用。

ConnectionHandler

ConnectionHandler 是一个简单的接口,它指定连接的 path 以及 onConnection 方法。

export interface ConnectionHandler {
    readonly path: string;
    onConnection(connection: MessageConnection): void;
}

JsonRpcConnectionHandler

JsonRpcProxyFactoryJsonRpcConnectionHandler 中被使用。

Websocket 连接正是在 JsonRpcConnectionHandler 类上建立的。建立连接的逻辑在 JsonRpcConnectionHandler 类的 onConnection 函数上,过程如下:

// packages/core/src/common/messaging/proxy-factory.ts
export class JsonRpcConnectionHandler<T extends object> implements ConnectionHandler {
    constructor(
        readonly path: string,
        readonly targetFactory: (proxy: JsonRpcProxy<T>) => any,
        readonly factoryConstructor: new () => JsonRpcProxyFactory<T> = JsonRpcProxyFactory
    ) { }
        onConnection(connection: MessageConnection): void {
            // 在 path “logger” 上创建了一个 JsonRpcProxy
            const factory = new JsonRpcProxyFactory(this.path);

            // 在 factory 类上创建了一个代理对象
            // 这个对象可以使用 ILoggerClient 定义的接口调用 JSON-RPC 的另一端。
            const proxy = factory.createProxy();

            // 这里调用了我们在参数中传入的函数
            factory.target = this.targetFactory(proxy);

          // 这将 factory 与 connection 连接了起来
            factory.listen(connection);
        }
    }
}

JsonRpcProxyFactory

JSON RPC 的核心在于:JsonRpcProxyFactory,源码里注释很详细,还有使用 Demo,值得好好学习一下。

// packages/core/src/common/messaging/proxy-factory.ts
/**
 * Factory for JSON-RPC proxy objects.
 *
 * A JSON-RPC proxy exposes the programmatic interface of an object through
 * JSON-RPC.  This allows remote programs to call methods of this objects by
 * sending JSON-RPC requests.  This takes place over a bi-directional stream,
 * where both ends can expose an object and both can call methods each other's
 * exposed object.
 *
 * For example, assuming we have an object of the following type on one end:
 *
 *     class Foo {
 *         bar(baz: number): number { return baz + 1 }
 *     }
 *
 * which we want to expose through a JSON-RPC interface.  We would do:
 *
 *     let target = new Foo()
 *     let factory = new JsonRpcProxyFactory<Foo>('/foo', target)
 *     factory.onConnection(connection)
 *
 * The party at the other end of the `connection`, in order to remotely call
 * methods on this object would do:
 *
 *     let factory = new JsonRpcProxyFactory<Foo>('/foo')
 *     factory.onConnection(connection)
 *     let proxy = factory.createProxy();
 *     let result = proxy.bar(42)
 *     // result is equal to 43
 *
 * One the wire, it would look like this:
 *
 *     --> {"jsonrpc": "2.0", "id": 0, "method": "bar", "params": {"baz": 42}}
 *     <-- {"jsonrpc": "2.0", "id": 0, "result": 43}
 *
 * Note that in the code of the caller, we didn't pass a target object to
 * JsonRpcProxyFactory, because we don't want/need to expose an object.
 * If we had passed a target object, the other side could've called methods on
 * it.
 *
 * @param <T> - The type of the object to expose to JSON-RPC.
 */
export class JsonRpcProxyFactory<T extends object> implements ProxyHandler<T> {

    protected readonly onDidOpenConnectionEmitter = new Emitter<void>();
    protected readonly onDidCloseConnectionEmitter = new Emitter<void>();

    protected connectionPromiseResolve: (connection: MessageConnection) => void;
    protected connectionPromise: Promise<MessageConnection>;

    /**
     * Build a new JsonRpcProxyFactory.
     *
     * @param target - The object to expose to JSON-RPC methods calls.  If this
     *   is omitted, the proxy won't be able to handle requests, only send them.
     */
    constructor(public target?: any) {
        this.waitForConnection();
    }

    protected waitForConnection(): void {
        this.connectionPromise = new Promise(resolve =>
            this.connectionPromiseResolve = resolve
        );
        this.connectionPromise.then(connection => {
            connection.onClose(() =>
                this.onDidCloseConnectionEmitter.fire(undefined)
            );
            this.onDidOpenConnectionEmitter.fire(undefined);
        });
    }

    /**
     * Connect a MessageConnection to the factory.
     *
     * This connection will be used to send/receive JSON-RPC requests and
     * response.
     */
    listen(connection: MessageConnection): void {
        if (this.target) {
            for (const prop in this.target) {
                if (typeof this.target[prop] === 'function') {
                    connection.onRequest(prop, (...args) => this.onRequest(prop, ...args));
                    connection.onNotification(prop, (...args) => this.onNotification(prop, ...args));
                }
            }
        }
        connection.onDispose(() => this.waitForConnection());
        connection.listen();
        this.connectionPromiseResolve(connection);
    }

    /**
     * Process an incoming JSON-RPC method call.
     *
     * onRequest is called when the JSON-RPC connection received a method call
     * request.  It calls the corresponding method on [[target]].
     *
     * The return value is a Promise object that is resolved with the return
     * value of the method call, if it is successful.  The promise is rejected
     * if the called method does not exist or if it throws.
     *
     * @returns A promise of the method call completion.
     */
    protected async onRequest(method: string, ...args: any[]): Promise<any> {
        try {
            return await this.target[method](...args);
        } catch (error) {
            const e = this.serializeError(error);
            if (e instanceof ResponseError) {
                throw e;
            }
            const reason = e.message || '';
            const stack = e.stack || '';
            console.error(`Request ${method} failed with error: ${reason}`, stack);
            throw e;
        }
    }

    /**
     * Process an incoming JSON-RPC notification.
     *
     * Same as [[onRequest]], but called on incoming notifications rather than
     * methods calls.
     */
    protected onNotification(method: string, ...args: any[]): void {
        this.target[method](...args);
    }

    /**
     * Create a Proxy exposing the interface of an object of type T.  This Proxy
     * can be used to do JSON-RPC method calls on the remote target object as
     * if it was local.
     *
     * If `T` implements `JsonRpcServer` then a client is used as a target object for a remote target object.
     */
    createProxy(): JsonRpcProxy<T> {
        const result = new Proxy<T>(this as any, this);
        return result as any;
    }

    /**
     * Get a callable object that executes a JSON-RPC method call.
     *
     * Getting a property on the Proxy object returns a callable that, when
     * called, executes a JSON-RPC call.  The name of the property defines the
     * method to be called.  The callable takes a variable number of arguments,
     * which are passed in the JSON-RPC method call.
     *
     * For example, if you have a Proxy object:
     *
     *     let fooProxyFactory = JsonRpcProxyFactory<Foo>('/foo')
     *     let fooProxy = fooProxyFactory.createProxy()
     *
     * accessing `fooProxy.bar` will return a callable that, when called,
     * executes a JSON-RPC method call to method `bar`.  Therefore, doing
     * `fooProxy.bar()` will call the `bar` method on the remote Foo object.
     *
     * @param target - unused.
     * @param p - The property accessed on the Proxy object.
     * @param receiver - unused.
     * @returns A callable that executes the JSON-RPC call.
     */
    get(target: T, p: PropertyKey, receiver: any): any {
        if (p === 'setClient') {
            return (client: any) => {
                this.target = client;
            };
        }
        if (p === 'getClient') {
            return () => this.target;
        }
        if (p === 'onDidOpenConnection') {
            return this.onDidOpenConnectionEmitter.event;
        }
        if (p === 'onDidCloseConnection') {
            return this.onDidCloseConnectionEmitter.event;
        }
        const isNotify = this.isNotification(p);
        return (...args: any[]) => {
            const method = p.toString();
            const capturedError = new Error(`Request '${method}' failed`);
            return this.connectionPromise.then(connection =>
                new Promise((resolve, reject) => {
                    try {
                        if (isNotify) {  
                            // sendNotification
                            connection.sendNotification(method, ...args);
                            resolve();
                        } else {
                            // sendRequest
                            const resultPromise = connection.sendRequest(method, ...args) as Promise<any>;
                            resultPromise
                                .catch((err: any) => reject(this.deserializeError(capturedError, err)))
                                .then((result: any) => resolve(result));
                        }
                    } catch (err) {
                        reject(err);
                    }
                })
            );
        };
    }

    /**
     * Return whether the given property represents a notification.
     *
     * A property leads to a notification rather than a method call if its name
     * begins with `notify` or `on`.
     *
     * @param p - The property being called on the proxy.
     * @return Whether `p` represents a notification.
     */
    protected isNotification(p: PropertyKey): boolean {
        return p.toString().startsWith('notify') || p.toString().startsWith('on');
    }

    protected serializeError(e: any): any {
        if (ApplicationError.is(e)) {
            return new ResponseError(e.code, '',
                Object.assign({ kind: 'application' }, e.toJson())
            );
        }
        return e;
    }
    protected deserializeError(capturedError: Error, e: any): any {
        if (e instanceof ResponseError) {
            const capturedStack = capturedError.stack || '';
            if (e.data && e.data.kind === 'application') {
                const { stack, data, message } = e.data;
                return ApplicationError.fromJson(e.code, {
                    message: message || capturedError.message,
                    data,
                    stack: `${capturedStack}\nCaused by: ${stack}`
                });
            }
            e.stack = capturedStack;
        }
        return e;
    }

}

写在最后

个人还是觉得 cyrus-and/chrome-remote-interface 使用协议定义文件自动生成方式更优雅,代码更简洁。且独立成包,每次只需要添加 protocol 类型文件内容即可自动生成接口。

不过 chrome-remote-interface 只是一个客户端接口,并没有服务端。个人参考着设计了基于 Websocket 的 JSON RPC 协议规范和及 API。:cloudbase-interface,具有以下优点:

  1. 包含服务端和客户端
  2. 不管后端使用什么 websocket 框架,只需要提供:serverAdaptor 接口的实现即可。
  3. 使用中间件的思想扩充 API

参考

zzkkui commented 3 years ago

您好,请问你们有需要做ide非活动后一段时间提示或者断开链接吗?跟gitpod类似,非活动一段时间会自动断开。目前想的是通过追踪ws,根据里面的message来判断。不过目前追踪message,会修改到源码。不知道您这边有研究吗?

Pines-Cheng commented 3 years ago

您好,请问你们有需要做ide非活动后一段时间提示或者断开链接吗?跟gitpod类似,非活动一段时间会自动断开。目前想的是通过追踪ws,根据里面的message来判断。不过目前追踪message,会修改到源码。不知道您这边有研究吗?

@zzkkui 方法很多,最简单的方式就是通过注入 JS 脚本轮询实现。

zzkkui commented 3 years ago

您好,请问你们有需要做ide非活动后一段时间提示或者断开链接吗?跟gitpod类似,非活动一段时间会自动断开。目前想的是通过追踪ws,根据里面的message来判断。不过目前追踪message,会修改到源码。不知道您这边有研究吗?

@zzkkui 方法很多,最简单的方式就是通过注入 JS 脚本轮询实现。

对,肯定会采用轮询,目前的话我觉得有两种方法可行,一种就是通过mouseover 事件,另外一种就是追踪 ws 的message,判断是否是像下面这种状态 image image 但是如何在不修改theia源码的情况下,我还没找方法通过扩展的方式来拦截 https://github.com/eclipse-theia/theia/blob/65509341ebf2e6c2aedd98c066c9d405a111200f/packages/core/src/common/messaging/web-socket-channel.ts#L118-L122 或者 https://github.com/eclipse-theia/theia/blob/65509341ebf2e6c2aedd98c066c9d405a111200f/packages/core/src/common/messaging/abstract-connection-provider.ts#L107-L116

不知道您这边有做过相关研究没。谢谢

Pines-Cheng commented 3 years ago

@zzkkui 分为两种情况吧:

  1. 页面关闭。
  2. 页面超过时间无活动,我觉得直接监听 mouseover等事件,加个 debounce 触发弹窗或关闭就行了,没必要做到 ws 那么细的粒度,除非有额外的需求。