tsy77 / blog

78 stars 2 forks source link

Node.js源码-setTimeout&setImmediate&process.nextTick #12

Open tsy77 opened 6 years ago

tsy77 commented 6 years ago

本文将讲解setTimeout、setImmediate和process.nextTick的实现,其中有一些部分与上一篇讲解的事件循环关联或依赖。

setTimeout

首先setTimeout在lib/internal/bootstrap/node.js中被初始化,代码如下:

function setupGlobalTimeouts() {
    const timers = NativeModule.require('timers');
    global.clearImmediate = timers.clearImmediate;
    global.clearInterval = timers.clearInterval;
    global.clearTimeout = timers.clearTimeout;
    global.setImmediate = timers.setImmediate;
    global.setInterval = timers.setInterval;
    global.setTimeout = timers.setTimeout;
}

下面我们看下lib/timer.js中的setTimeout方法:

function setTimeout(callback, after, arg1, arg2, arg3) {
  if (typeof callback !== 'function') {
    throw new ERR_INVALID_CALLBACK();
  }

  var i, args;
  switch (arguments.length) {
    // fast cases
    case 1:
    case 2:
      break;
    case 3:
      args = [arg1];
      break;
    case 4:
      args = [arg1, arg2];
      break;
    default:
      args = [arg1, arg2, arg3];
      for (i = 5; i < arguments.length; i++) {
        // extend array dynamically, makes .apply run much faster in v6.0.0
        args[i - 2] = arguments[i];
      }
      break;
  }

  const timeout = new Timeout(callback, after, args, false, false);
  active(timeout);

  return timeout;
}

这里做了两件事:

1.实例化timeout
2.调用active(timeout)

沿着调用轨迹,首先看下Timeout类,其在internal/timers.js中被定义:

// Timer constructor function.
// The entire prototype is defined in lib/timers.js
function Timeout(callback, after, args, isRepeat, isUnrefed) {
  after *= 1; // coalesce to number or NaN
  if (!(after >= 1 && after <= TIMEOUT_MAX)) {
    if (after > TIMEOUT_MAX) {
      process.emitWarning(`${after} does not fit into` +
                          ' a 32-bit signed integer.' +
                          '\nTimeout duration was set to 1.',
                          'TimeoutOverflowWarning');
    }
    after = 1; // schedule on next tick, follows browser behavior
  }

  this._called = false;
  this._idleTimeout = after;
  this._idlePrev = this;
  this._idleNext = this;
  this._idleStart = null;
  // this must be set to null first to avoid function tracking
  // on the hidden class, revisit in V8 versions after 6.2
  this._onTimeout = null;
  this._onTimeout = callback;
  this._timerArgs = args;
  this._repeat = isRepeat ? after : null;
  this._destroyed = false;

  this[unrefedSymbol] = isUnrefed;

  initAsyncResource(this, 'Timeout');
}

这里有两个属性值得关注:

1._idleTimeout用于记录空闲时间,也就是过多久该触发
2._onTimeout记录了timer的回调

下面我们来看active方法:

// Schedule or re-schedule a timer.
// The item must have been enroll()'d first.
const active = exports.active = function(item) {
  insert(item, false);
};

// The underlying logic for scheduling or re-scheduling a timer.
//
// Appends a timer onto the end of an existing timers list, or creates a new
// TimerWrap backed list if one does not already exist for the specified timeout
// duration.
function insert(item, unrefed, start) {
  const msecs = item._idleTimeout;
  if (msecs < 0 || msecs === undefined) return;

  if (typeof start === 'number') {
    item._idleStart = start;
  } else {
    item._idleStart = TimerWrap.now();
  }

  const lists = unrefed === true ? unrefedLists : refedLists;

  // Use an existing list if there is one, otherwise we need to make a new one.
  var list = lists[msecs];
  if (list === undefined) {
    debug('no %d list was found in insert, creating a new one', msecs);
    lists[msecs] = list = new TimersList(msecs, unrefed);
  }

  if (!item[async_id_symbol] || item._destroyed) {
    item._destroyed = false;
    initAsyncResource(item, 'Timeout');
  }

  L.append(list, item);
  assert(!L.isEmpty(list)); // list is not empty
}

