wzhudev / blog

:book:
220 stars 14 forks source link

vscode 源码解析 - 事件模块 #40

Closed wzhudev closed 3 years ago

wzhudev commented 3 years ago


在进一步深入学习 vscode 的各种机制之前,我们先对 vscode 当中的一些基础工具做一些探索,因为核心机制大量地用到了这些基础模块,这篇文章将会介绍事件(event)模块,相关代码在 vs/base/common/event.ts 文件中。

Event 模块实现

Event 接口

Event 接口规定了一个函数,当调用了这个函数,就表示监听了这个函数所对应的事件流。

export interface Event<T> {
    (listener: (e: T) => any, thisArgs?: any, disposables?: IDisposable[] | DisposableStore): IDisposable;
}

返回的 IDisposable 对象用于解除这个监听的(通过调用它的 dispose 方法)。

另外一种解除监听的方式就是 disposable 了,Event 函数在执行的过程中会将 IDisposable 插入 disposables,方便调用方决定在什么时候解除监听。

Emitter

有了 Event 接口,我们可以规定事件如何被消费,那么事件是如何产生的呢?一种方法就是通过 Emitter

Emitter 类型暴露了两个重要方法:

fire,从这个方法的函数签名就能看出它就是用来派发一个事件的,该方法的主要逻辑就是将 this._listeners 当中的保存的 listener 全部调用一遍(省略了部分分支逻辑和性能监控相关代码)

    fire(event: T): void {
        if (this._listeners) {
            for (let listener of this._listeners) {
                this._deliveryQueue.push([listener, event]);
            }

            while (this._deliveryQueue.size > 0) {
                const [listener, event] = this._deliveryQueue.shift()!;
                try {
                    if (typeof listener === 'function') {
                        listener.call(undefined, event);
                    } else {
                        listener[0].call(listener[1], event);
                    }
                } catch (e) {
                    onUnexpectedError(e);
                }
            }
        }
    }

get event(),这个方法会在 Emitter 中创建一个 Event,其主要逻辑就是将 listener 添加到 this._listeners 当中

const remove = this._listeners.push(!thisArgs ? listener : [listener, thisArgs]);

Emitter 类型还提供了一些特殊的回调接口:

export interface EmitterOptions {
    onFirstListenerAdd?: Function;
    onFirstListenerDidAdd?: Function;
    onListenerDidAdd?: Function;
    onLastListenerRemove?: Function;
}

这使得 Emitter 在注册消费者的时候执行一些额外的逻辑。我们将会在下文中看到其中一些回调所扮演的重要角色。

Event 辅助方法

vscode 还提供了一系列工具方法用于组合 Event ,得到更加丰富的事件处理能力。下面我们一一进行说明。

once

    export function once<T>(event: Event<T>): Event<T> {
        return (listener, thisArgs = null, disposables?) => {
            // we need this, in case the event fires during the listener call
            let didFire = false;
            let result: IDisposable;
            result = event(/* A */ e => {               
                if (didFire) {
                    return;
                } else if (result) {
                    result.dispose();
                } else {
                    didFire = true;
                }

                return listener.call(thisArgs, e);
            }, null, disposables);

            if (didFire) {
                result.dispose();
            }

            return result;
        };
    }

这个方法用于将一个 Event 变为只能派发一次的,事件类型相同的 Event

每一个事件到达时,会从 A 处开始执行,可以看到这段代码通过 didFire 作为锁,保证 listener.call(thisArgs, e) 只会被执行一次。

很明显, once 的执行过程中有两个 Event ,那么消息如何在 Event 之间传递的呢?我们注意到 A 处的匿名函数调用了一个 Eventlistener,而 A 本身又是另一个 Event 的 listener,所以答案是很明显的:消息沿着 Event 链传递的过程,就是 Eventlistener 们递归调用的过程。

snapshot

    export function snapshot<T>(event: Event<T> /* B */): Event<T> {
        let listener: IDisposable;
        const emitter = new Emitter<T>({
            onFirstListenerAdd() {
                listener = event(emitter.fire, emitter);
            },
            onLastListenerRemove() {
                listener.dispose();
            }
        });

        /* C */
        return emitter.event;
    }

这个工具方法用于生成 map 等操作,我们把它和 map 一起分析。

map

将一种类型的事件转换成另一种类型的事件,看起来和 Arraymap 非常相似。

    export function map<I, O>(event: Event<I> /* A */, map: (i: I) => O): Event<O> {
        return snapshot(
                         /* B */
            (listener, thisArgs = null, disposables?) => event(i => listener.call(thisArgs, map(i)), null, disposables));
    }

从代码可以看出:这里的 Event 链的顺序是

  1. map 装饰的 Event A
  2. snapshot 的参数,匿名的 Event B
  3. Emitter 暴露出的 Event C

