EasonYou / my-blog

It's my blog recording front-end
2 stars 0 forks source link

【node源码】net源码阅读 #15

Open EasonYou opened 4 years ago

EasonYou commented 4 years ago

net

net模块用于创建基于TCP或IPC的服务端或客户端。

这里有一个net创建TCP服务器的基本例子,以此执行服务端和客户端即可在终端看到输出。

// 创建socket服务器
const net = require('net');
let clients = 0;

const server = net.createServer(client => {
  clients++;
  let clientId = clients;
  console.log('Client connect:', clientId);

  client.on('end', () => {
    console.log('Client disconnected:', clientId);
  });
  console.log(client)
  client.write('Welcome client: ' + clientId + ' \r\n');
  client.pipe(client); // 把客户端的数据返回给客户端
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

// 创建socket客户端
const net = require('net');

const client = net.connect(8000);

client.on('data', data => {
  console.log('data>>>', data.toString());
});

client.on('end', () => {
  console.log('Client disconnected');
});

源码

net.createServer

通过上面的例子,我可以看到,服务端是通过net.createServer这个工厂函数来进行创建的。在源码中,是通过new了一个Sever实例来进行创建的。

Server方法做了几件很简单的事情

function createServer(options, connectionListener) {
  return new Server(options, connectionListener);
}

function Server(options, connectionListener) {
  // ...继承ee
  EventEmitter.call(this);

  if (typeof options === 'function') {
    // 这里前后都省略一些代码,主要做的是监听connection事件以及初始化options
    this.on('connection', connectionListener);
    options = { ...options };
  } else { /** ... */}
  // 默认_connections为零
  this._connections = 0;
  // 劫持connections属性,对其get/set方法做定制化
  ObjectDefineProperty(this, 'connections', {
    get: deprecate(() => {
      return this._connections;
    }),
    set: deprecate((val) => (this._connections = val)),
    configurable: true, enumerable: false
  });
  // ...
}

在这里可以看到,实例的创建非常简单,但是这样还不能创建一个TCP服务,还需要对端口进行监听。

从源码里可以看到,listen方法做了各种各样的判断条件,以适配各种各样入参条件。这些我们都可以忽略,最重要的是看到了一个方法,listenInCluster。不管什么样的情况都会调用到这个方法,即使是lookupAndListen方法,内部也是listenInCluster

Server.prototype.listen = function(...args) {
  // ...
  // 监听listening事件
  if (cb !== null) {
    this.once('listening', cb);
  }
  // ...
  if (options instanceof TCP) {
    // ...
    listenInCluster(this, null, -1, -1, backlogFromArgs);
    return this;
  }

  if (typeof options.fd === 'number' && options.fd >= 0) {
    listenInCluster(this, null, null, null, backlogFromArgs, options.fd);
    return this;
  }

  let backlog;
  if (typeof options.port === 'number' || typeof options.port === 'string') {
    // ...
    if (options.host) {
      lookupAndListen(this, options.port | 0, options.host, backlog,
                      options.exclusive, flags);
    } else {
      listenInCluster(this, null, options.port | 0, 4,
                      backlog, undefined, options.exclusive);
    }
    return this;
  }
  if (options.path && isPipeName(options.path)) {
    // ...
    listenInCluster(this, pipeName, -1, -1,
                    backlog, undefined, options.exclusive);

    if (!this._handle) {
      // Failed and an error shall be emitted in the next tick.
      // Therefore, we directly return.
      return this;
    }
    // ...
    return this;
  }
  // ...
};

那么接下来看看listenInCluster方法做了些什么事情

function listenInCluster(server, address, port, addressType,
  backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    // 如果是主进程,直接进行监听
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }
  // 否则需要在额外处理集群的监听方法,这里先不做深入研究
  cluster._getServer(server, serverQuery, listenOnMasterHandle);
  function listenOnMasterHandle(err, handle) {
    // ...
    server._listen2(address, port, addressType, backlog, fd, flags);
  }
  // ...
}

listenInCluster在主进程调用了_listen2方法,并且处理了在集群中的的监听事件,这里cluster做了什么先不去追究,只要知道最后在回调方法里,也是调用了_listen2方法即可。cluster的会单独在cluster上去做研究。

下面来看下_listen2方法

// 实际上调用的是setupListenHandle方法
Server.prototype._listen2 = setupListenHandle;

function setupListenHandle(address, port, addressType, backlog, fd, flags) {
  if (this._handle) { /** */} else {
    let rval = null;
    if (!address && typeof fd !== 'number') {
      rval = createServerHandle(DEFAULT_IPV6_ADDR, port, 6, fd, flags);
      // ...
    }

    if (rval === null)
      rval = createServerHandle(address, port, addressType, fd, flags);
    // ...
    this._handle = rval; // 绑定返回的句柄实例
  }

  // 绑定onconnection方法
  this._handle.onconnection = onconnection;
  // ...
  // 生成一个唯一的key,作为连接的唯一标识
  this._connectionKey = addressType + ':' + address + ':' + port;
  // ...
}

setupListenHandle方法它通过createServerHandle创建了一个服务句柄,并绑定在了实例上,这里来看下createServerHandle的内部实现

function createServerHandle(address, port, addressType, fd, flags) {
    // ...
  if (typeof fd === 'number' && fd >= 0) {
    // 当有传入文件标识符的时候,监听其文件描述符
    try {
      handle = createHandle(fd, true);
    } catch (e) { /** ... */}
        // ...
  } else if (port === -1 && addressType === -1) {
    // 当port为-1且addressType为-1的时候,创建创建 UNIX domain socket 或 Windows pipe 服务器
    handle = new Pipe(PipeConstants.SERVER);
        // ...
  } else {
    // tcp服务
    handle = new TCP(TCPConstants.SERVER);
    isTCP = true;
  }

  if (address || port || isTCP) {
    debug('bind to', address || 'any');
    if (!address) {
      // 尝试绑定ipv6
      err = handle.bind6(DEFAULT_IPV6_ADDR, port, flags);
      if (err) {
        handle.close();
        // 回调绑定ipv4
        return createServerHandle(DEFAULT_IPV4_ADDR, port);
      }
    } else if (addressType === 6) {
      err = handle.bind6(address, port, flags); // ipv6
    } else {
      err = handle.bind(address, port); // ipv4
    }
  }
    // ...
  return handle;
}

通过入参判断是否是监听文件标识符还是管道服务还是TCP,然后通过address判断是否要用默认地址,再判断用ipv4还是ipv6

默认的IP地址在上面有定义

const DEFAULT_IPV4_ADDR = '0.0.0.0';
const DEFAULT_IPV6_ADDR = '::';

setupListenHandle方法中,有这么一段代码,用于触发connection事件

function onconnection(err, clientHandle) {
  // ... 
  const socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect,
    readable: true,
    writable: true
  });
  // ...
  self.emit('connection', socket);
}

// socket内部不再赘述,这里只点出socket是一个双工流的实例
function Socket(options) {
  // ...
  // 继承双工流
    stream.Duplex.call(this, options);
  // ...
}

从这里可以看到,socket其实是一个双工流,用于监听和实现流的读写。这个方法的引用,在c++的内建模块,这里不再深入赘述

总结

最后总结下net的整体流程

结语

在核心模块中,JavaScript对服务的处理比较浅薄,只是在TCP层上封装了监听事件,最终调用的还是c++的内建模块来实现服务的监听,后面有机会,可以再往内建模块继续深入研究。