function TimersList(msecs, unrefed) {
  this._idleNext = this; // Create the list with the linkedlist properties to
  this._idlePrev = this; // prevent any unnecessary hidden class changes.
  this._unrefed = unrefed;
  this.msecs = msecs;

  const timer = this._timer = new TimerWrap();
  timer._list = this;

  if (unrefed === true)
    timer.unref();
  timer.start(msecs);
}

在active实际调用了insert方法,而在insert方法中,做了如下两件事:

1.如果没有lists[msecs],则实例化TimersList双向链表
2.向lists[msecs]链表中插入当前Timeout实例

下面将选择其中关键点进行讲解。

首先是msecs,这里的msecs指的是item._idleTimeout,也就是触发时间,这里也可以看出lists是一个以触发时间为key的一个map,其value就是对应的TimeList。

下面再来看TimersList,在构造函数中做了两件事:

1.实例化TimerWrap,赋值给this.timer
2.调用timer的start方法,参数是msecs

我们接着看TimerWrap,其在src/timer_wrap.cc中定义,构造函数如下所示:

static void New(const FunctionCallbackInfo<Value>& args) {
    // This constructor should not be exposed to public javascript.
    // Therefore we assert that we are not trying to call this as a
    // normal function.
    CHECK(args.IsConstructCall());
    Environment* env = Environment::GetCurrent(args);
    new TimerWrap(env, args.This());
  }

  TimerWrap(Environment* env, Local<Object> object)
      : HandleWrap(env,
                   object,
                   reinterpret_cast<uv_handle_t*>(&handle_),
                   AsyncWrap::PROVIDER_TIMERWRAP) {
    int r = uv_timer_init(env->event_loop(), &handle_);
    CHECK_EQ(r, 0);
  }

static void Start(const FunctionCallbackInfo<Value>& args) {
    TimerWrap* wrap;
    ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

    CHECK(HandleWrap::IsAlive(wrap));

    int64_t timeout = args[0]->IntegerValue();
    int err = uv_timer_start(&wrap->handle_, OnTimeout, timeout, 0);
    args.GetReturnValue().Set(err);
  }

其中TimerWrap构造函数初始化过程中调用了libuv的uv_timer_init,初始化了类的公共属性handle_,Start方法调用了uv_timer_start方法,将handle_ -> timer_cb设为cb(将回调设置成了OnTimeout方法),同时将 handle->heap_node 插入到事件循环中timer最小堆中,这也就和上一篇文章讲的事件循环联系在了一起。下面我们接着看当执行到当前timer时,OnTimeout做了什么:

static void OnTimeout(uv_timer_t* handle) {
    TimerWrap* wrap = static_cast<TimerWrap*>(handle->data);
    Environment* env = wrap->env();
    HandleScope handle_scope(env->isolate());
    Context::Scope context_scope(env->context());
    Local<Value> ret;
    Local<Value> args[1];
    do {
      args[0] = env->GetNow();
      ret = wrap->MakeCallback(env->timers_callback_function(), 1, args)
                .ToLocalChecked();
    } while (ret->IsUndefined() &&
             !env->tick_info()->has_thrown() &&
             wrap->object()->Get(env->context(),
                                 env->owner_string()).ToLocalChecked()
                                                     ->IsUndefined());
  }

这里首先拿到handle->data,然后调用wrap->MakeCallback执行回调,这里需要注意两点:

1.handle->data是什么
2.wrap->MakeCallback是什么
3.env->timers_callback_function()又是什么

handle->data在TimerWrap的构造函数中被初始化,实际上调用时TimerWrap的父类HandleWrap的构造函数,HandleWrap构造函数如下:

HandleWrap::HandleWrap(Environment* env,
                       Local<Object> object,
                       uv_handle_t* handle,
                       AsyncWrap::ProviderType provider)
    : AsyncWrap(env, object, provider),
      state_(kInitialized),
      handle_(handle) {
  handle_->data = this;
  HandleScope scope(env->isolate());
  env->handle_wrap_queue()->PushBack(this);
}

我们可以看出this就是当前的HandleWrap实例,所以在取出后又调用了static_cast转成了其子类TimerWrap。

wrap->MakeCallback调用的其实是其父类的父类AsyncWrap的MakeCallback方法,代码如下:

MaybeLocal<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
                                          int argc,
                                          Local<Value>* argv) {
  EmitTraceEventBefore();

  ProviderType provider = provider_type();
  async_context context { get_async_id(), get_trigger_async_id() };
  MaybeLocal<Value> ret = InternalMakeCallback(
      env(), object(), cb, argc, argv, context);

  // This is a static call with cached values because the `this` object may
  // no longer be alive at this point.
  EmitTraceEventAfter(provider, context.async_id);

  return ret;
}

在其中又调用了node.cc中的InternalMakeCallback方法,代码如下:

MaybeLocal<Value> InternalMakeCallback(Environment* env,
                                       Local<Object> recv,
                                       const Local<Function> callback,
                                       int argc,
                                       Local<Value> argv[],
                                       async_context asyncContext) {
  CHECK(!recv.IsEmpty());
  InternalCallbackScope scope(env, recv, asyncContext);
  if (scope.Failed()) {
    return Undefined(env->isolate());
  }

  Local<Function> domain_cb = env->domain_callback();
  MaybeLocal<Value> ret;
  if (asyncContext.async_id != 0 || domain_cb.IsEmpty() || recv.IsEmpty()) {
    ret = callback->Call(env->context(), recv, argc, argv);
  } else {
    std::vector<Local<Value>> args(1 + argc);
    args[0] = callback;
    std::copy(&argv[0], &argv[argc], args.begin() + 1);
    ret = domain_cb->Call(env->context(), recv, args.size(), &args[0]);
  }

  if (ret.IsEmpty()) {
    // NOTE: For backwards compatibility with public API we return Undefined()
    // if the top level call threw.
    scope.MarkAsFailed();
    return scope.IsInnerMakeCallback() ? ret : Undefined(env->isolate());
  }

  scope.Close();
  if (scope.Failed()) {
    return Undefined(env->isolate());
  }

  return ret;
}

InternalMakeCallback我们这里需要关注的是ret = callback->Call(env->context(), recv, argc, argv);,也就是执行了callback,也就是env->timers_callback_function()方法,那么这个方法又是什么呢?

我们回到lib/timer.js中,看到模块里有下面一段逻辑:

const [immediateInfo, toggleImmediateRef] =
  setupTimers(processImmediate, processTimers);

function processTimers(now) {
  if (this.owner)
    return unrefdHandle(this.owner, now);
  return listOnTimeout(this, now);
}

setupTimers定义在timer_wrap.cc中,代码如下:

static void SetupTimers(const FunctionCallbackInfo<Value>& args) {
    CHECK(args[0]->IsFunction());
    CHECK(args[1]->IsFunction());
    auto env = Environment::GetCurrent(args);

    env->set_immediate_callback_function(args[0].As<Function>());
    env->set_timers_callback_function(args[1].As<Function>());

    auto toggle_ref_cb = [] (const FunctionCallbackInfo<Value>& args) {
      Environment::GetCurrent(args)->ToggleImmediateRef(args[0]->IsTrue());
    };
    auto toggle_ref_function =
        env->NewFunctionTemplate(toggle_ref_cb)->GetFunction(env->context())
        .ToLocalChecked();
    auto result = Array::New(env->isolate(), 2);
    result->Set(env->context(), 0,
                env->immediate_info()->fields().GetJSArray()).FromJust();
    result->Set(env->context(), 1, toggle_ref_function).FromJust();
    args.GetReturnValue().Set(result);
  }

这里我们看到,其实就是把processImmediate设置成immediate_callback_function,processTimers设置成timers_callback_function。