当用户调用这个 map 转换出的 Event 的时候,实际上订阅的是 C,然后 C 在第一次被订阅时,会调用 B,而 B 又去订阅了 A。这里我们看到了 Emitter 的参数钩子起到了什么作用:B 是一个很特殊的 Event 它在 onFirstListenerAdd 中被订阅了 ,并且之后它并不会参与到 listener 的调用链中来,而是帮助 A 和 C 的 listener 之间创建了调用链,同时调用 map 对事件做了处理。

当有事件传递过来的时候,则是调用 A 的 listener i => listener.call(thisArgs, map(i)) ,而这里的 listener 很明显可以看出是 C 的 listener,也就是 Emitterfire 方法,通过上文中对 Emitter 的学习,我们知道,fire 方法会触发用户调用 C 时所传递来的 listener,这样整个传递链条就完整了。

forEach / filter / reduce

了解了 map 的工作原理之后,这三个函数就容易理解了,大家可以自行阅读代码。

signal

这个函数仅仅是做了一下类型转换,让订阅者忽略事件所携带的数据,比较简单。

any

    export function any<T>(...events: Event<T>[]): Event<T> {
        return (listener, thisArgs = null, disposables?) => combinedDisposable(...events.map(event => event(e => listener.call(thisArgs, e), null, disposables)));
    }

这个方法会在 events 中任意一个 Event 派发事件的时候派发一个事件。

debounce

对 Event 链条上的事件做防抖处理。

    export function debounce<T>(event: Event<T>, merge: (last: T | undefined, event: T) => T, delay?: number, leading?: boolean, leakWarningThreshold?: number): Event<T>;
    export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay?: number, leading?: boolean, leakWarningThreshold?: number): Event<O>;
    export function debounce<I, O>(event: Event<I>, merge: (last: O | undefined, event: I) => O, delay: number = 100, leading = false, leakWarningThreshold?: number): Event<O> {

        let subscription: IDisposable;
        let output: O | undefined = undefined;
        let handle: any = undefined;
        let numDebouncedCalls = 0;

        const emitter = new Emitter<O>({
            leakWarningThreshold,
            onFirstListenerAdd() {
                subscription = event(/* A */ cur => {
                    numDebouncedCalls++;
                    output = merge(output, cur);

                    if (leading && !handle) {
                        emitter.fire(output);
                        output = undefined;
                    }

                    clearTimeout(handle);
                    handle = setTimeout(() => {
                        const _output = output;
                        output = undefined;
                        handle = undefined;
                        if (!leading || numDebouncedCalls > 1) {
                            emitter.fire(_output!);
                        }

                        numDebouncedCalls = 0;
                    }, delay);
                });
            },
            onLastListenerRemove() {
                subscription.dispose();
            }
        });

        return emitter.event;
    }

不难看出这段代码的核心逻辑就是 A 处的 listener,它会对 debounce 时间内对数据做归并处理,并设置定时器,当收到新事件时就取消定时器,而定时器到期时就调用 emitter.fire 向下游继续发送事件。

stopWatch

这是一个记录耗时的 Event,当它收到第一个事件时,会把这个事件转换为它从创建到收到该事件的耗时。

latch

这个 Event 仅有当事件确实发生变化时,才会向下游发送事件。原理也很简单,就是在 filter 的基础上,利用闭包来缓存上一次事件的数据,然后用新数据和它做比较,新老数据不同或者是第一次接收数据才放通。

buffer

这个 Event 在没有人订阅它时,会缓存所有收到的事件,并在收到订阅时将已经缓存的事件全部发送出去。

    export function buffer<T>(event: Event<T>, nextTick = false, _buffer: T[] = []): Event<T> {
        let buffer: T[] | null = _buffer.slice();

        let listener: IDisposable | null = event(e => {
            if (buffer) {
                buffer.push(e);
            } else {
                emitter.fire(e);
            }
        });

        const flush = () => {
            if (buffer) {
                buffer.forEach(e => emitter.fire(e));
            }
            buffer = null;
        };

        const emitter = new Emitter<T>({
            onFirstListenerAdd() {
                if (!listener) {
                    listener = event(e => emitter.fire(e));
                }
            },

            onFirstListenerDidAdd() {
                if (buffer) {
                    if (nextTick) {
                        setTimeout(flush);
                    } else {
                        flush();
                    }
                }
            },

            onLastListenerRemove() {
                if (listener) {
                    listener.dispose();
                }
                listener = null;
            }
        });

        return emitter.event;
    }

如果调用 buffer 时传入了 nextTick = True ,则发送缓存事件的操作会易步进行,所以如果你第一次订阅时同步添加了很多 listener,则它们都会收到这些缓存的事件。

ChainableEvent

如果要多次使用 map, filter 等函数,一个比较优雅的写法是链式调用,例如 Event.map.filter.xxxChainableEvent 就是为此准备的,通过调用 chain 方法,一个 Event 会转换成 ChainableEvent,然后就可以进行链式调用:

        export function chain<T>(event: Event<T>): IChainableEvent<T> {
        return new ChainableEvent(event);
    }

