Open vislee opened 6 years ago
早在nginx version 1.1.4 提供了keepalive指令,支持upstream的长链接。如果不配置该指令,ngx和ups每次请求都会建立一个新的tcp连接,请求结束后关闭。
nginx的模块化和异步回调可以让nginx支持模块来实现一些新功能。该功能是ngx_http_upstream_keepalive_module模块提供的,在src/http/modules/ngx_http_upstream_keepalive_module.c文件中。
在指令解析阶段通过,keepalive指令对应的回调函数ngx_http_upstream_keepalive来潜入ngx中。在该函数中,用该模块的结构体把原来的回调函数保存下来。把该模块的功能函数赋值。
// 保存长链接 typedef struct { ngx_http_upstream_keepalive_srv_conf_t *conf; ngx_queue_t queue; ngx_connection_t *connection; socklen_t socklen; ngx_sockaddr_t sockaddr; } ngx_http_upstream_keepalive_cache_t;
static char * ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_http_upstream_srv_conf_t *uscf; ngx_http_upstream_keepalive_srv_conf_t *kcf = conf; ngx_int_t n; ngx_str_t *value; if (kcf->max_cached) { return "is duplicate"; } /* read options */ value = cf->args->elts; n = ngx_atoi(value[1].data, value[1].len); if (n == NGX_ERROR || n == 0) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "invalid value \"%V\" in \"%V\" directive", &value[1], &cmd->name); return NGX_CONF_ERROR; } kcf->max_cached = n; uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module); // 保存原回调函数, 和指令顺序有关系。 kcf->original_init_upstream = uscf->peer.init_upstream ? uscf->peer.init_upstream : ngx_http_upstream_init_round_robin; // 该模块嵌入执行的开始,该函数在ngx_http_upstream_init_main_conf函数中被调用。 uscf->peer.init_upstream = ngx_http_upstream_init_keepalive; return NGX_CONF_OK; } // 该函数也是在启动阶段执行 static ngx_int_t ngx_http_upstream_init_keepalive(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us) { ngx_uint_t i; ngx_http_upstream_keepalive_srv_conf_t *kcf; ngx_http_upstream_keepalive_cache_t *cached; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0, "init keepalive"); kcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_keepalive_module); // 调用原来的初始化函数 if (kcf->original_init_upstream(cf, us) != NGX_OK) { return NGX_ERROR; } // 保存原来上游节点初始化函数 kcf->original_init_peer = us->peer.init; // us->peer.init = ngx_http_upstream_init_keepalive_peer; /* allocate cache items and add to free queue */ cached = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached); if (cached == NULL) { return NGX_ERROR; } // 初始化空闲链接缓存链表 ngx_queue_init(&kcf->cache); // 初始化空闲缓存链表 ngx_queue_init(&kcf->free); for (i = 0; i < kcf->max_cached; i++) { ngx_queue_insert_head(&kcf->free, &cached[i].queue); cached[i].conf = kcf; } return NGX_OK; }
当请求执行时,在proxy模块的请求阶段注册的回调函数ngx_http_proxy_handler中,读取完请求body调用了ngx_http_upstream_init函数开始初始化上游链接,发送请求内容,接收解析请求结果等。
在ngx_http_upstream_init函数中调用ngx_http_upstream_init_request初始化上游的请求。首先会根据配置找到一组上游服务,即一个upstream配置块。找到以后会调用上游的初始化函数,即:uscf->peer.init。 因在upstream块配置了keepalive指令,在指令解析该回调函数被改为ngx_http_upstream_init_keepalive_peer。
static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r, ngx_http_upstream_srv_conf_t *us) { ngx_http_upstream_keepalive_peer_data_t *kp; ngx_http_upstream_keepalive_srv_conf_t *kcf; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "init keepalive peer"); kcf = ngx_http_conf_upstream_srv_conf(us, ngx_http_upstream_keepalive_module); kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t)); if (kp == NULL) { return NGX_ERROR; } if (kcf->original_init_peer(r, us) != NGX_OK) { return NGX_ERROR; } kp->conf = kcf; kp->upstream = r->upstream; kp->data = r->upstream->peer.data; // 保存了上游服务器地址 kp->original_get_peer = r->upstream->peer.get; // 原获取上游服务器ip的函数 kp->original_free_peer = r->upstream->peer.free; // 覆盖上游服务器地址结构体,和上游服务器ip获取函数等。 r->upstream->peer.data = kp; r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer; r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer; #if (NGX_HTTP_SSL) kp->original_set_session = r->upstream->peer.set_session; kp->original_save_session = r->upstream->peer.save_session; r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session; r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session; #endif return NGX_OK; } // 获取一个上游地址的回调函数 // 在ngx_event_connect_peer函数中被调用 // 如果返回NGX_OK就继续执行建立到上游的tcp链接 // 返回NGX_DONE 就直接返回,不需要建立tcp链接。 static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data) { ngx_http_upstream_keepalive_peer_data_t *kp = data; ngx_http_upstream_keepalive_cache_t *item; ngx_int_t rc; ngx_queue_t *q, *cache; ngx_connection_t *c; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "get keepalive peer"); /* ask balancer */ // 调用原来获取上游服务器ip的函数 // 如果没有配置负载均衡策略的,则该函数默认是round robin模块赋值的ngx_http_upstream_get_round_robin_peer函数 rc = kp->original_get_peer(pc, kp->data); if (rc != NGX_OK) { return rc; } /* search cache for suitable connection */ cache = &kp->conf->cache; for (q = ngx_queue_head(cache); q != ngx_queue_sentinel(cache); q = ngx_queue_next(q)) { item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); c = item->connection; if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr, item->socklen, pc->socklen) == 0) { ngx_queue_remove(q); // 从cache链表删除 // 添加到free链表里 ngx_queue_insert_head(&kp->conf->free, q); // 从缓存的空闲链接链表找到了对应地址的链接 goto found; } } return NGX_OK; found: ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, "get keepalive peer: using connection %p", c); c->idle = 0; c->sent = 0; c->log = pc->log; c->read->log = pc->log; c->write->log = pc->log; c->pool->log = pc->log; pc->connection = c; pc->cached = 1; return NGX_DONE; } // 释放上游回调函数,在ngx_http_upstream_finalize_request函数中被调用。 static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data, ngx_uint_t state) { ngx_http_upstream_keepalive_peer_data_t *kp = data; ngx_http_upstream_keepalive_cache_t *item; ngx_queue_t *q; ngx_connection_t *c; ngx_http_upstream_t *u; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "free keepalive peer"); /* cache valid connections */ u = kp->upstream; c = pc->connection; if (state & NGX_PEER_FAILED || c == NULL || c->read->eof // 上游关闭链接 || c->read->error || c->read->timedout || c->write->error || c->write->timedout) { goto invalid; } if (!u->keepalive) { // 不需要支持长链接的 goto invalid; } if (!u->request_body_sent) { // 请求body没有转发到上游的 goto invalid; } if (ngx_terminate || ngx_exiting) { // nginx work进程需要退出的 goto invalid; } if (ngx_handle_read_event(c->read, 0) != NGX_OK) { // 可读事件添加成边沿触发的,监听上游关闭操作 goto invalid; } ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, "free keepalive peer: saving connection %p", c); if (ngx_queue_empty(&kp->conf->free)) { q = ngx_queue_last(&kp->conf->cache); ngx_queue_remove(q); item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); ngx_http_upstream_keepalive_close(item->connection); } else { q = ngx_queue_head(&kp->conf->free); ngx_queue_remove(q); item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue); } ngx_queue_insert_head(&kp->conf->cache, q); item->connection = c; // 这一步赋值NULL是非常有意义的 // 否则在ngx_http_upstream_finalize_request函数中还会关闭链接的 pc->connection = NULL; if (c->read->timer_set) { c->read->delayed = 0; ngx_del_timer(c->read); } if (c->write->timer_set) { ngx_del_timer(c->write); } c->write->handler = ngx_http_upstream_keepalive_dummy_handler; // 空闲时上游可读回调函数 c->read->handler = ngx_http_upstream_keepalive_close_handler; c->data = item; c->idle = 1; c->log = ngx_cycle->log; c->read->log = ngx_cycle->log; c->write->log = ngx_cycle->log; c->pool->log = ngx_cycle->log; item->socklen = pc->socklen; ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen); if (c->read->ready) { ngx_http_upstream_keepalive_close_handler(c->read); } invalid: kp->original_free_peer(pc, kp->data, state); } // 空闲时上游可读回调函数 // 一般情况该链接被使用,回调函数会被覆盖 static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev) { ngx_http_upstream_keepalive_srv_conf_t *conf; ngx_http_upstream_keepalive_cache_t *item; int n; char buf[1]; ngx_connection_t *c; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0, "keepalive close handler"); c = ev->data; if (c->close) { goto close; } // 读一个字节,检查结果。 // 使用了MSG_PEEK 该字节还会保存在套接字缓存区不会被删掉,下次读还可以读到。 n = recv(c->fd, buf, 1, MSG_PEEK); if (n == -1 && ngx_socket_errno == NGX_EAGAIN) { ev->ready = 0; // 如果是资源暂时不可用,则继续添加到可读回调中等待后续可读回调的触发 if (ngx_handle_read_event(c->read, 0) != NGX_OK) { goto close; } return; } close: item = c->data; conf = item->conf; // 关闭链接 ngx_http_upstream_keepalive_close(c); // 缓存结构体放回到free链表中 ngx_queue_remove(&item->queue); ngx_queue_insert_head(&conf->free, &item->queue); }
upstream keepalive 模块在释放上游服务地址回调函数中把完好的链接保存到链表中,在下一个请求获取上游服务器地址回调函数中,根据选中的ip从链表中取得链接。从而避免了关闭tcp链接的4次握手和新建链接的3次握手,在作为前端的代理服务器新能提升应该是很明显的。
唯一的缺点是这个缓存池是一个链表,查找需要一个一个的遍历,不过好在链表长度小,影响应该不大。
概述
早在nginx version 1.1.4 提供了keepalive指令,支持upstream的长链接。如果不配置该指令,ngx和ups每次请求都会建立一个新的tcp连接,请求结束后关闭。
源码分析
nginx的模块化和异步回调可以让nginx支持模块来实现一些新功能。该功能是ngx_http_upstream_keepalive_module模块提供的,在src/http/modules/ngx_http_upstream_keepalive_module.c文件中。
指令解析
在指令解析阶段通过,keepalive指令对应的回调函数ngx_http_upstream_keepalive来潜入ngx中。在该函数中,用该模块的结构体把原来的回调函数保存下来。把该模块的功能函数赋值。
请求执行
当请求执行时,在proxy模块的请求阶段注册的回调函数ngx_http_proxy_handler中,读取完请求body调用了ngx_http_upstream_init函数开始初始化上游链接,发送请求内容,接收解析请求结果等。
在ngx_http_upstream_init函数中调用ngx_http_upstream_init_request初始化上游的请求。首先会根据配置找到一组上游服务,即一个upstream配置块。找到以后会调用上游的初始化函数,即:uscf->peer.init。 因在upstream块配置了keepalive指令,在指令解析该回调函数被改为ngx_http_upstream_init_keepalive_peer。
总结
upstream keepalive 模块在释放上游服务地址回调函数中把完好的链接保存到链表中,在下一个请求获取上游服务器地址回调函数中,根据选中的ip从链表中取得链接。从而避免了关闭tcp链接的4次握手和新建链接的3次握手,在作为前端的代理服务器新能提升应该是很明显的。
唯一的缺点是这个缓存池是一个链表,查找需要一个一个的遍历,不过好在链表长度小,影响应该不大。