Open vislee opened 7 years ago
nginx的master进程和worker进程之间通过管道来通信,master控制worker进程的重启等等。 master除了通过管道发送了控制信息,还通过管道把描述符传递给子进程。 这样做的结果是所有的worker进程都拥有其他进程的管道的fd[0],最后启动的工作进程是通过fork直接继承父进程的。
linux系统下,子进程会自动继承父进程已经打开的描述符。实际应用中可能需要子进程把后来打开的描述符传递回父进程。或者也有可能把描述符传递到一个无关的进程中。linux下是提供了这种机制的。
首先需要在两个进程之间建立一个unix域套接字作为传递消息的通道,然后发送进程调用sendmsg向通道发送一个特殊的消息,然后接收进程调用recvmsg从通道接收消息,从而打开描述符。 先来看下sendmsg和recvmsg的原型以及数据结构:
ssize_t sendmsg(int socket, const struct msghdr *message, int flags); ssize_t recvmsg(int socket, struct msghdr *message, int flags); struct msghdr { // 套接字地址成员,只有通道是数据包套接字才需要 // msg_name 指向要发送或是接收信息的套接口地址,msg_namelen指明了这个地址的长度 void *msg_name; socklen_t msg_namelen; // 数据缓存区 // msg_iov 是iovec类型的数组,msg_iovlen数组的长度 // 同readv和sendv函数 struct iovec *msg_iov; size_t msg_iovlen; // 附属数据缓存区成员 // msg_control 指向附属数据缓存区,msg_controllen指明了缓存区的长度 void *msg_control; size_t msg_controllen; // 接收信息标记位 int msg_flags; }; struct iovec { ptr_t iov_base; /* Starting address */ size_t iov_len; /* Length in bytes */ }; // 附属数据结构 struct cmsghdr { // 附属数据的字节数,包含结构头。由CMSG_LEN()宏计算。 socklen_t cmsg_len; // 表明了原始的协议级别 ( 例如, SOL_SOCKET) int cmsg_level; // 表明了控制信息类型。例如, SCM_RIGHTS ,附属数据对象是文件描述符 // SCM_CREDENTIALS ,附属数据对象是一个包含证书信息的结构 int cmsg_type; /* followed by u_char cmsg_data[]; */ };
为了便于移植linux系统也提供了一些宏来处理cmsghdr结构体。
typedef struct { ngx_uint_t command; ngx_pid_t pid; ngx_int_t slot; ngx_fd_t fd; } ngx_channel_t; ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; if (ch->fd == -1) { msg.msg_control = NULL; msg.msg_controllen = 0; } else { msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); ngx_memzero(&cmsg, sizeof(cmsg)); cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int)); cmsg.cm.cmsg_level = SOL_SOCKET; cmsg.cm.cmsg_type = SCM_RIGHTS; /* * We have to use ngx_memcpy() instead of simple * *(int *) CMSG_DATA(&cmsg.cm) = ch->fd; * because some gcc 4.4 with -O2/3/s optimization issues the warning: * dereferencing type-punned pointer will break strict-aliasing rules * * Fortunately, gcc with -O1 compiles this ngx_memcpy() * in the same simple assignment as in the code above */ ngx_memcpy(CMSG_DATA(&cmsg.cm), &ch->fd, sizeof(int)); } msg.msg_flags = 0; #else // 旧的系统使用msg_accrights来传递描述符 if (ch->fd == -1) { msg.msg_accrights = NULL; msg.msg_accrightslen = 0; } else { msg.msg_accrights = (caddr_t) &ch->fd; msg.msg_accrightslen = sizeof(int); } #endif iov[0].iov_base = (char *) ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; n = sendmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } ngx_log_error(NGX_LOG_ALERT, log, err, "sendmsg() failed"); return NGX_ERROR; } return NGX_OK; } ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log) { ssize_t n; ngx_err_t err; struct iovec iov[1]; struct msghdr msg; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) union { struct cmsghdr cm; char space[CMSG_SPACE(sizeof(int))]; } cmsg; #else int fd; #endif iov[0].iov_base = (char *) ch; iov[0].iov_len = size; msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; msg.msg_iovlen = 1; #if (NGX_HAVE_MSGHDR_MSG_CONTROL) msg.msg_control = (caddr_t) &cmsg; msg.msg_controllen = sizeof(cmsg); #else msg.msg_accrights = (caddr_t) &fd; msg.msg_accrightslen = sizeof(int); #endif n = recvmsg(s, &msg, 0); if (n == -1) { err = ngx_errno; if (err == NGX_EAGAIN) { return NGX_AGAIN; } ngx_log_error(NGX_LOG_ALERT, log, err, "recvmsg() failed"); return NGX_ERROR; } if (n == 0) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, 0, "recvmsg() returned zero"); return NGX_ERROR; } if ((size_t) n < sizeof(ngx_channel_t)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned not enough data: %z", n); return NGX_ERROR; } #if (NGX_HAVE_MSGHDR_MSG_CONTROL) if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (cmsg.cm.cmsg_len < (socklen_t) CMSG_LEN(sizeof(int))) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned too small ancillary data"); return NGX_ERROR; } if (cmsg.cm.cmsg_level != SOL_SOCKET || cmsg.cm.cmsg_type != SCM_RIGHTS) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned invalid ancillary data " "level %d or type %d", cmsg.cm.cmsg_level, cmsg.cm.cmsg_type); return NGX_ERROR; } /* ch->fd = *(int *) CMSG_DATA(&cmsg.cm); */ ngx_memcpy(&ch->fd, CMSG_DATA(&cmsg.cm), sizeof(int)); } if (msg.msg_flags & (MSG_TRUNC|MSG_CTRUNC)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() truncated data"); } #else if (ch->command == NGX_CMD_OPEN_CHANNEL) { if (msg.msg_accrightslen != sizeof(int)) { ngx_log_error(NGX_LOG_ALERT, log, 0, "recvmsg() returned no ancillary data"); return NGX_ERROR; } ch->fd = fd; } #endif return n; }
nginx会调用ngx_start_worker_processes函数启动worker进程,每fork出一个worker进程,就调用ngx_pass_open_channel把为该worker进程打开的管道的fd[0]发送给其他worker进程。
代码如下:
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) { ngx_int_t i; ngx_channel_t ch; ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes"); ngx_memzero(&ch, sizeof(ngx_channel_t)); ch.command = NGX_CMD_OPEN_CHANNEL; for (i = 0; i < n; i++) { // 循环调用启动worker进程 ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type); ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; ch.fd = ngx_processes[ngx_process_slot].channel[0]; // 每启动一个进程都会把该进程对应给master的管道的一个描述符发送给其他的worker进程 ngx_pass_open_channel(cycle, &ch); } } static void ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch) { ngx_int_t i; // 为什么只会把传递给早于当前worker进程的其他worker进程, // 因为晚fork的进程会在fork的时候继承父进程的这部分信息,所以不用传递。 // 最后fork出的worker进程拥有父进程所拥有的所有信息。 // ngx_last_process fork进程数+1。 for (i = 0; i < ngx_last_process; i++) { // ngx_process_slot 当前进程数组下标 if (i == ngx_process_slot || ngx_processes[i].pid == -1 || ngx_processes[i].channel[0] == -1) { continue; } ngx_log_debug6(NGX_LOG_DEBUG_CORE, cycle->log, 0, "pass channel s:%i pid:%P fd:%d to s:%i pid:%P fd:%d", ch->slot, ch->pid, ch->fd, i, ngx_processes[i].pid, ngx_processes[i].channel[0]); /* TODO: NGX_AGAIN */ // 向所有早于当前worker进程fork出的其他worker进程发送描述符 ngx_write_channel(ngx_processes[i].channel[0], ch, sizeof(ngx_channel_t), cycle->log); } }
其他worker进程的运行函数时ngx_worker_process_cycle,在启动时会调用ngx_worker_process_init函数初始化,在该函数的最后几行为管道描述符添加了可读事件的回调函数ngx_channel_handler。 上述所说父进程调用ngx_write_channel写到管道的信息会被ngx_channel_handler函数处理。
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) { /* fatal */ exit(2); } static void ngx_channel_handler(ngx_event_t *ev) { ngx_int_t n; ngx_channel_t ch; ngx_connection_t *c; if (ev->timedout) { ev->timedout = 0; return; } c = ev->data; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel handler"); for ( ;; ) { n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log); ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel: %i", n); if (n == NGX_ERROR) { if (ngx_event_flags & NGX_USE_EPOLL_EVENT) { ngx_del_conn(c, 0); } ngx_close_connection(c); return; } if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) { if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) { return; } } // 管道为空 if (n == NGX_AGAIN) { return; } ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, "channel command: %ui", ch.command); switch (ch.command) { case NGX_CMD_QUIT: ngx_quit = 1; break; case NGX_CMD_TERMINATE: ngx_terminate = 1; break; case NGX_CMD_REOPEN: ngx_reopen = 1; break; case NGX_CMD_OPEN_CHANNEL: ngx_log_debug3(NGX_LOG_DEBUG_CORE, ev->log, 0, "get channel s:%i pid:%P fd:%d", ch.slot, ch.pid, ch.fd); // 父进程通过unix域传递过来的其他子进程的信息和描述符等 ngx_processes[ch.slot].pid = ch.pid; ngx_processes[ch.slot].channel[0] = ch.fd; break; case NGX_CMD_CLOSE_CHANNEL: ngx_log_debug4(NGX_LOG_DEBUG_CORE, ev->log, 0, "close channel s:%i pid:%P our:%P fd:%d", ch.slot, ch.pid, ngx_processes[ch.slot].pid, ngx_processes[ch.slot].channel[0]); if (close(ngx_processes[ch.slot].channel[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "close() channel failed"); } ngx_processes[ch.slot].channel[0] = -1; break; } } }
概述
nginx的master进程和worker进程之间通过管道来通信,master控制worker进程的重启等等。 master除了通过管道发送了控制信息,还通过管道把描述符传递给子进程。 这样做的结果是所有的worker进程都拥有其他进程的管道的fd[0],最后启动的工作进程是通过fork直接继承父进程的。
知识回顾
linux系统下,子进程会自动继承父进程已经打开的描述符。实际应用中可能需要子进程把后来打开的描述符传递回父进程。或者也有可能把描述符传递到一个无关的进程中。linux下是提供了这种机制的。
首先需要在两个进程之间建立一个unix域套接字作为传递消息的通道,然后发送进程调用sendmsg向通道发送一个特殊的消息,然后接收进程调用recvmsg从通道接收消息,从而打开描述符。 先来看下sendmsg和recvmsg的原型以及数据结构:
为了便于移植linux系统也提供了一些宏来处理cmsghdr结构体。
nginx中的封装
nginx中描述符的传递
nginx会调用ngx_start_worker_processes函数启动worker进程,每fork出一个worker进程,就调用ngx_pass_open_channel把为该worker进程打开的管道的fd[0]发送给其他worker进程。
代码如下:
其他worker进程的运行函数时ngx_worker_process_cycle,在启动时会调用ngx_worker_process_init函数初始化,在该函数的最后几行为管道描述符添加了可读事件的回调函数ngx_channel_handler。 上述所说父进程调用ngx_write_channel写到管道的信息会被ngx_channel_handler函数处理。