libuv Source Code: Event Loop #11

This post covers libuv's event loop, including the overall flow of the loop. As we know, libuv implements network I/O with the platform's polling mechanism and file I/O with a thread pool, and inter-thread communication also goes through the same polling mechanism. Later sections explain how the thread pool and the event loop are tied together.

Event loop flow

The overall flow of the event loop is roughly as follows.

The code of uv_run is shown below:

int uv_run(uv_loop_t* loop, uv_run_mode mode) {
  int timeout;
  int r;
  int ran_pending;

  // there is an active handle or request
  r = uv__loop_alive(loop);
  if (!r)
    uv__update_time(loop);

  while (r != 0 && loop->stop_flag == 0) {
    uv__update_time(loop);
    uv__run_timers(loop);
    // run pending queue
    ran_pending = uv__run_pending(loop);
    // defined via UV_LOOP_WATCHER_DEFINE: run the idle and prepare queues
    uv__run_idle(loop);
    uv__run_prepare(loop);

    timeout = 0;
    if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
      // checks whether the loop should block at all and returns the time remaining until the next timer fires
      timeout = uv_backend_timeout(loop);

    uv__io_poll(loop, timeout);
    uv__run_check(loop);
    uv__run_closing_handles(loop);

    if (mode == UV_RUN_ONCE) {
      /* UV_RUN_ONCE implies forward progress: at least one callback must have
       * been invoked when it returns. uv__io_poll() can return without doing
       * I/O (meaning: no callbacks) when its timeout expires - which means we
       * have pending timers that satisfy the forward progress constraint.
       *
       * UV_RUN_NOWAIT makes no guarantees about progress so it's omitted from
       * the check.
       */
      uv__update_time(loop);
      uv__run_timers(loop);
    }

    r = uv__loop_alive(loop);
    if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
      break;
  }

  /* The if statement lets gcc compile it to a conditional store. Avoids
   * dirtying a cache line.
   */
  if (loop->stop_flag != 0)
    loop->stop_flag = 0;

  return r;
}

The event loop can be broken down into the following steps:

1. Cache the current time
2. Run the callbacks of due timers in the timer heap (a min-heap)
3. Run the I/O callbacks left pending from the previous iteration
4. Run the callbacks in the idle queue
5. Run the callbacks in the prepare queue
6. Compute the poll timeout, i.e. the time until the next timer is due (see the uv_backend_timeout sketch after this list)
7. Block in poll I/O, using the timeout computed in the previous step
8. Run the callbacks in the check queue
9. Run the callbacks in the close queue
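
Step 6 is worth a closer look. The sketch below shows roughly how uv_backend_timeout and uv__next_timeout derive the poll timeout in the libuv unix sources (simplified here): the loop refuses to block if anything is already runnable, otherwise it blocks until the earliest timer is due, or indefinitely (-1) if there are no timers.

int uv_backend_timeout(const uv_loop_t* loop) {
  if (loop->stop_flag != 0)
    return 0;                      /* uv_stop() was called: don't block */

  if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
    return 0;                      /* nothing to wait for */

  if (!QUEUE_EMPTY(&loop->idle_handles))
    return 0;                      /* idle callbacks must run every tick */

  if (!QUEUE_EMPTY(&loop->pending_queue))
    return 0;                      /* pending I/O callbacks are waiting */

  if (loop->closing_handles)
    return 0;                      /* close callbacks are waiting */

  return uv__next_timeout(loop);   /* time until the next timer, or -1 */
}

int uv__next_timeout(const uv_loop_t* loop) {
  const struct heap_node* heap_node;
  const uv_timer_t* handle;
  uint64_t diff;

  heap_node = heap_min((const struct heap*) &loop->timer_heap);
  if (heap_node == NULL)
    return -1;                     /* no timers: poll may block indefinitely */

  handle = container_of(heap_node, uv_timer_t, heap_node);
  if (handle->timeout <= loop->time)
    return 0;                      /* a timer is already due: don't block */

  diff = handle->timeout - loop->time;
  if (diff > INT_MAX)
    diff = INT_MAX;

  return (int) diff;
}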

The event loop exits when any of the following holds:

1. The loop is no longer alive, i.e. there is no active handle or request
2. The run mode is UV_RUN_ONCE or UV_RUN_NOWAIT (the loop runs a single iteration and then breaks)
3. loop->stop_flag has been set by uv_stop()

Below, let's walk through the most important pieces:

Determining whether the loop is alive

Whether the loop is alive depends on whether there are active handles or requests, or handles still waiting to be closed (loop->closing_handles). The code is as follows:

static int uv__loop_alive(const uv_loop_t* loop) {
  return uv__has_active_handles(loop) ||
         uv__has_active_reqs(loop) ||
         loop->closing_handles != NULL;
}

uv__run_timers

The code of uv__run_timers is as follows:

void uv__run_timers(uv_loop_t* loop) {
  struct heap_node* heap_node;
  uv_timer_t* handle;

  for (;;) {
    // take the minimum node from the timer heap
    heap_node = heap_min((struct heap*) &loop->timer_heap);
    if (heap_node == NULL)
      break;

    // container_of: from heap_node back to the start of the enclosing uv_timer_t handle
    handle = container_of(heap_node, uv_timer_t, heap_node);
    // not due yet
    if (handle->timeout > loop->time)
      break;

    // removes the node from the heap and calls uv__active_handle_rm
    uv_timer_stop(handle);
    uv_timer_again(handle);
    handle->timer_cb(handle);
  }
}

Note that timer nodes are stored in a min-heap keyed by handle->timeout. Each iteration of the loop does the following:

1. Take the node with the smallest timeout from the min-heap, i.e. the timer that should fire first
2. If that timer is not due yet (handle->timeout > loop->time), break out of the loop
3. If it is due, uv_timer_stop removes the node from the heap (heap_remove) and calls uv__active_handle_rm to decrement loop->active_handles; uv_timer_again then re-arms the timer if it has a repeat interval, and finally handle->timer_cb is invoked (a minimal public-API timer example follows this list)
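
For reference, here is a minimal standalone sketch of how a timer gets into that min-heap through the public API (this example is not part of the post): uv_timer_start records handle->timeout = loop->time + timeout and pushes the handle onto loop->timer_heap, which is exactly what uv__run_timers pops later.

#include <stdio.h>
#include <uv.h>

static void on_timeout(uv_timer_t* handle) {
  printf("timer fired\n");
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_timer_t timer;

  uv_timer_init(loop, &timer);
  /* handle->timeout becomes loop->time + 100 and the handle is inserted into
   * loop->timer_heap; repeat == 0, so uv_timer_again() will not re-arm it. */
  uv_timer_start(&timer, on_timeout, 100, 0);

  return uv_run(loop, UV_RUN_DEFAULT);
}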

uv__run_pending

uv__run_pending takes the callbacks queued on loop->pending_queue and runs them. The code is as follows:

static int uv__run_pending(uv_loop_t* loop) {
  QUEUE* q;
  QUEUE pq;
  uv__io_t* w;

  if (QUEUE_EMPTY(&loop->pending_queue))
    return 0;

  QUEUE_MOVE(&loop->pending_queue, &pq);

  while (!QUEUE_EMPTY(&pq)) {
    q = QUEUE_HEAD(&pq);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);
    w = QUEUE_DATA(q, uv__io_t, pending_queue);
    w->cb(loop, w, POLLOUT);
  }

  return 1;
}

The subsequent uv__run_idle and uv__run_prepare work in a similar way.
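
How do callbacks get onto pending_queue in the first place? Deferred I/O callbacks (for example, a write completion or error that should be reported on the next tick) are queued with uv__io_feed, which in the libuv source is roughly:

void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
  /* Defer w's callback to the next loop iteration: uv__run_pending will pop
   * it off loop->pending_queue and call w->cb(loop, w, POLLOUT). */
  if (QUEUE_EMPTY(&w->pending_queue))
    QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}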

poll I/O

Poll I/O is the heart of the event loop. It is built on I/O multiplexing: all network operations use non-blocking sockets together with the best polling mechanism each platform offers, such as epoll on Linux and kqueue on macOS. All file I/O, in contrast, is implemented with the thread pool, but inter-thread communication still relies on the same polling mechanism.
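
To make the epoll-based code easier to follow, here is a minimal standalone sketch of the raw epoll pattern that uv__io_poll wraps (plain Linux API, not libuv code): register interest with epoll_ctl, block in epoll_wait, then dispatch on the events the kernel returns.

#include <sys/epoll.h>
#include <unistd.h>

/* Minimal epoll skeleton; uv__io_poll does the same dance with extra
 * bookkeeping (watcher_queue, watchers[], timeout handling, signals). */
int poll_once(int watched_fd, int timeout_ms) {
  struct epoll_event ev, events[64];
  int epfd, nfds, i;

  epfd = epoll_create1(EPOLL_CLOEXEC);          /* cf. loop->backend_fd */
  if (epfd == -1)
    return -1;

  ev.events = EPOLLIN;                          /* cf. e.events = w->pevents */
  ev.data.fd = watched_fd;                      /* cf. e.data = w->fd */
  if (epoll_ctl(epfd, EPOLL_CTL_ADD, watched_fd, &ev) == -1) {
    close(epfd);
    return -1;
  }

  nfds = epoll_wait(epfd, events, 64, timeout_ms);
  for (i = 0; i < nfds; i++) {
    /* cf. looking up loop->watchers[events[i].data.fd] and calling w->cb */
  }

  close(epfd);
  return nfds;
}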

The uv__io_poll below is the Linux implementation based on epoll; the other platforms are similar. The code is as follows:

void uv__io_poll(uv_loop_t* loop, int timeout) {
  /* A bug in kernels < 2.6.37 makes timeouts larger than ~30 minutes
   * effectively infinite on 32 bits architectures.  To avoid blocking
   * indefinitely, we cap the timeout and poll again if necessary.
   *
   * Note that "30 minutes" is a simplification because it depends on
   * the value of CONFIG_HZ.  The magic constant assumes CONFIG_HZ=1200,
   * that being the largest value I have seen in the wild (and only once.)
   */
  static const int max_safe_timeout = 1789569;
  static int no_epoll_pwait;
  static int no_epoll_wait;
  struct uv__epoll_event events[1024];
  struct uv__epoll_event* pe;
  struct uv__epoll_event e;
  int real_timeout;
  QUEUE* q;
  uv__io_t* w;
  sigset_t sigset;
  uint64_t sigmask;
  uint64_t base;
  int have_signals;
  int nevents;
  int count;
  int nfds;
  int fd;
  int op;
  int i;

  // loop->watchers[w->fd] = w in uv__io_start func
  if (loop->nfds == 0) {
    assert(QUEUE_EMPTY(&loop->watcher_queue));
    return;
  }

  // take each watcher from the watcher queue and register its fd with uv__epoll_ctl
  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
    q = QUEUE_HEAD(&loop->watcher_queue);
    QUEUE_REMOVE(q);
    QUEUE_INIT(q);

    // QUEUE_DATA works like container_of
    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
    assert(w->pevents != 0);
    assert(w->fd >= 0);
    assert(w->fd < (int) loop->nwatchers);

    e.events = w->pevents;
    e.data = w->fd;

    if (w->events == 0)
      op = UV__EPOLL_CTL_ADD;
    else
      op = UV__EPOLL_CTL_MOD;

    /* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
     * events, skip the syscall and squelch the events after epoll_wait().
     */
    // fd = uv__epoll_create1(UV__EPOLL_CLOEXEC); loop->backend_fd = fd;
    if (uv__epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
      if (errno != EEXIST)
        abort();

      assert(op == UV__EPOLL_CTL_ADD);

      /* We've reactivated a file descriptor that's been watched before. */
      if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_MOD, w->fd, &e))
        abort();
    }

    w->events = w->pevents;
  }

  sigmask = 0;
  if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGPROF);
    sigmask |= 1 << (SIGPROF - 1);
  }

  assert(timeout >= -1);
  base = loop->time;
  count = 48; /* Benchmarks suggest this gives the best throughput. */
  real_timeout = timeout;

  for (;;) {
    /* See the comment for max_safe_timeout for an explanation of why
     * this is necessary.  Executive summary: kernel bug workaround.
     */
    if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout)
      timeout = max_safe_timeout;

    if (sigmask != 0 && no_epoll_pwait != 0)
      if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        abort();

    if (no_epoll_wait != 0 || (sigmask != 0 && no_epoll_pwait == 0)) {
      // returns the number of events that are ready
      nfds = uv__epoll_pwait(loop->backend_fd,
                             events,
                             ARRAY_SIZE(events),
                             timeout,
                             sigmask);
      if (nfds == -1 && errno == ENOSYS)
        no_epoll_pwait = 1;
    } else {
      nfds = uv__epoll_wait(loop->backend_fd,
                            events,
                            ARRAY_SIZE(events),
                            timeout);
      if (nfds == -1 && errno == ENOSYS)
        no_epoll_wait = 1;
    }

    if (sigmask != 0 && no_epoll_pwait != 0)
      if (pthread_sigmask(SIG_UNBLOCK, &sigset, NULL))
        abort();

    /* Update loop->time unconditionally. It's tempting to skip the update when
     * timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
     * operating system didn't reschedule our process while in the syscall.
     */
    SAVE_ERRNO(uv__update_time(loop));

    if (nfds == 0) {
      assert(timeout != -1);

      if (timeout == 0)
        return;

      /* We may have been inside the system call for longer than |timeout|
       * milliseconds so we need to update the timestamp to avoid drift.
       */
      // nothing to handle: adjust the timeout and poll again
      goto update_timeout;
    }

    if (nfds == -1) {
      if (errno == ENOSYS) {
        /* epoll_wait() or epoll_pwait() failed, try the other system call. */
        assert(no_epoll_wait == 0 || no_epoll_pwait == 0);
        continue;
      }

      if (errno != EINTR)
        abort();

      if (timeout == -1)
        continue;

      if (timeout == 0)
        return;

      /* Interrupted by a signal. Update timeout and poll again. */
      goto update_timeout;
    }

    have_signals = 0;
    nevents = 0;

    assert(loop->watchers != NULL);
    loop->watchers[loop->nwatchers] = (void*) events;
    loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
    for (i = 0; i < nfds; i++) {
      pe = events + i;
      // (*pe).data
      fd = pe->data;

      /* Skip invalidated events, see uv__platform_invalidate_fd */
      if (fd == -1)
        continue;

      assert(fd >= 0);
      assert((unsigned) fd < loop->nwatchers);

      w = loop->watchers[fd];

      if (w == NULL) {
        /* File descriptor that we've stopped watching, disarm it.
         *
         * Ignore all errors because we may be racing with another thread
         * when the file descriptor is closed.
         */
        // remove the fd from the epoll instance (its internal red-black tree)
        uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_DEL, fd, pe);
        continue;
      }

      /* Give users only events they're interested in. Prevents spurious
       * callbacks when previous callback invocation in this loop has stopped
       * the current watcher. Also, filters out events that users has not
       * requested us to watch.
       */
      pe->events &= w->pevents | POLLERR | POLLHUP;

      /* Work around an epoll quirk where it sometimes reports just the
       * EPOLLERR or EPOLLHUP event.  In order to force the event loop to
       * move forward, we merge in the read/write events that the watcher
       * is interested in; uv__read() and uv__write() will then deal with
       * the error or hangup in the usual fashion.
       *
       * Note to self: happens when epoll reports EPOLLIN|EPOLLHUP, the user
       * reads the available data, calls uv_read_stop(), then sometime later
       * calls uv_read_start() again.  By then, libuv has forgotten about the
       * hangup and the kernel won't report EPOLLIN again because there's
       * nothing left to read.  If anything, libuv is to blame here.  The
       * current hack is just a quick bandaid; to properly fix it, libuv
       * needs to remember the error/hangup event.  We should get that for
       * free when we switch over to edge-triggered I/O.
       */
      if (pe->events == POLLERR || pe->events == POLLHUP)
        pe->events |= w->pevents & (POLLIN | POLLOUT | UV__POLLPRI);

      if (pe->events != 0) {
        /* Run signal watchers last.  This also affects child process watchers
         * because those are implemented in terms of signal watchers.
         */
        if (w == &loop->signal_io_watcher)
          have_signals = 1;
        else
          // e.g. uv__async_io, registered by uv__io_init in uv__async_start
          w->cb(loop, w, pe->events);

        nevents++;
      }
    }

    if (have_signals != 0)
      loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);

    loop->watchers[loop->nwatchers] = NULL;
    loop->watchers[loop->nwatchers + 1] = NULL;

    if (have_signals != 0)
      return;  /* Event loop should cycle now so don't poll again. */

    if (nevents != 0) {
      if (nfds == ARRAY_SIZE(events) && --count != 0) {
        /* Poll for more events but don't block this time. */
        timeout = 0;
        continue;
      }
      return;
    }

    if (timeout == 0)
      return;

    if (timeout == -1)
      continue;

update_timeout:
    assert(timeout > 0);

    real_timeout -= (loop->time - base);
    if (real_timeout <= 0)
      return;

    timeout = real_timeout;
  }
}

This function does the following:

1. Take every uv__io_t watcher (w) off loop->watcher_queue and call uv__epoll_ctl to start monitoring w->fd
2. In a loop, block in uv__epoll_pwait, which returns the number of events that are ready
3. If no events are ready, check whether the timeout has expired
4. If there are events, look up the uv__io_t watcher w in loop->watchers by the event's fd and run its callback w->cb()

A few points deserve attention:

loop->backend_fd

In uv__epoll_ctl(loop->backend_fd, op, w->fd, &e), loop->backend_fd is the file descriptor of the epoll instance. If you know epoll, you will recognize it as the handle to the kernel-side epoll object, which keeps the watched fds in a red-black tree.

It is assigned in uv__platform_loop_init, as follows:

fd = uv__epoll_create1(UV__EPOLL_CLOEXEC);

loop->watchers

epoll returns the ready events through uv__epoll_pwait: the events array is filled in by the kernel with only the events that are ready, which is one of epoll's advantages over scanning every fd. We take the fd from each event, look up the corresponding handle in loop->watchers by that fd, and run its callback. So how does loop->watchers get populated?

void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
  assert(0 != events);
  assert(w->fd >= 0);
  assert(w->fd < INT_MAX);

  w->pevents |= events;
  maybe_resize(loop, w->fd + 1);

#if !defined(__sun)
  /* The event ports backend needs to rearm all file descriptors on each and
   * every tick of the event loop but the other backends allow us to
   * short-circuit here if the event mask is unchanged.
   */
  if (w->events == w->pevents)
    return;
#endif

  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

It is populated in uv__io_start. loop->watchers is an array indexed by the fd stored in each uv__io_t handle, so given an fd we can easily find its uv__io_t handle.

uv__io_start is called from many places: uv__async_start uses it to watch the fd used for inter-thread communication, and the tcp and udp modules use it to watch their fds as well.

In short, every I/O event source goes through uv__io_start, which saves the watcher that needs monitoring onto the event loop's watcher_queue (a small public-API example is sketched below).
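
As one concrete public-API example (a sketch, not from the post): uv_poll handles are a thin wrapper over exactly this machinery; uv_poll_start ends up calling uv__io_start, so the fd lands in loop->watchers and loop->watcher_queue and its events are dispatched by uv__io_poll.

#include <uv.h>

static void on_readable(uv_poll_t* handle, int status, int events) {
  if (status == 0 && (events & UV_READABLE)) {
    /* the fd is readable: uv__io_poll saw POLLIN and invoked this callback */
  }
}

int watch_socket(uv_loop_t* loop, uv_poll_t* handle, int fd) {
  int err;

  err = uv_poll_init(loop, handle, fd);        /* uv__io_init(..., fd) inside */
  if (err != 0)
    return err;

  return uv_poll_start(handle, UV_READABLE, on_readable);  /* uv__io_start */
}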

Timeouts

uv__io_poll is a blocking call. To keep it from blocking forever it is given a timeout, which is the time remaining until the next timer is due. When there are no events to handle, it uses the time at which it entered uv__io_poll to decide whether it should return. The update_timeout code is as follows:

assert(timeout > 0);

real_timeout -= (loop->time - base);
if (real_timeout <= 0)
  return;

timeout = real_timeout;

Asynchronous file I/O via the thread pool

libuv implements file I/O on top of a thread pool. Roughly: the main thread submits a task to the task queue and signals the pool; a worker thread picks the task up and runs it; when the worker is done, it marks the corresponding uv_async_t handle as pending and notifies the main thread by writing to an fd (an fd that is also managed by epoll); when the main thread sees the epoll event, it runs the async callbacks of the handles whose pending flag was set, and that chain of callbacks eventually reaches the user-registered callback.

Nearly every thread pool follows the same model: a task queue plus a pool of worker threads, and libuv's implementation is no exception.

In libuv the task queue is a doubly linked list, and a task is declared as follows:

struct uv__work {
  void (*work)(struct uv__work *w);
  void (*done)(struct uv__work *w, int status);
  struct uv_loop_s* loop;
  void* wq[2];
};

Here, work is the function the thread pool actually executes, done is the callback invoked after the task finishes, and wq is a pair of pointers linking the task to the previous and next nodes in the queue.

Let's first look at how the main thread submits a task to the queue.

In fs.c, every file operation goes through the POST macro:

#define POST                                                                  \
  do {                                                                        \
    if (cb != NULL) {                                                         \
      uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done);        \
      return 0;                                                               \
    }                                                                         \
    else {                                                                    \
      /* cb == NULL means the call is synchronous */                           \
      uv__fs_work(&req->work_req);                                            \
      return req->result;                                                     \
    }                                                                         \
  }                                                                           \
  while (0)

// callback invoked after the operation completes
static void uv__fs_done(struct uv__work* w, int status) {
  uv_fs_t* req;

  req = container_of(w, uv_fs_t, work_req);
  uv__req_unregister(req->loop, req);

  if (status == -ECANCELED) {
    assert(req->result == 0);
    req->result = -ECANCELED;
  }

  req->cb(req);  // invoke the user-registered callback
}

The POST macro calls uv__work_submit to put the task on the queue. Here is uv__work_submit:

void uv__work_submit(uv_loop_t* loop,
                     struct uv__work* w,
                     void (*work)(struct uv__work* w),
                     void (*done)(struct uv__work* w, int status)) {
  uv_once(&once, init_once);
  w->loop = loop;
  w->work = work;
  w->done = done;
  post(&w->wq);
}

It does two things:

1. Initialize the thread pool. uv_once with &once guarantees init_once runs only once, which also tells us that libuv's thread pool is created lazily, on first use
2. Call post to put the task on the queue (a public-API example that goes through the same path is sketched after this list)
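
File I/O is not the only user of this path. As a hedged illustration, the public uv_queue_work API submits a user-defined task through the same uv__work_submit machinery, so it behaves just like the fs requests above: work_cb runs on a pool thread, and after_cb runs back on the loop thread once uv_async_send has woken it up.

#include <stdio.h>
#include <uv.h>

static void work_cb(uv_work_t* req) {
  /* runs on a thread-pool worker, like uv__fs_work does for file I/O */
}

static void after_cb(uv_work_t* req, int status) {
  /* runs on the event-loop thread via the done path */
  printf("work finished, status = %d\n", status);
}

int submit_task(uv_loop_t* loop, uv_work_t* req) {
  /* internally this also goes through uv__work_submit() */
  return uv_queue_work(loop, req, work_cb, after_cb);
}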

The logic involved here is as follows:

static void init_once(void) {
  unsigned int i;
  const char* val;
  uv_sem_t sem;

  // UV_THREADPOOL_SIZE controls the number of threads in the pool
  nthreads = ARRAY_SIZE(default_threads);
  val = getenv("UV_THREADPOOL_SIZE");
  if (val != NULL)
    nthreads = atoi(val);
  if (nthreads == 0)
    nthreads = 1;
  if (nthreads > MAX_THREADPOOL_SIZE)
    nthreads = MAX_THREADPOOL_SIZE;

  threads = default_threads;
  if (nthreads > ARRAY_SIZE(default_threads)) {
    threads = uv__malloc(nthreads * sizeof(threads[0]));
    if (threads == NULL) {
      nthreads = ARRAY_SIZE(default_threads);
      threads = default_threads;
    }
  }

  if (uv_cond_init(&cond))
    abort();

  if (uv_mutex_init(&mutex))
    abort();

  QUEUE_INIT(&wq);

  if (uv_sem_init(&sem, 0))
    abort();

  for (i = 0; i < nthreads; i++)
    if (uv_thread_create(threads + i, worker, &sem))
      abort();

  for (i = 0; i < nthreads; i++)
    uv_sem_wait(&sem);

  uv_sem_destroy(&sem);
}

/* To avoid deadlock with uv_cancel() it's crucial that the worker
 * never holds the global mutex and the loop-local mutex at the same time.
 */
static void worker(void* arg) {
  struct uv__work* w;
  QUEUE* q;

  uv_sem_post((uv_sem_t*) arg);
  arg = NULL;

  for (;;) {
    uv_mutex_lock(&mutex);

    while (QUEUE_EMPTY(&wq)) {
      idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
      idle_threads -= 1;
    }

    q = QUEUE_HEAD(&wq);

    if (q == &exit_message)
      uv_cond_signal(&cond);
    else {
      QUEUE_REMOVE(q);
      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is
                             executing. */
    }

    uv_mutex_unlock(&mutex);

    if (q == &exit_message)
      break;

    w = QUEUE_DATA(q, struct uv__work, wq);
    w->work(w);

    uv_mutex_lock(&w->loop->wq_mutex);
    w->work = NULL;  /* Signal uv_cancel() that the work req is done
                        executing. */
    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
    uv_async_send(&w->loop->wq_async);
    uv_mutex_unlock(&w->loop->wq_mutex);
  }
}

static void post(QUEUE* q) {
  uv_mutex_lock(&mutex);
  QUEUE_INSERT_TAIL(&wq, q);
  if (idle_threads > 0)
    uv_cond_signal(&cond);
  uv_mutex_unlock(&mutex);
}

The points worth noting are:

1. The core of init_once is determining the number of threads in the pool and creating that many threads, each running the worker function
2. The pool size comes from the UV_THREADPOOL_SIZE environment variable and defaults to 4
3. In worker, a thread waits on the cond condition variable; when signaled, it takes a task off the queue and runs it, then calls uv_async_send to notify the main thread (uv_async_send is described in detail below)
4. post inserts the work item into the task queue and signals an idle worker if there is one

Now let's look at how a worker thread notifies the main thread once a task is finished, i.e. the uv_async_send function mentioned above:

int uv_async_send(uv_async_t* handle) {
  /* Do a cheap read first. */
  if (ACCESS_ONCE(int, handle->pending) != 0)
    return 0;

  if (cmpxchgi(&handle->pending, 0, 1) == 0)
    uv__async_send(&handle->loop->async_watcher);

  return 0;
}

void uv__async_send(struct uv__async* wa) {
  const void* buf;
  ssize_t len;
  int fd;
  int r;

  buf = "";
  len = 1;
  fd = wa->wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = wa->io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1)
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;

  abort();
}

This does the following (a public-API example of the same mechanism is sketched after this list):

1. Atomically flip the pending flag of the uv_async_t handle (here &w->loop->wq_async) from 0 to 1, marking that the handle has a completed task for the loop to process; if it was already pending, return right away
2. Call uv__async_send, which writes a byte (or an 8-byte value when an eventfd is used) to the async watcher's fd; the main thread's epoll will see it
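
The same mechanism is exposed to users directly. Below is a minimal, hypothetical example using the public uv_async API (not from the post): the loop thread registers an async handle, another thread calls uv_async_send, and the loop thread is woken up through the very fd/epoll path described above.

#include <stdio.h>
#include <uv.h>

static uv_async_t async_handle;

static void async_cb(uv_async_t* handle) {
  printf("woken up on the loop thread\n");
  uv_close((uv_handle_t*) handle, NULL);   /* let the loop exit */
}

static void other_thread(void* arg) {
  (void) arg;
  uv_async_send(&async_handle);            /* safe to call from any thread */
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_thread_t tid;

  uv_async_init(loop, &async_handle, async_cb);
  uv_thread_create(&tid, other_thread, NULL);

  uv_run(loop, UV_RUN_DEFAULT);
  uv_thread_join(&tid);
  return 0;
}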

When the main thread sees activity on async_watcher's io_watcher.fd, a chain of callbacks eventually reaches the uv__work's done function and thus the user-registered callback. Let's first look, from front to back, at how those callbacks are registered:

// async.c
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int err;

  err = uv__async_start(loop);
  if (err)
    return err;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->async_cb = async_cb;
  handle->pending = 0;

  // add the handle to loop->async_handles
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}

// async.c
// puts loop->async_io_watcher.fd on loop->watcher_queue so epoll can watch it
static int uv__async_start(uv_loop_t* loop) {
  int pipefd[2];
  int err;

  if (loop->async_io_watcher.fd != -1)
    return 0;

  err = uv__async_eventfd();
  if (err >= 0) {
    pipefd[0] = err;
    pipefd[1] = -1;
  }
  else if (err == UV_ENOSYS) {
    err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
#if defined(__linux__)
    /* Save a file descriptor by opening one of the pipe descriptors as
     * read/write through the procfs.  That file descriptor can then
     * function as both ends of the pipe.
     */
    if (err == 0) {
      char buf[32];
      int fd;

      snprintf(buf, sizeof(buf), "/proc/self/fd/%d", pipefd[0]);
      fd = uv__open_cloexec(buf, O_RDWR);
      if (fd >= 0) {
        uv__close(pipefd[0]);
        uv__close(pipefd[1]);
        pipefd[0] = fd;
        pipefd[1] = fd;
      }
    }
#endif
  }

  if (err < 0)
    return err;

  // register uv__async_io as the callback for async I/O events
  // and record the fd on loop->async_io_watcher
  uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
  // add the io_watcher to loop->watcher_queue; uv__io_poll will pick it up
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = pipefd[1];

  return 0;
}

// core.c
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
  assert(cb != NULL);
  assert(fd >= -1);
  QUEUE_INIT(&w->pending_queue);
  QUEUE_INIT(&w->watcher_queue);
  w->cb = cb;
  w->fd = fd;
  w->events = 0;
  w->pevents = 0;

#if defined(UV_HAVE_KQUEUE)
  w->rcount = 0;
  w->wcount = 0;
#endif /* defined(UV_HAVE_KQUEUE) */
}

// core.c
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  assert(0 == (events & ~(POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI)));
  assert(0 != events);
  assert(w->fd >= 0);
  assert(w->fd < INT_MAX);

  w->pevents |= events;
  maybe_resize(loop, w->fd + 1);

#if !defined(__sun)
  /* The event ports backend needs to rearm all file descriptors on each and
   * every tick of the event loop but the other backends allow us to
   * short-circuit here if the event mask is unchanged.
   */
  if (w->events == w->pevents)
    return;
#endif

  if (QUEUE_EMPTY(&w->watcher_queue))
    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);

  if (loop->watchers[w->fd] == NULL) {
    loop->watchers[w->fd] = w;
    loop->nfds++;
  }
}

In execution order, this code does the following:

1. uv_loop_init calls uv_async_init (for loop->wq_async), which sets up loop->async_io_watcher.fd and adds the handle to loop->async_handles
2. uv__async_start calls uv__io_init and uv__io_start
3. uv__io_init registers uv__async_io as the callback for async I/O events and records the fd on loop->async_io_watcher
4. uv__io_start puts loop->async_io_watcher on loop->watcher_queue so epoll will watch its fd, and registers it in loop->watchers under that fd

Now let's trace how, once the main thread receives the event, the callbacks cascade until the uv__work's done function, i.e. the user-submitted callback, is executed.

In uv__io_poll, once uv__epoll_pwait reports the event, the loop looks up loop->watchers to find the uv__io_t registered in uv__io_start (the loop->async_io_watcher registered above) and runs its callback, uv__async_io.

uv__async_io is as follows:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char buf[1024];
  ssize_t r;
  QUEUE queue;
  QUEUE* q;
  uv_async_t* h;

  assert(w == &loop->async_io_watcher);

  // drain everything that uv__async_send() wrote to the fd
  for (;;) {
    r = read(w->fd, buf, sizeof(buf));

    if (r == sizeof(buf))
      continue;

    if (r != -1)
      break;

    if (errno == EAGAIN || errno == EWOULDBLOCK)
      break;

    if (errno == EINTR)
      continue;

    abort();
  }

  // run the callbacks of the pending handles on loop->async_handles
  QUEUE_MOVE(&loop->async_handles, &queue);
  while (!QUEUE_EMPTY(&queue)) {
    q = QUEUE_HEAD(&queue);
    h = QUEUE_DATA(q, uv_async_t, queue);

    QUEUE_REMOVE(q);
    QUEUE_INSERT_TAIL(&loop->async_handles, q);

    // cmpxchgi returns the old value; skip handles that were not pending
    if (cmpxchgi(&h->pending, 1, 0) == 0)
      continue;

    if (h->async_cb == NULL)
      continue;

    h->async_cb(h);
  }
}

This does two things:

1. Drain everything that uv__async_send() wrote to the fd
2. For every handle on loop->async_handles whose pending flag was 1, reset the flag to 0 and invoke its callback (async_cb). For the thread pool that async_cb is uv__work_done, registered by the uv_async_init call in uv_loop_init, and uv__work_done finally invokes the user-registered callback (a sketch of uv__work_done follows this list)
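
For completeness, here is roughly what uv__work_done looks like in the libuv source of this era (lightly simplified): it drains loop->wq under wq_mutex and calls each task's done callback, which for fs requests is uv__fs_done and thus req->cb, the user's callback.

void uv__work_done(uv_async_t* handle) {
  struct uv__work* w;
  uv_loop_t* loop;
  QUEUE* q;
  QUEUE wq;
  int err;

  loop = container_of(handle, uv_loop_t, wq_async);

  /* grab the finished tasks that worker threads appended to loop->wq */
  uv_mutex_lock(&loop->wq_mutex);
  QUEUE_MOVE(&loop->wq, &wq);
  uv_mutex_unlock(&loop->wq_mutex);

  while (!QUEUE_EMPTY(&wq)) {
    q = QUEUE_HEAD(&wq);
    QUEUE_REMOVE(q);

    w = container_of(q, struct uv__work, wq);
    err = (w->work == uv__cancelled) ? -ECANCELED : 0;  /* cf. uv__fs_done */
    w->done(w, err);   /* e.g. uv__fs_done -> req->cb */
  }
}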

Summary

Node.js relies on libuv for asynchronous I/O, and the core of libuv is the event loop. This post has covered the flow of the event loop and the implementation of the thread pool.