ChainableEvent 的实现很简单,就是对上面的方法进行了一次包裹,这里就不再赘述了。

除了上面提到的对 Event 的转换方法之外,还有一些生成 Event 的方法。

fromNodeEventEmitter

    export function fromNodeEventEmitter<T>(emitter: NodeEventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
        const fn = (...args: any[]) => result.fire(map(...args));
        const onFirstListenerAdd = () => emitter.on(eventName, fn);
        const onLastListenerRemove = () => emitter.removeListener(eventName, fn);
        const result = new Emitter<T>({ onFirstListenerAdd, onLastListenerRemove });

        return result.event;
    }

该方法是对 node.js 原生事件的包裹,在原生事件的回调中调用 Emitter.fire

fromDOMEventEmitter

对 DOM 事件进行包装,和上面的非常相似,这里就不赘述了。

fromPromise

    export function fromPromise<T = any>(promise: Promise<T>): Event<undefined> {
        const emitter = new Emitter<undefined>();
        let shouldEmit = false;

        promise
            .then(undefined, () => null)
            .then(() => {
                if (!shouldEmit) {
                    setTimeout(() => emitter.fire(undefined), 0);
                } else {
                    emitter.fire(undefined);
                }
            });

        shouldEmit = true;
        return emitter.event;
    }

将 Promise 转换为事件。通过 shouldEmit 确保 Promise 不会因为已经 resolve 而在订阅发生之前就开始派发事件(这样会导致错过事件)。

奇怪的是这里丢失了 Promise 返回的结果,不知道为什么这么设计,可能是 vscode 自己用不着吧。

toPromise

将事件转换为 Event,这个也比较简单。

另外,还有一些工具类提供更多的事件管理能力。

PauseableEmitter

类似于 Emitter,但是能通过 pauseresume 方法暂停一条 Event 链上事件的传播,比较简单。

EventMultiplexer

这个类可以订阅多个事件,并在任意一个事件派发的时候,将该事件转发给自己所有的订阅者,它的核心就是它的 hook 方法:

    private hook(e: { event: Event<T>; listener: IDisposable | null; }): void {
        e.listener = e.event(r => this.emitter.fire(r));
    }

也较为简单,这里不再赘述。

EventBufferer

这是一个非常有趣的类,它提供了一个 wrapEvent 方法包裹一个 Event,并提供了一个 bufferEvents 方法,在这个方法的回调内所有经过它 wrapEvent 包裹的 Event,都先不会被传播给订阅者。

/**
 * The EventBufferer is useful in situations in which you want
 * to delay firing your events during some code.
 * You can wrap that code and be sure that the event will not
 * be fired during that wrap.
 *
 * ```
 * const emitter: Emitter;
 * const delayer = new EventDelayer();
 * const delayedEvent = delayer.wrapEvent(emitter.event);
 *
 * delayedEvent(console.log);
 *
 * delayer.bufferEvents(() => {
 *   emitter.fire(); // event will not be fired yet
 * });
 *
 * // event will only be fired at this point
 * ```
 */
export class EventBufferer {

    private buffers: Function[][] = [];

    wrapEvent<T>(event: Event<T>): Event<T> {
        return (listener, thisArgs?, disposables?) => {
            return event(i => {
                const buffer = this.buffers[this.buffers.length - 1];

                if (buffer) {
                    buffer.push(() => listener.call(thisArgs, i));
                } else {
                    listener.call(thisArgs, i);
                }
            }, undefined, disposables);
        };
    }

    bufferEvents<R = void>(fn: () => R): R {
        const buffer: Array<() => R> = [];
        this.buffers.push(buffer);
        const r = fn();
        this.buffers.pop();
        buffer.forEach(flush => flush());
        return r;
    }
}

bufferEvents 被调用的时候,会往 this.buffers 中压入一个新 buffer,在 fn 执行过程中派发的事件,就会因为 if (buffer) 判断为 true 而被缓存, fn 执行完毕之后, buffer 被弹出,其中包含的事件全部被派发。

Relay

这个类提供了切换上游 Event 的方法。当设置 Relayinput 属性时,就会切换监听的 Event,而下游的 Event 监听的是 RelayEmitter,因此无需重新设置监听。

export class Relay<T> implements IDisposable {
    // ...

    set input(event: Event<T>) {
        this.inputEvent = event;

        if (this.listening) {
            this.nputEventListener.dispose();
            this.inputEventListener = event(this.emitter.fire, this.emitter);
        }
    }

    // ...
}

总结

至此我们已经学习了 vscode 事件模块的主要内容(其他的性能分析和泄漏监测等这篇文章就不分析了,感兴趣的读者可以自行阅读)。

vscode 事件模块是所谓响应式编程的一种实现,如果想要继续学习响应式编程,非常推荐以下两个项目: