winterggg / blog

0 stars 0 forks source link

AsyncLocalStorage 源码分析 #7

Open winterggg opened 5 months ago

winterggg commented 5 months ago

AsyncLocalStorage 源码分析

TL; DR

本文概述了 AsyncLocalStorage 的概念和它在 Node.js 中的历史发展。随后简要探讨了 AsyncLocalStorage 的部分源码,以阐明其在异步编程中如何管理上下文状态。最后简单分析了 AsyncLocalStoragefx-code 项目中的实际应用场景。

What, Why & History

What & Why?

AsyncLocalStorage (后文简写为 ALS)是 Node.js 中的一个 API,它提供了一种在异步调用链中存储和传递数据的方法。

这个类属于 async_hooks 模块,类似于 PythoncontextvarsJavaThreadLocal ,能够让程序在一个异步上下文中存储数据。

ALS 的典型应用场景包括:请求范围内跟踪用户 auth、日志记录、事务追踪、Metrics 等等。

ALS 的历史

ALS 之前,Node 社区曾涌现过各种解决异步上下文跟踪的方案,包括 domainCLSCLS-hooked

简单来说,domainCLS的实现方式是给 node 打猴子补丁(运行时补丁),而 CLS-hookedALS 则是依赖官方的 async_hooks API。经过一系列的优化,ALS 的性能损耗已经足够低,官方也推荐使用 ALS 来追踪异步执行上下文。

使用方式

根据官方文档ALS 的主要 API 如下:

官网的一个例子:

import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish

在这个例子中,我们创建了一个 AsyncLocalStorage 实例,并使用中间件来为每个请求分配一个唯一 ID。使用 run 方法可以在当前异步执行上下文中绑定请求 ID,这样,在这个异步执行上下文创建的任何异步事件中,我们都可以通过 getStore 方法得到同一个 ID,而无需显式地将请求 ID 透传给每个函数。

实现原理

ALS 的原理其实非常简单,下面以 Node 18.13 源码为例进行简要分析。

ALS 的源码中 js 部分在代码仓库的 lib/async_hooks.js 文件里,首先看 run 的实现:

run(store, callback, ...args) {
    // Avoid creation of an AsyncResource if store is already active
    if (ObjectIs(store, this.getStore())) {
      return ReflectApply(callback, null, args);
    }

    this._enable();

    const resource = executionAsyncResource();
    const oldStore = resource[this.kResourceStore];

    resource[this.kResourceStore] = store;

    try {
      return ReflectApply(callback, null, args);
    } finally {
      resource[this.kResourceStore] = oldStore;
    }
}

其中,

可以看出,run 主要做了这样一件事情:备份当前的 store,然后替换为 run 函数入参提供的新 store,然后执行入参里的回调函数,这样,后续的异步操作都可以通过这个 resource 访问到新的 store

当然,回调函数里也能继续嵌套调用 run,而这里其实是借用函数的调用栈结构,实现了一个 store 的切换与重置。

到这里,回调函数里的所有同步函数都能正确的通过 getStore 获得对应的存储:

  getStore() {
    if (this.enabled) {
      const resource = executionAsyncResource();
      return resource[this.kResourceStore];
    }
  }

不过,万一回调里开启了新的异步操作呢?这时executionAsyncResource() 会得到一个全新的 resource。因此,我们需要一个机制去在调用处传递 resource 里的 store。

这其实是通过 async_hook API 来实现的,具体来说,node 在全局维护了一个 store 列表:

const storageList = [];
const storageHook = createHook({
  init(asyncId, type, triggerAsyncId, resource) {
    const currentResource = executionAsyncResource();
    // Value of currentResource is always a non null object
    for (let i = 0; i < storageList.length; ++i) {
      storageList[i]._propagate(resource, currentResource, type);
    }
  }
});

// als._enable()
  _enable() {
    if (!this.enabled) {
      this.enabled = true;
      ArrayPrototypePush(storageList, this);
      storageHook.enable();
    }
  }

如上面的代码所示,ALS 创建了一个 init hook,首次调用 run 时,会把当前 store 对象存入全局 store 列表并启动 hook。这样,每次进入新的异步操作前,会将旧 resource 中的所有 store 实例传递给新的 resource,这样在异步操作内,也能正常通过 getStore 获取 store 啦!

至于剩下两个函数 enterWithdisable,实现比较简单。扫一眼代码就大概知道是在干嘛了:

enterWith(store) {
  this._enable();
  const resource = executionAsyncResource();
  resource[this.kResourceStore] = store;
}

disable() {
    if (this.enabled) {
      this.enabled = false;
      // If this.enabled, the instance must be in storageList
      ArrayPrototypeSplice(storageList,
                           ArrayPrototypeIndexOf(storageList, this), 1);
      if (storageList.length === 0) {
        storageHook.disable();
      }
    }
  }

FX-CODE 里的应用

fx-code 工程里的 nstarter-core 依赖里使用 ALS 封装了一个异步上下文管理器,主要用来维护一个请求的 traceId。另外 james 之前做 metrics 的时候也用它来存储一些请求范围的 trace 信息。

实现上来说,在 node_modules/nstarter-core/src/context/provider.ts 里定义了 ContextProvider 类,其中 getMiddleware 返回一个中间件,将 next 函数包括在了 als 的上下文里。这样每一个新的请求,都能通过静态方法 ContextProvider.getContext() 获取请求上下文了:

private getMiddleware(options: IContextMiddlewareOptions): RequestHandler {
    const opts: IContextMiddlewareOptions = {
        idGenerator: defaultIdGenerator,
        ...options
    };
    return (req: Request, res: Response, next: NextFunction) =>
        this._localStorage.run(new this._Context(), () => {
            const context = this.context;
            const traceId = opts.idGenerator?.(req);
            if (context && traceId) {
                context.setTraceId(traceId);
            }
            return next();
        });
}

/**
 * 获取上下文对象
 */
private get context(): T | undefined {
    return this._localStorage.getStore();
}

public static getContext<T extends BaseContext>(): T | undefined {
    try {
        return ContextProvider.getInstance<T>().context;
    } catch (err) {
        return;
    }
}

上下文丢失的问题

在大多数情况下, AsyncLocalStorage 可以正常工作,对于基于回调的函数,只需要使用 util.promisify() 将其转换为 Promise 就可以了。

在少数情况下可能会发生上下文丢失,这时只需要通过 AsyncResource API 恰当地绑定好上下文即可。当你发现上下文丢失时,可以通过在适当的位置执行 getStore 来推理造出上下文丢失的函数调用并解决修复。

例如,由 EventEmitter 触发的事件监听器可能在调用 eventEmitter.on() 时的执行上下文中运行。下面展示了一个问题场景以及对应的修复方式,其他类似的基于事件驱动的场景也可以用同样的方式修复:

import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);