看到这里大家就明白了,原来在timer.js中先注册了timers_callback_function,然后在timeout时调用wrap->MakeCallback(env->timers_callback_function(), 1, args).ToLocalChecked();调用了js里的回调函数,也就是执行了响应队列中的回调。

setImmediate

setImmediate入口代码如下:

function setImmediate(callback, arg1, arg2, arg3) {
  if (typeof callback !== 'function') {
    throw new ERR_INVALID_CALLBACK();
  }

  var i, args;
  switch (arguments.length) {
    // fast cases
    case 1:
      break;
    case 2:
      args = [arg1];
      break;
    case 3:
      args = [arg1, arg2];
      break;
    default:
      args = [arg1, arg2, arg3];
      for (i = 4; i < arguments.length; i++) {
        // extend array dynamically, makes .apply run much faster in v6.0.0
        args[i - 1] = arguments[i];
      }
      break;
  }

  return new Immediate(callback, args);
}

这里只实例化了一个Immediate对象,那么Immediate里又有什么呢?

const Immediate = class Immediate {
  constructor(callback, args) {
    this._idleNext = null;
    this._idlePrev = null;
    // this must be set to null first to avoid function tracking
    // on the hidden class, revisit in V8 versions after 6.2
    this._onImmediate = null;
    this._onImmediate = callback;
    this._argv = args;
    this._destroyed = false;
    this[kRefed] = false;

    initAsyncResource(this, 'Immediate');

    this.ref();
    immediateInfo[kCount]++;

    immediateQueue.append(this);
  }

这里其实就是注册了回调,然后把自己插入到链表中。

那么setImmediate什么时候执行呢?我们发现在介绍setTimeout时的这段代码:

const [immediateInfo, toggleImmediateRef] =
  setupTimers(processImmediate, processTimers);

这里利用setupTimers方法,把processImmediate这个js方法设置成immediate_callback_function,那么什么时候调用这个函数呢?

我们通过immediate_callback_function找到了调用它的地方,原来在emv.cc中:

void Environment::CheckImmediate(uv_check_t* handle) {
  Environment* env = Environment::from_immediate_check_handle(handle);

  if (env->immediate_info()->count() == 0)
    return;

  HandleScope scope(env->isolate());
  Context::Scope context_scope(env->context());

  env->RunAndClearNativeImmediates();

  do {
    MakeCallback(env->isolate(),
                 env->process_object(),
                 env->immediate_callback_function(),
                 0,
                 nullptr,
                 {0, 0}).ToLocalChecked();
  } while (env->immediate_info()->has_outstanding());

  if (env->immediate_info()->ref_count() == 0)
    env->ToggleImmediateRef(false);
}

CheckImmediate又是在Environment::Start中被注册,我们看以下代码:

void Environment::Start(int argc,
                        const char* const* argv,
                        int exec_argc,
                        const char* const* exec_argv,
                        bool start_profiler_idle_notifier) {
  HandleScope handle_scope(isolate());
  Context::Scope context_scope(context());

  // 初始化check阶段
  uv_check_init(event_loop(), immediate_check_handle());
  // loop的handle计数器减1
  uv_unref(reinterpret_cast<uv_handle_t*>(immediate_check_handle()));

  // 初始化idle阶段
  uv_idle_init(event_loop(), immediate_idle_handle());

  // 开始执行check阶段的CheckImmediate
  uv_check_start(immediate_check_handle(), CheckImmediate);

  ......
}

原来正在Environment::Start中把CheckImmediate加到了event loop的check阶段队列中。

这也印证了setImmediate在check阶段被执行。

process.nextTick

我们知道使用process.nextTick挂载的方法会在事件循环每一阶段结束后执行,这是如何做到的呢,我们接下来见分晓。

process.nextTick在internal/bootstrap/node.js中被初始化:

NativeModule.require('internal/process/next_tick').setup();

process.nextTick()实际调用的是internal/process/next_tick.js中的nextTick()方法:

// `nextTick()` will not enqueue any callback when the process is about to
  // exit since the callback would not have a chance to be executed.
  function nextTick(callback) {
    if (typeof callback !== 'function')
      throw new ERR_INVALID_CALLBACK();

    if (process._exiting)
      return;

    var args;
    switch (arguments.length) {
      case 1: break;
      case 2: args = [arguments[1]]; break;
      case 3: args = [arguments[1], arguments[2]]; break;
      case 4: args = [arguments[1], arguments[2], arguments[3]]; break;
      default:
        args = new Array(arguments.length - 1);
        for (var i = 1; i < arguments.length; i++)
          args[i - 1] = arguments[i];
    }

    if (queue.isEmpty())
      tickInfo[kHasScheduled] = 1;
    queue.push(new TickObject(callback, args, getDefaultTriggerAsyncId()));
 }

这里首先实例化TickObject对象,然后将其push到queue中。TickObject类定义如下:

class TickObject {
    constructor(callback, args, triggerAsyncId) {
      // this must be set to null first to avoid function tracking
      // on the hidden class, revisit in V8 versions after 6.2
      this.callback = null;
      this.callback = callback;
      this.args = args;

      const asyncId = newAsyncId();
      this[async_id_symbol] = asyncId;
      this[trigger_async_id_symbol] = triggerAsyncId;

      if (initHooksExist()) {
        emitInit(asyncId,
                 'TickObject',
                 triggerAsyncId,
                 this);
      }
    }
  }

其实就是把callback挂载属性上。

那么process.nextTick()上挂载的回调在什么时候被调用呢?

我们注意到next_tick.js中有这样一点注册代码:

 // tickInfo is used so that the C++ code in src/node.cc can
  // have easy access to our nextTick state, and avoid unnecessary
  // calls into JS land.
  // runMicrotasks is used to run V8's micro task queue.
  // set_tick_callback_function()
  const [
    tickInfo,
    runMicrotasks
  ] = process._setupNextTick(_tickCallback);

其中,process._setupNextTick方法在node.cc中定义,代码如下:

void SetupNextTick(const FunctionCallbackInfo<Value>& args) {
  Environment* env = Environment::GetCurrent(args);

  CHECK(args[0]->IsFunction());

  env->set_tick_callback_function(args[0].As<Function>());

  env->process_object()->Delete(
      env->context(),
      FIXED_ONE_BYTE_STRING(env->isolate(), "_setupNextTick")).FromJust();

  v8::Local<v8::Function> run_microtasks_fn =
      env->NewFunctionTemplate(RunMicrotasks)->GetFunction(env->context())
          .ToLocalChecked();
  run_microtasks_fn->SetName(
      FIXED_ONE_BYTE_STRING(env->isolate(), "runMicrotasks"));

  Local<Array> ret = Array::New(env->isolate(), 2);
  ret->Set(env->context(), 0,
           env->tick_info()->fields().GetJSArray()).FromJust();
  ret->Set(env->context(), 1, run_microtasks_fn).FromJust();

  args.GetReturnValue().Set(ret);
}

这里主要做了三件事:

1.set_tick_callback_function,也就是定义tick_callback_function为我们next_tick.js中的_tickCallback方法
2.挂载RunMicrotasks,也就是v8中执行Microtasks的方法
3.挂载tick_info属性

那么tick_callback_function执行就是执行我们的_tickCallback方法,也就是执行我们的nexttick queue,下面我们就只需要关注tick_callback_function执行时机即可。

我们寻着tick_callback_function,发现他在InternalCallbackScope::Close中被调用:

void InternalCallbackScope::Close() {
  ......

  if (env_->tick_callback_function()->Call(process, 0, nullptr).IsEmpty()) {
    env_->tick_info()->set_has_thrown(true);
    failed_ = true;
  }
}

InternalCallbackScope这个我们似曾相识,在上面好像的InternalCallback中好像出现过,果然:

MaybeLocal<Value> InternalMakeCallback(Environment* env,
                                       Local<Object> recv,
                                       const Local<Function> callback,
                                       int argc,
                                       Local<Value> argv[],
                                       async_context asyncContext) {
  CHECK(!recv.IsEmpty());
  InternalCallbackScope scope(env, recv, asyncContext);
  if (scope.Failed()) {
    return Undefined(env->isolate());
  }

  ...

  scope.Close();
  if (scope.Failed()) {
    return Undefined(env->isolate());
  }

  return ret;
}

InternalCallbackScope::Close在InternalMakeCallback中被调用,我们又知道InternalMakeCallback在MakeCallback中被调用,MakeCallback代码上文提到过,这里不再赘述,而MakeCallback在我们上述的timer_wrap.cc中被调用过,这也就印证了process.nextTick挂载的方法会在事件循环每一阶段结束后执行。

我们正向整理下这里的逻辑,首先next_tick.js中的_tickCallback赋值给了tick_callback_function,在event loop每个阶段执行后,会调用相应的MakeCallback,在MakeCallback中调用了InternalMakeCallback,最终在InternalMakeCallback中调用了InternalCallbackScope::Close,从而调用了_tickCallback回调。

这里有一个地方需要注意,这里的InternalCallbackScope::Close会被执行两次,原因是InternalCallbackScope的析构函数中也调用了close方法,而InternalMakeCallback中定义了InternalCallbackScope类型的对象scope,当函数调用栈被销毁时,会调用InternalCallbackScope的析构函数,从而又调用InternalCallbackScope::Close,当然,这是我们的next_tick队列之前被清空了。

最后我们看下next_tick.js中的_tickCallback方法:

function _tickCallback() {
    let tock;
    do {
      while (tock = queue.shift()) {
        const asyncId = tock[async_id_symbol];
        emitBefore(asyncId, tock[trigger_async_id_symbol]);
        // emitDestroy() places the async_id_symbol into an asynchronous queue
        // that calls the destroy callback in the future. It's called before
        // calling tock.callback so destroy will be called even if the callback
        // throws an exception that is handled by 'uncaughtException' or a
        // domain.
        // TODO(trevnorris): This is a bit of a hack. It relies on the fact
        // that nextTick() doesn't allow the event loop to proceed, but if
        // any async hooks are enabled during the callback's execution then
        // this tock's after hook will be called, but not its destroy hook.
        if (destroyHooksExist())
          emitDestroy(asyncId);

        const callback = tock.callback;
        if (tock.args === undefined)
          callback();
        else
          Reflect.apply(callback, undefined, tock.args);

        emitAfter(asyncId);
      }
      tickInfo[kHasScheduled] = 0;
      runMicrotasks();
    } while (!queue.isEmpty() || emitPromiseRejectionWarnings());
    tickInfo[kHasPromiseRejections] = 0;
  }

这里主要做了两件事:

1.执行queue中的回调
2.runMicrotasks,执行microtasks

这里需要注意的是,在开始循环之前,process.nextTick()/microtasks 是会被先清掉的。在node官方文档中我们看到这样的定义

When Node.js starts, it initializes the event loop, processes the provided input script (or drops into the REPL, which is not covered in this document) which may make async API calls, schedule timers, or call process.nextTick(), then begins processing the event loop.

而这块逻辑的关键在./lib/internal/modules/cjs/loader.js中的runMain中,代码如下:

// bootstrap main module.
Module.runMain = function() {
  // Load the main module--the command line argument.
  Module._load(process.argv[1], null, true);
  // Handle any nextTicks added in the first tick of the program
  process._tickCallback();
};

此方法执行了用户代码,然后执行了process.nextTick 和 microtasks,这段代码在node.cc中的loadEnvironment方法被调用(ExecuteBootstrapper),也就是uv_run之前执行,这就印证了上述的在开始循环之前,process.nextTick()/microtasks 是会被先清掉的。

总结

上文中我们介绍了setTimeout、setImmediate和process.nextTick的实现原理,其中setTimeout、setImmediate是依赖于event loop中的timer和check阶段执行,而process.nextTick通过定义tick_callback_function实现了在每一个event loop阶段结束后都会执行的效果。