Open vislee opened 7 years ago
upstream是nginx向上游服务器发起tcp请求的一种机制。nginx中很多常用的模块都用到了该机制,例如proxy模块、memcached模块等。
upstream模块提供了两个配置指令:upstream和server来指定上游服务器地址。
upstream机制的实现位于ngx_http_upstream.h|c和ngx_http_upstream_round_robin.h|c这几个文件中。同时,使用upstream的模块(如proxy)也会初始化并调用upstream模块的一些变量和函数。下面先从程序启动时解析配置指令开始分析,调用逻辑在ngx_http_block函数中。
/*
 * Main-level configuration of the upstream module.
 *
 * headers_in_hash is built in ngx_http_upstream_init_main_conf() from the
 * ngx_http_upstream_headers_in table; upstreams is iterated there as an
 * array of ngx_http_upstream_srv_conf_t pointers.
 */
typedef struct {
    ngx_hash_t                       headers_in_hash;

    /* all upstream groups (one entry per group of upstream servers) */
    ngx_array_t                      upstreams;
                                             /* ngx_http_upstream_srv_conf_t */
} ngx_http_upstream_main_conf_t;
typedef struct { // ups 机制相关的配置,常常被使用模块定义并配置。 ngx_http_upstream_conf_t upstream; ngx_array_t *body_flushes; ngx_array_t *body_lengths; ngx_array_t *body_values; ngx_str_t body_source; ngx_http_proxy_headers_t headers; #if (NGX_HTTP_CACHE) ngx_http_proxy_headers_t headers_cache; #endif ngx_array_t *headers_source; ngx_array_t *proxy_lengths; ngx_array_t *proxy_values; ngx_array_t *redirects; ngx_array_t *cookie_domains; ngx_array_t *cookie_paths; ngx_http_complex_value_t *method; ngx_str_t location; ngx_str_t url; #if (NGX_HTTP_CACHE) ngx_http_complex_value_t cache_key; #endif ngx_http_proxy_vars_t vars; ngx_flag_t redirect; ngx_uint_t http_version; ngx_uint_t headers_hash_max_size; ngx_uint_t headers_hash_bucket_size; #if (NGX_HTTP_SSL) ngx_uint_t ssl; ngx_uint_t ssl_protocols; ngx_str_t ssl_ciphers; ngx_uint_t ssl_verify_depth; ngx_str_t ssl_trusted_certificate; ngx_str_t ssl_crl; ngx_str_t ssl_certificate; ngx_str_t ssl_certificate_key; ngx_array_t *ssl_passwords; #endif } ngx_http_proxy_loc_conf_t;
/*
 * Registers every entry of the ngx_http_upstream_vars table as an http
 * variable and copies its get_handler and data onto the registered variable.
 * The table is terminated by an entry whose name has zero length.
 *
 * Returns NGX_OK on success, NGX_ERROR if ngx_http_add_variable() fails.
 */
static ngx_int_t
ngx_http_upstream_add_variables(ngx_conf_t *cf)
{
    ngx_http_variable_t  *src, *added;

    src = ngx_http_upstream_vars;

    while (src->name.len) {

        added = ngx_http_add_variable(cf, &src->name, src->flags);
        if (added == NULL) {
            return NGX_ERROR;
        }

        added->get_handler = src->get_handler;
        added->data = src->data;

        src++;
    }

    return NGX_OK;
}
// 对应一组上游服务器,也就是一个upstream指令 struct ngx_http_upstream_srv_conf_s { ngx_http_upstream_peer_t peer; // 负载均衡策略相关 void **srv_conf; ngx_array_t *servers; /* ngx_http_upstream_server_t */ // server 指令对应的配置,实际的上游服务器的地址 ngx_uint_t flags; ngx_str_t host; // 一组上游服务器的名称 u_char *file_name; ngx_uint_t line; in_port_t port; ngx_uint_t no_port; /* unsigned no_port:1 */ #if (NGX_HTTP_UPSTREAM_ZONE) ngx_shm_zone_t *shm_zone; #endif }; // 上游服务器地址 typedef struct { ngx_str_t name; ngx_addr_t *addrs; ngx_uint_t naddrs; ngx_uint_t weight; ngx_uint_t max_conns; ngx_uint_t max_fails; time_t fail_timeout; ngx_msec_t slow_start; unsigned down:1; unsigned backup:1; NGX_COMPAT_BEGIN(6) NGX_COMPAT_END } ngx_http_upstream_server_t; static char * ngx_http_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy) { char *rv; void *mconf; ngx_str_t *value; ngx_url_t u; ngx_uint_t m; ngx_conf_t pcf; ngx_http_module_t *module; ngx_http_conf_ctx_t *ctx, *http_ctx; ngx_http_upstream_srv_conf_t *uscf; ngx_memzero(&u, sizeof(ngx_url_t)); value = cf->args->elts; u.host = value[1]; // name 一组上游服务器的名称。 u.no_resolve = 1; u.no_port = 1; // 分配ngx_http_upstream_srv_conf_t结构体添加到umcf->upstreams数组 uscf = ngx_http_upstream_add(cf, &u, NGX_HTTP_UPSTREAM_CREATE |NGX_HTTP_UPSTREAM_WEIGHT |NGX_HTTP_UPSTREAM_MAX_CONNS |NGX_HTTP_UPSTREAM_MAX_FAILS |NGX_HTTP_UPSTREAM_FAIL_TIMEOUT |NGX_HTTP_UPSTREAM_DOWN |NGX_HTTP_UPSTREAM_BACKUP); if (uscf == NULL) { return NGX_CONF_ERROR; } // http模块下每个block配置都会对应一个这样的上下文 // 指明所属block的模块配置 和 本block的模块配置 ctx = ngx_pcalloc(cf->pool, sizeof(ngx_http_conf_ctx_t)); if (ctx == NULL) { return NGX_CONF_ERROR; } http_ctx = cf->ctx; // main_conf 指向所属的http{} ctx->main_conf = http_ctx->main_conf; /* the upstream{}'s srv_conf */ ctx->srv_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module); if (ctx->srv_conf == NULL) { return NGX_CONF_ERROR; } // upstream 模块srv级别创建配置文件结构体的函数指针为空,所以再下边数组赋值不会被覆盖掉 ctx->srv_conf[ngx_http_upstream_module.ctx_index] = uscf; uscf->srv_conf = ctx->srv_conf; /* 
the upstream{}'s loc_conf */ ctx->loc_conf = ngx_pcalloc(cf->pool, sizeof(void *) * ngx_http_max_module); if (ctx->loc_conf == NULL) { return NGX_CONF_ERROR; } for (m = 0; cf->cycle->modules[m]; m++) { if (cf->cycle->modules[m]->type != NGX_HTTP_MODULE) { continue; } module = cf->cycle->modules[m]->ctx; if (module->create_srv_conf) { mconf = module->create_srv_conf(cf); if (mconf == NULL) { return NGX_CONF_ERROR; } // srv_conf 数组包含了所有模块的srv级别的配置 ctx->srv_conf[cf->cycle->modules[m]->ctx_index] = mconf; } if (module->create_loc_conf) { mconf = module->create_loc_conf(cf); if (mconf == NULL) { return NGX_CONF_ERROR; } // loc_conf 数组包含了所有模块的loc级别的配置 ctx->loc_conf[cf->cycle->modules[m]->ctx_index] = mconf; } } uscf->servers = ngx_array_create(cf->pool, 4, sizeof(ngx_http_upstream_server_t)); if (uscf->servers == NULL) { return NGX_CONF_ERROR; } /* parse inside upstream{} */ pcf = *cf; cf->ctx = ctx; cf->cmd_type = NGX_HTTP_UPS_CONF; // 解析NGX_HTTP_UPS_CONF级别的配置 // 该级别的配置指令有server,配置上游服务器地址的。有ip_hash 配置负载均衡策略的。 rv = ngx_conf_parse(cf, NULL); *cf = pcf; if (rv != NGX_CONF_OK) { return rv; } if (uscf->servers->nelts == 0) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "no servers are inside upstream"); return NGX_CONF_ERROR; } return rv; }
/*
 * Handler of the "server" directive inside an upstream{} block.
 *
 * Pushes a new ngx_http_upstream_server_t onto uscf->servers, parses the
 * optional parameters (weight=, max_conns=, max_fails=, fail_timeout=,
 * backup, down) and then parses the server address given as the first
 * argument with ngx_parse_url().
 *
 * A parameter is rejected as "not supported" when the corresponding
 * NGX_HTTP_UPSTREAM_* bit is not set in uscf->flags.
 *
 * Returns NGX_CONF_OK on success, NGX_CONF_ERROR otherwise.
 */
static char *
ngx_http_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_upstream_srv_conf_t  *uscf = conf;

    time_t                       fail_timeout;
    ngx_str_t                   *value, s;
    ngx_url_t                    u;
    ngx_int_t                    weight, max_conns, max_fails;
    ngx_uint_t                   i;
    ngx_http_upstream_server_t  *us;

    us = ngx_array_push(uscf->servers);
    if (us == NULL) {
        return NGX_CONF_ERROR;
    }

    ngx_memzero(us, sizeof(ngx_http_upstream_server_t));

    value = cf->args->elts;

    /* defaults used when a parameter is not specified */
    weight = 1;
    max_conns = 0;
    max_fails = 1;
    fail_timeout = 10;

    /* value[0] is the directive name, value[1] the address */
    for (i = 2; i < cf->args->nelts; i++) {

        if (ngx_strncmp(value[i].data, "weight=", 7) == 0) {

            if (!(uscf->flags & NGX_HTTP_UPSTREAM_WEIGHT)) {
                goto not_supported;
            }

            weight = ngx_atoi(&value[i].data[7], value[i].len - 7);

            /* a zero weight is rejected as well */
            if (weight == NGX_ERROR || weight == 0) {
                goto invalid;
            }

            continue;
        }

        if (ngx_strncmp(value[i].data, "max_conns=", 10) == 0) {

            if (!(uscf->flags & NGX_HTTP_UPSTREAM_MAX_CONNS)) {
                goto not_supported;
            }

            max_conns = ngx_atoi(&value[i].data[10], value[i].len - 10);

            if (max_conns == NGX_ERROR) {
                goto invalid;
            }

            continue;
        }

        if (ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {

            if (!(uscf->flags & NGX_HTTP_UPSTREAM_MAX_FAILS)) {
                goto not_supported;
            }

            max_fails = ngx_atoi(&value[i].data[10], value[i].len - 10);

            if (max_fails == NGX_ERROR) {
                goto invalid;
            }

            continue;
        }

        if (ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) {

            if (!(uscf->flags & NGX_HTTP_UPSTREAM_FAIL_TIMEOUT)) {
                goto not_supported;
            }

            s.len = value[i].len - 13;
            s.data = &value[i].data[13];

            fail_timeout = ngx_parse_time(&s, 1);

            if (fail_timeout == (time_t) NGX_ERROR) {
                goto invalid;
            }

            continue;
        }

        if (ngx_strcmp(value[i].data, "backup") == 0) {

            if (!(uscf->flags & NGX_HTTP_UPSTREAM_BACKUP)) {
                goto not_supported;
            }

            us->backup = 1;

            continue;
        }

        if (ngx_strcmp(value[i].data, "down") == 0) {

            if (!(uscf->flags & NGX_HTTP_UPSTREAM_DOWN)) {
                goto not_supported;
            }

            us->down = 1;

            continue;
        }

        goto invalid;
    }

    ngx_memzero(&u, sizeof(ngx_url_t));

    u.url = value[1];       /* IP address or domain name of the server */
    u.default_port = 80;

    /* original author's note: a domain name is resolved here */
    if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
        if (u.err) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "%s in upstream \"%V\"", u.err, &u.url);
        }

        return NGX_CONF_ERROR;
    }

    us->name = u.url;
    us->addrs = u.addrs;            /* addresses of the upstream server */
    us->naddrs = u.naddrs;          /* number of those addresses */
    us->weight = weight;
    us->max_conns = max_conns;
    us->max_fails = max_fails;
    us->fail_timeout = fail_timeout;

    return NGX_CONF_OK;

invalid:

    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                       "invalid parameter \"%V\"", &value[i]);

    return NGX_CONF_ERROR;

not_supported:

    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                       "balancing method does not support parameter \"%V\"",
                       &value[i]);

    return NGX_CONF_ERROR;
}
// ups机制的配置,通常被使用模块定义并配置。 // proxy 模块ngx_http_proxy_loc_conf_t结构体就引用了 typedef struct { ngx_http_upstream_srv_conf_t *upstream; ngx_msec_t connect_timeout; ngx_msec_t send_timeout; ngx_msec_t read_timeout; ngx_msec_t next_upstream_timeout; size_t send_lowat; size_t buffer_size; size_t limit_rate; size_t busy_buffers_size; size_t max_temp_file_size; size_t temp_file_write_size; size_t busy_buffers_size_conf; size_t max_temp_file_size_conf; size_t temp_file_write_size_conf; ngx_bufs_t bufs; ngx_uint_t ignore_headers; ngx_uint_t next_upstream; ngx_uint_t store_access; ngx_uint_t next_upstream_tries; ngx_flag_t buffering; ngx_flag_t request_buffering; ngx_flag_t pass_request_headers; ngx_flag_t pass_request_body; ngx_flag_t ignore_client_abort; ngx_flag_t intercept_errors; ngx_flag_t cyclic_temp_file; ngx_flag_t force_ranges; ngx_path_t *temp_path; ngx_hash_t hide_headers_hash; ngx_array_t *hide_headers; ngx_array_t *pass_headers; ngx_http_upstream_local_t *local; #if (NGX_HTTP_CACHE) ngx_shm_zone_t *cache_zone; ngx_http_complex_value_t *cache_value; ngx_uint_t cache_min_uses; ngx_uint_t cache_use_stale; ngx_uint_t cache_methods; off_t cache_max_range_offset; ngx_flag_t cache_lock; ngx_msec_t cache_lock_timeout; ngx_msec_t cache_lock_age; ngx_flag_t cache_revalidate; ngx_flag_t cache_convert_head; ngx_flag_t cache_background_update; ngx_array_t *cache_valid; ngx_array_t *cache_bypass; ngx_array_t *cache_purge; ngx_array_t *no_cache; #endif ngx_array_t *store_lengths; ngx_array_t *store_values; #if (NGX_HTTP_CACHE) signed cache:2; #endif signed store:2; unsigned intercept_404:1; unsigned change_buffering:1; #if (NGX_HTTP_SSL || NGX_COMPAT) ngx_ssl_t *ssl; ngx_flag_t ssl_session_reuse; ngx_http_complex_value_t *ssl_name; ngx_flag_t ssl_server_name; ngx_flag_t ssl_verify; #endif ngx_str_t module; NGX_COMPAT_BEGIN(2) NGX_COMPAT_END } ngx_http_upstream_conf_t; static char * ngx_http_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { ngx_http_proxy_loc_conf_t *plcf 
= conf; size_t add; u_short port; ngx_str_t *value, *url; ngx_url_t u; ngx_uint_t n; ngx_http_core_loc_conf_t *clcf; ngx_http_script_compile_t sc; if (plcf->upstream.upstream || plcf->proxy_lengths) { return "is duplicate"; } clcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module); // 配置了proxy_pass的location的 content阶段的处理函数 clcf->handler = ngx_http_proxy_handler; if (clcf->name.data[clcf->name.len - 1] == '/') { clcf->auto_redirect = 1; } value = cf->args->elts; // proxy_pass 配置的url url = &value[1]; n = ngx_http_script_variables_count(url); if (n) { ngx_memzero(&sc, sizeof(ngx_http_script_compile_t)); sc.cf = cf; sc.source = url; sc.lengths = &plcf->proxy_lengths; sc.values = &plcf->proxy_values; sc.variables = n; sc.complete_lengths = 1; sc.complete_values = 1; if (ngx_http_script_compile(&sc) != NGX_OK) { return NGX_CONF_ERROR; } #if (NGX_HTTP_SSL) plcf->ssl = 1; #endif return NGX_CONF_OK; } if (ngx_strncasecmp(url->data, (u_char *) "http://", 7) == 0) { add = 7; port = 80; } else if (ngx_strncasecmp(url->data, (u_char *) "https://", 8) == 0) { #if (NGX_HTTP_SSL) plcf->ssl = 1; add = 8; port = 443; #else ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "https protocol requires SSL support"); return NGX_CONF_ERROR; #endif } else { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "invalid URL prefix"); return NGX_CONF_ERROR; } ngx_memzero(&u, sizeof(ngx_url_t)); u.url.len = url->len - add; u.url.data = url->data + add; u.default_port = port; u.uri_part = 1; u.no_resolve = 1; // 一组上游服务器 配置文件中的一个upstream name {...} plcf->upstream.upstream = ngx_http_upstream_add(cf, &u, 0); if (plcf->upstream.upstream == NULL) { return NGX_CONF_ERROR; } plcf->vars.schema.len = add; // schema = http/https plcf->vars.schema.data = url->data; plcf->vars.key_start = plcf->vars.schema; // host_header = u->host port = 80/443 // key_start 包含了schema + host // 还有uri的设置,ngx_http_proxy_create_request函数中用到 ngx_http_proxy_set_vars(&u, &plcf->vars); plcf->location = clcf->name; if (clcf->named #if 
(NGX_PCRE) || clcf->regex #endif || clcf->noname) { if (plcf->vars.uri.len) { ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "\"proxy_pass\" cannot have URI part in " "location given by regular expression, " "or inside named location, " "or inside \"if\" statement, " "or inside \"limit_except\" block"); return NGX_CONF_ERROR; } plcf->location.len = 0; } plcf->url = *url; return NGX_CONF_OK; }
/*
 * Main-configuration init hook of the upstream module.
 *
 * Runs the init_upstream callback of every registered upstream group
 * (falling back to ngx_http_upstream_init_round_robin when none is set) and
 * then builds umcf->headers_in_hash from the ngx_http_upstream_headers_in
 * table.
 *
 * Returns NGX_CONF_OK on success, NGX_CONF_ERROR otherwise.
 */
static char *
ngx_http_upstream_init_main_conf(ngx_conf_t *cf, void *conf)
{
    ngx_http_upstream_main_conf_t  *umcf = conf;

    ngx_uint_t                      i;
    ngx_array_t                     headers_in;
    ngx_hash_key_t                 *hk;
    ngx_hash_init_t                 hash;
    ngx_http_upstream_init_pt       init;
    ngx_http_upstream_header_t     *header;
    ngx_http_upstream_srv_conf_t  **uscfp;

    /* each element corresponds to one upstream{} configuration block */
    uscfp = umcf->upstreams.elts;

    for (i = 0; i < umcf->upstreams.nelts; i++) {

        /*
         * The load-balancing method comes into play here; round robin is
         * the default.
         */
        init = uscfp[i]->peer.init_upstream ? uscfp[i]->peer.init_upstream:
                                            ngx_http_upstream_init_round_robin;

        if (init(cf, uscfp[i]) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    }


    /* upstream_headers_in_hash */

    if (ngx_array_init(&headers_in, cf->temp_pool, 32, sizeof(ngx_hash_key_t))
        != NGX_OK)
    {
        return NGX_CONF_ERROR;
    }

    /* the table is terminated by an entry with an empty name */
    for (header = ngx_http_upstream_headers_in; header->name.len; header++) {
        hk = ngx_array_push(&headers_in);
        if (hk == NULL) {
            return NGX_CONF_ERROR;
        }

        hk->key = header->name;
        hk->key_hash = ngx_hash_key_lc(header->name.data, header->name.len);
        hk->value = header;
    }

    hash.hash = &umcf->headers_in_hash;
    hash.key = ngx_hash_key_lc;
    hash.max_size = 512;
    hash.bucket_size = ngx_align(64, ngx_cacheline_size);
    hash.name = "upstream_headers_in_hash";
    hash.pool = cf->pool;
    hash.temp_pool = NULL;

    if (ngx_hash_init(&hash, headers_in.elts, headers_in.nelts) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    return NGX_CONF_OK;
}
默认的round robin负载均衡策略初始化函数。
typedef struct { ngx_http_upstream_init_pt init_upstream; ngx_http_upstream_init_peer_pt init; void *data; } ngx_http_upstream_peer_t; typedef struct ngx_http_upstream_rr_peer_s ngx_http_upstream_rr_peer_t; struct ngx_http_upstream_rr_peer_s { struct sockaddr *sockaddr; socklen_t socklen; ngx_str_t name; ngx_str_t server; ngx_int_t current_weight; ngx_int_t effective_weight; ngx_int_t weight; // 当前连接数 ngx_http_upstream_get_round_robin_peer 函数增加 ngx_uint_t conns; ngx_uint_t max_conns; // 最大连接数 ngx_uint_t fails; // 失败的次数 time_t accessed; time_t checked; // 上游节点被选中的时间 ngx_uint_t max_fails; // 最大失败次数 time_t fail_timeout; // 失败最长下线时间 ngx_msec_t slow_start; ngx_msec_t start_time; ngx_uint_t down; // 标志下线 #if (NGX_HTTP_SSL || NGX_COMPAT) void *ssl_session; int ssl_session_len; #endif #if (NGX_HTTP_UPSTREAM_ZONE) ngx_atomic_t lock; #endif ngx_http_upstream_rr_peer_t *next; NGX_COMPAT_BEGIN(32) NGX_COMPAT_END }; typedef struct ngx_http_upstream_rr_peers_s ngx_http_upstream_rr_peers_t; struct ngx_http_upstream_rr_peers_s { ngx_uint_t number; // 上游服务器ip个数 #if (NGX_HTTP_UPSTREAM_ZONE) ngx_slab_pool_t *shpool; ngx_atomic_t rwlock; ngx_http_upstream_rr_peers_t *zone_next; #endif ngx_uint_t total_weight; unsigned single:1; // 是否是一个上游服务器,包括backup的服务器 unsigned weighted:1; // 是否加权 ngx_str_t *name; // 一组ups的名称 ngx_http_upstream_rr_peers_t *next; // backup 的server ngx_http_upstream_rr_peer_t *peer; // 实际的上游服务器地址 }; ngx_int_t ngx_http_upstream_init_round_robin(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *us) { ngx_url_t u; ngx_uint_t i, j, n, w; ngx_http_upstream_server_t *server; ngx_http_upstream_rr_peer_t *peer, **peerp; ngx_http_upstream_rr_peers_t *peers, *backup; us->peer.init = ngx_http_upstream_init_round_robin_peer; if (us->servers) { // 一组ups配置下的server server = us->servers->elts; n = 0; // 一组ups总的ip地址的个数 ,不包括backup的server。 w = 0; // 权重 for (i = 0; i < us->servers->nelts; i++) { if (server[i].backup) { continue; } n += server[i].naddrs; w += server[i].naddrs * 
server[i].weight; } if (n == 0) { ngx_log_error(NGX_LOG_EMERG, cf->log, 0, "no servers in upstream \"%V\" in %s:%ui", &us->host, us->file_name, us->line); return NGX_ERROR; } // 一个ups配置 peers = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t)); if (peers == NULL) { return NGX_ERROR; } // n 个上游服务器地址 peer = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peer_t) * n); if (peer == NULL) { return NGX_ERROR; } peers->single = (n == 1); peers->number = n; peers->weighted = (w != n); peers->total_weight = w; peers->name = &us->host; n = 0; peerp = &peers->peer; for (i = 0; i < us->servers->nelts; i++) { // 遍历的server 指令配置的上游服务器 if (server[i].backup) { continue; } // 如果server配置的上游服务器ip,则naddrs为1 // 如果配置的是上游服务器域名,则naddrs为多个 for (j = 0; j < server[i].naddrs; j++) { peer[n].sockaddr = server[i].addrs[j].sockaddr; peer[n].socklen = server[i].addrs[j].socklen; peer[n].name = server[i].addrs[j].name; peer[n].weight = server[i].weight; peer[n].effective_weight = server[i].weight; peer[n].current_weight = 0; peer[n].max_conns = server[i].max_conns; peer[n].max_fails = server[i].max_fails; peer[n].fail_timeout = server[i].fail_timeout; peer[n].down = server[i].down; peer[n].server = server[i].name; *peerp = &peer[n]; peerp = &peer[n].next; n++; } } us->peer.data = peers; /* backup servers */ n = 0; w = 0; for (i = 0; i < us->servers->nelts; i++) { if (!server[i].backup) { continue; } n += server[i].naddrs; w += server[i].naddrs * server[i].weight; } if (n == 0) { return NGX_OK; } backup = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t)); if (backup == NULL) { return NGX_ERROR; } peer = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peer_t) * n); if (peer == NULL) { return NGX_ERROR; } peers->single = 0; backup->single = 0; backup->number = n; backup->weighted = (w != n); backup->total_weight = w; backup->name = &us->host; n = 0; peerp = &backup->peer; for (i = 0; i < us->servers->nelts; i++) { if (!server[i].backup) { continue; } for (j = 0; j < 
server[i].naddrs; j++) { peer[n].sockaddr = server[i].addrs[j].sockaddr; peer[n].socklen = server[i].addrs[j].socklen; peer[n].name = server[i].addrs[j].name; peer[n].weight = server[i].weight; peer[n].effective_weight = server[i].weight; peer[n].current_weight = 0; peer[n].max_conns = server[i].max_conns; peer[n].max_fails = server[i].max_fails; peer[n].fail_timeout = server[i].fail_timeout; peer[n].down = server[i].down; peer[n].server = server[i].name; *peerp = &peer[n]; peerp = &peer[n].next; n++; } } peers->next = backup; return NGX_OK; } /* an upstream implicitly defined by proxy_pass, etc. */ if (us->port == 0) { ngx_log_error(NGX_LOG_EMERG, cf->log, 0, "no port in upstream \"%V\" in %s:%ui", &us->host, us->file_name, us->line); return NGX_ERROR; } ngx_memzero(&u, sizeof(ngx_url_t)); u.host = us->host; u.port = us->port; if (ngx_inet_resolve_host(cf->pool, &u) != NGX_OK) { if (u.err) { ngx_log_error(NGX_LOG_EMERG, cf->log, 0, "%s in upstream \"%V\" in %s:%ui", u.err, &us->host, us->file_name, us->line); } return NGX_ERROR; } n = u.naddrs; peers = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peers_t)); if (peers == NULL) { return NGX_ERROR; } peer = ngx_pcalloc(cf->pool, sizeof(ngx_http_upstream_rr_peer_t) * n); if (peer == NULL) { return NGX_ERROR; } peers->single = (n == 1); peers->number = n; peers->weighted = 0; peers->total_weight = n; peers->name = &us->host; peerp = &peers->peer; for (i = 0; i < u.naddrs; i++) { peer[i].sockaddr = u.addrs[i].sockaddr; peer[i].socklen = u.addrs[i].socklen; peer[i].name = u.addrs[i].name; peer[i].weight = 1; peer[i].effective_weight = 1; peer[i].current_weight = 0; peer[i].max_conns = 0; peer[i].max_fails = 1; peer[i].fail_timeout = 10; *peerp = &peer[i]; peerp = &peer[i].next; } us->peer.data = peers; /* implicitly defined upstream has no backup servers */ return NGX_OK; }
struct ngx_peer_connection_s { ngx_connection_t *connection; struct sockaddr *sockaddr; // 调用get函数选的上游服务器ip。 socklen_t socklen; ngx_str_t *name; // 一个上游失败后可以尝试的次数,包括backup的上游。默认情况所有后端都会尝试 ngx_uint_t tries; ngx_msec_t start_time; // 获取上游ip地址 ngx_http_upstream_get_round_robin_peer ngx_event_get_peer_pt get; ngx_event_free_peer_pt free; ngx_event_notify_peer_pt notify; // 上游服务器地址 类型可变,一般ngx_http_upstream_rr_peer_data_t // 由ngx_http_upstream_init_round_robin_peer函数设置 void *data; #if (NGX_SSL || NGX_COMPAT) ngx_event_set_peer_session_pt set_session; ngx_event_save_peer_session_pt save_session; #endif ngx_addr_t *local; int type; int rcvbuf; ngx_log_t *log; unsigned cached:1; unsigned transparent:1; /* ngx_connection_log_error_e */ unsigned log_error:2; NGX_COMPAT_BEGIN(2) NGX_COMPAT_END }; // 记录连接一个上游服务器的信息 typedef struct { ngx_uint_t status; // 连接上游服务器状态 ngx_msec_t response_time; ngx_msec_t connect_time; ngx_msec_t header_time; // 接收并解析完header的耗时 off_t response_length; off_t bytes_received; // 已接收上游服务器数据长度 ngx_str_t *peer; // 上游服务器名称 } ngx_http_upstream_state_t; struct ngx_http_upstream_s { ngx_http_upstream_handler_pt read_event_handler; ngx_http_upstream_handler_pt write_event_handler; // 上游服务器的地址和负载均衡策略回调函数 ngx_peer_connection_t peer; ngx_event_pipe_t *pipe; ngx_chain_t *request_bufs; // 要发送到上游的请求内容 ngx_output_chain_ctx_t output; // 发送到上游回调函数 ngx_chain_writer_ctx_t writer; // 发送到上游回调函数的上下文参数 ngx_http_upstream_conf_t *conf; // plcf->upstream ngx_http_upstream_srv_conf_t *upstream; // 一组选好的上游服务器 #if (NGX_HTTP_CACHE) ngx_array_t *caches; #endif // 上游服务器返回的header // 该结构体的变量由ngx_http_upstream_headers_in数组中的handler赋值, // 由ngx_http_proxy_process_header函数回调。 ngx_http_upstream_headers_in_t headers_in; ngx_http_upstream_resolved_t *resolved; ngx_buf_t from_client; ngx_buf_t buffer; // 接收上游服务器返回缓存区 off_t length; ngx_chain_t *out_bufs; 从上游接受到的响应体,要发送到下游 ngx_chain_t *busy_bufs; ngx_chain_t *free_bufs; ngx_int_t (*input_filter_init)(void *data); ngx_int_t (*input_filter)(void 
*data, ssize_t bytes); // 从上游接收响应体回调函数 void *input_filter_ctx; #if (NGX_HTTP_CACHE) ngx_int_t (*create_key)(ngx_http_request_t *r); #endif ngx_int_t (*create_request)(ngx_http_request_t *r); // 组装发送到上游的请求的内容 ngx_int_t (*reinit_request)(ngx_http_request_t *r); ngx_int_t (*process_header)(ngx_http_request_t *r); void (*abort_request)(ngx_http_request_t *r); void (*finalize_request)(ngx_http_request_t *r, ngx_int_t rc); ngx_int_t (*rewrite_redirect)(ngx_http_request_t *r, ngx_table_elt_t *h, size_t prefix); ngx_int_t (*rewrite_cookie)(ngx_http_request_t *r, ngx_table_elt_t *h); ngx_msec_t timeout; // 指向原请求upstream_states数组中的一个元素,且每连接一个上游服务器会重新赋值 ngx_http_upstream_state_t *state; ngx_str_t method; ngx_str_t schema; ngx_str_t uri; #if (NGX_HTTP_SSL || NGX_COMPAT) ngx_str_t ssl_name; #endif ngx_http_cleanup_pt *cleanup; unsigned store:1; unsigned cacheable:1; unsigned accel:1; unsigned ssl:1; #if (NGX_HTTP_CACHE) unsigned cache_status:3; #endif unsigned buffering:1; unsigned keepalive:1; unsigned upgrade:1; unsigned request_sent:1; // 开始向上游发送请求 unsigned request_body_sent:1; // 请求成功发送到上游 unsigned header_sent:1; // resp 的header 已经发完到下游 }; static ngx_int_t ngx_http_proxy_handler(ngx_http_request_t *r) { ngx_int_t rc; ngx_http_upstream_t *u; ngx_http_proxy_ctx_t *ctx; ngx_http_proxy_loc_conf_t *plcf; #if (NGX_HTTP_CACHE) ngx_http_proxy_main_conf_t *pmcf; #endif // 创建和上游请求的结构体 if (ngx_http_upstream_create(r) != NGX_OK) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_proxy_ctx_t)); if (ctx == NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } ngx_http_set_ctx(r, ctx, ngx_http_proxy_module); plcf = ngx_http_get_module_loc_conf(r, ngx_http_proxy_module); u = r->upstream; if (plcf->proxy_lengths == NULL) { ctx->vars = plcf->vars; u->schema = plcf->vars.schema; #if (NGX_HTTP_SSL) u->ssl = (plcf->upstream.ssl != NULL); #endif } else { if (ngx_http_proxy_eval(r, ctx, plcf) != NGX_OK) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } } u->output.tag = 
(ngx_buf_tag_t) &ngx_http_proxy_module; u->conf = &plcf->upstream; #if (NGX_HTTP_CACHE) pmcf = ngx_http_get_module_main_conf(r, ngx_http_proxy_module); u->caches = &pmcf->caches; u->create_key = ngx_http_proxy_create_key; #endif u->create_request = ngx_http_proxy_create_request; u->reinit_request = ngx_http_proxy_reinit_request; u->process_header = ngx_http_proxy_process_status_line; u->abort_request = ngx_http_proxy_abort_request; u->finalize_request = ngx_http_proxy_finalize_request; r->state = 0; if (plcf->redirects) { u->rewrite_redirect = ngx_http_proxy_rewrite_redirect; } if (plcf->cookie_domains || plcf->cookie_paths) { u->rewrite_cookie = ngx_http_proxy_rewrite_cookie; } // 处理上游响应body是否有缓存区 u->buffering = plcf->upstream.buffering; // 有缓存区则需要pipe方式处理 u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t)); if (u->pipe == NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } // 有缓存区处理响应body回调函数 u->pipe->input_filter = ngx_http_proxy_copy_filter; u->pipe->input_ctx = r; // 该回调函数会根据上游响应头判断响应的body是否是chunked协议,从而赋值input_filter不同的处理函数。 u->input_filter_init = ngx_http_proxy_input_filter_init; // 无缓存区处理响应body回调函数 u->input_filter = ngx_http_proxy_non_buffered_copy_filter; u->input_filter_ctx = r; u->accel = 1; if (!plcf->upstream.request_buffering && plcf->body_values == NULL && plcf->upstream.pass_request_body && (!r->headers_in.chunked || plcf->http_version == NGX_HTTP_VERSION_11)) { // 不缓存请求body。 r->request_body_no_buffering = 1; } // 读取请求的body并启动ups rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init); if (rc >= NGX_HTTP_SPECIAL_RESPONSE) { return rc; } return NGX_DONE; }
/*
 * Start upstream processing for request r.
 *
 * Visible steps: optionally try the cache, build the upstream request via
 * u->create_request(), set the local bind address, prepare the output chain
 * and the upstream_states array, register a cleanup handler, pick the
 * upstream group (or start name resolution) and finally call
 * ngx_http_upstream_connect().
 */
static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
    ngx_str_t                      *host;
    ngx_uint_t                      i;
    ngx_resolver_ctx_t             *ctx, temp;
    ngx_http_cleanup_t             *cln;
    ngx_http_upstream_t            *u;
    ngx_http_core_loc_conf_t       *clcf;
    ngx_http_upstream_srv_conf_t   *uscf, **uscfp;
    ngx_http_upstream_main_conf_t  *umcf;

    if (r->aio) {
        return;
    }

    u = r->upstream;

#if (NGX_HTTP_CACHE)

    // Caching enabled? (author's note: configured by the proxy_cache directive)
    if (u->conf->cache) {
        ngx_int_t  rc;

        // Look the request up in the cache.
        rc = ngx_http_upstream_cache(r, u);

        if (rc == NGX_BUSY) {
            // Re-enter this function on the next write event.
            r->write_event_handler = ngx_http_upstream_init_request;
            return;
        }

        r->write_event_handler = ngx_http_request_empty_handler;

        if (rc == NGX_ERROR) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        if (rc == NGX_OK) {
            rc = ngx_http_upstream_cache_send(r, u);

            if (rc == NGX_DONE) {
                return;
            }

            if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
                rc = NGX_DECLINED;
                r->cached = 0;
            }

            if (ngx_http_upstream_cache_background_update(r, u) != NGX_OK) {
                rc = NGX_ERROR;
            }
        }

        if (rc != NGX_DECLINED) {
            ngx_http_finalize_request(r, rc);
            return;
        }

        // NGX_DECLINED: the cache is bypassed and the upstream is contacted.
    }

#endif

    // Author's note: proxy_store setting, i.e. whether to save the response
    // to a file on disk.
    u->store = u->conf->store;

    // Watch for the client closing the connection prematurely.
    if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
        r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
        r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
    }

    if (r->request_body) {
        // The client request body.
        u->request_bufs = r->request_body->bufs;
    }

    // For the proxy module this is ngx_http_proxy_create_request.
    // Author's note: it assembles what is sent upstream into u->request_bufs.
    if (u->create_request(r) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    // Author's note: sets the local address/port used for the upstream
    // connection (stored in u->peer.local).
    if (ngx_http_upstream_set_local(r, u, u->conf->local) != NGX_OK) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    u->output.alignment = clcf->directio_alignment;
    u->output.pool = r->pool;
    u->output.bufs.num = 1;
    u->output.bufs.size = clcf->client_body_buffer_size;

    if (u->output.output_filter == NULL) {
        // Author's note: used when the request body is sent upstream, i.e. by
        // ngx_output_chain() called from ngx_http_upstream_send_request_body().
        u->output.output_filter = ngx_chain_writer;
        u->output.filter_ctx = &u->writer;
    }

    u->writer.pool = r->pool;

    if (r->upstream_states == NULL) {

        r->upstream_states = ngx_array_create(r->pool, 1,
                                            sizeof(ngx_http_upstream_state_t));
        if (r->upstream_states == NULL) {
            ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

    } else {

        // u->state holds the information about one upstream attempt.
        u->state = ngx_array_push(r->upstream_states);
        if (u->state == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
    }

    cln = ngx_http_cleanup_add(r, 0);
    if (cln == NULL) {
        ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    cln->handler = ngx_http_upstream_cleanup;
    cln->data = r;
    u->cleanup = &cln->handler;

    if (u->resolved == NULL) {

        // A configured group of upstream servers.
        uscf = u->conf->upstream;

    } else {

#if (NGX_HTTP_SSL)
        u->ssl_name = u->resolved->host;
#endif

        host = &u->resolved->host;

        umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

        uscfp = umcf->upstreams.elts;

        // Look for a configured upstream whose host (case-insensitive) and
        // port match the resolved target.
        for (i = 0; i < umcf->upstreams.nelts; i++) {

            uscf = uscfp[i];

            if (uscf->host.len == host->len
                && ((uscf->port == 0 && u->resolved->no_port)
                     || uscf->port == u->resolved->port)
                && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
            {
                goto found;
            }
        }

        // A socket address is already known: connect without resolving.
        if (u->resolved->sockaddr) {

            if (u->resolved->port == 0
                && u->resolved->sockaddr->sa_family != AF_UNIX)
            {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "no port in upstream \"%V\"", host);
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
                != NGX_OK)
            {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            ngx_http_upstream_connect(r, u);

            return;
        }

        if (u->resolved->port == 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no port in upstream \"%V\"", host);
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        // Start resolving the host name; processing continues in
        // ngx_http_upstream_resolve_handler.
        temp.name = *host;

        ctx = ngx_resolve_start(clcf->resolver, &temp);
        if (ctx == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        if (ctx == NGX_NO_RESOLVER) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "no resolver defined to resolve %V", host);

            ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
            return;
        }

        ctx->name = *host;
        ctx->handler = ngx_http_upstream_resolve_handler;
        ctx->data = r;
        ctx->timeout = clcf->resolver_timeout;

        u->resolved->ctx = ctx;

        if (ngx_resolve_name(ctx) != NGX_OK) {
            u->resolved->ctx = NULL;
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

found:

    if (uscf == NULL) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "no upstream configuration");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    u->upstream = uscf;

#if (NGX_HTTP_SSL)
    u->ssl_name = uscf->host;
#endif

    // Author's note: the default init callback is
    // ngx_http_upstream_init_round_robin_peer. uscf is the chosen group (one
    // "upstream xxx {}" block); init attaches that group's servers to u->peer
    // together with the callbacks that pick and release one server.
    if (uscf->peer.init(r, uscf) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    u->peer.start_time = ngx_current_msec;

    if (u->conf->next_upstream_tries
        && u->peer.tries > u->conf->next_upstream_tries)
    {
        // Cap the number of attempts at the configured value (author's note:
        // by default every backend is tried once).
        u->peer.tries = u->conf->next_upstream_tries;
    }

    // Pick one upstream server and start connecting to it.
    ngx_http_upstream_connect(r, u);
}


// ngx_http_upstream_round_robin.c
// The structure u->peer.data points to.
typedef struct {
    ngx_uint_t                      config;
    ngx_http_upstream_rr_peers_t   *peers;    // peer list of the group; a pointer that may be re-pointed (e.g. to the backup list)
    ngx_http_upstream_rr_peer_t    *current;  // the peer that was selected
    uintptr_t                      *tried;    // with `data`: per-peer marks (author's note: used to flag failed upstreams)
    uintptr_t                       data;     //
} ngx_http_upstream_rr_peer_data_t;


/*
 * Per-request initialization of round-robin peer selection.
 *
 * Allocates the ngx_http_upstream_rr_peer_data_t (if not yet present), points
 * it at the group's peer list, sizes the `tried` bitmap for the larger of the
 * primary and `next` lists, and installs the get/free (and SSL session)
 * callbacks on r->upstream->peer.
 *
 * Returns NGX_OK, or NGX_ERROR when an allocation fails.
 */
ngx_int_t
ngx_http_upstream_init_round_robin_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *us)
{
    ngx_uint_t                         n;
    ngx_http_upstream_rr_peer_data_t  *rrp;

    rrp = r->upstream->peer.data;

    if (rrp == NULL) {
        rrp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_rr_peer_data_t));
        if (rrp == NULL) {
            return NGX_ERROR;
        }

        r->upstream->peer.data = rrp;
    }

    rrp->peers = us->peer.data;
    rrp->current = NULL;
    rrp->config = 0;

    n = rrp->peers->number;

    if (rrp->peers->next && rrp->peers->next->number > n) {
        n = rrp->peers->next->number;
    }

    if (n <= 8 * sizeof(uintptr_t)) {
        // One bit per peer fits into a single word: use the inline field.
        rrp->tried = &rrp->data;
        rrp->data = 0;

    } else {
        // Round up to whole uintptr_t words.
        n = (n + (8 * sizeof(uintptr_t) - 1)) / (8 * sizeof(uintptr_t));

        rrp->tried = ngx_pcalloc(r->pool, n * sizeof(uintptr_t));
        if (rrp->tried == NULL) {
            return NGX_ERROR;
        }
    }

    r->upstream->peer.get = ngx_http_upstream_get_round_robin_peer;
    r->upstream->peer.free = ngx_http_upstream_free_round_robin_peer;
    r->upstream->peer.tries = ngx_http_upstream_tries(rrp->peers);
#if (NGX_HTTP_SSL)
    r->upstream->peer.set_session =
                               ngx_http_upstream_set_round_robin_peer_session;
    r->upstream->peer.save_session =
                               ngx_http_upstream_save_round_robin_peer_session;
#endif

    return NGX_OK;
}


/*
 * Open a connection to an upstream server for request r.
 *
 * Pushes a fresh ngx_http_upstream_state_t, calls ngx_event_connect_peer(),
 * wires the connection's event handlers, (re)initializes the writer context
 * and then either waits for the connect to finish, starts the SSL handshake,
 * or sends the request right away.
 */
static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    r->connection->log->action = "connecting to upstream";

    // Close out the timing of the previous attempt, if there was one.
    if (u->state && u->state->response_time) {
        u->state->response_time = ngx_current_msec - u->state->response_time;
    }

    u->state = ngx_array_push(r->upstream_states);
    if (u->state == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));

    // response_time temporarily holds the start timestamp.
    u->state->response_time = ngx_current_msec;
    u->state->connect_time = (ngx_msec_t) -1;
    u->state->header_time = (ngx_msec_t) -1;

    // Select an upstream server and start connecting. Author's notes:
    //   NGX_DECLINED  connect() failed
    //   NGX_AGAIN     connection not yet established (non-blocking socket)
    //   NGX_OK        connection established and writable
    rc = ngx_event_connect_peer(&u->peer);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream connect: %i", rc);

    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    // Name of the selected upstream server.
    u->state->peer = u->peer.name;

    if (rc == NGX_BUSY) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams");
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE);
        return;
    }

    if (rc == NGX_DECLINED) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */

    // The connection to the upstream server.
    c = u->peer.connection;

    // The upstream connection's data points back at the request.
    c->data = r;

    c->write->handler = ngx_http_upstream_handler;
    c->read->handler = ngx_http_upstream_handler;

    // Author's note: callbacks for timeout / writable and readable events.
    u->write_event_handler = ngx_http_upstream_send_request_handler;
    u->read_event_handler = ngx_http_upstream_process_header;

    c->sendfile &= r->connection->sendfile;
    u->output.sendfile = c->sendfile;

    if (c->pool == NULL) {

        /* we need separate pool here to be able to cache SSL connections */

        c->pool = ngx_create_pool(128, r->connection->log);
        if (c->pool == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    c->log = r->connection->log;
    c->pool->log = c->log;
    c->read->log = c->log;
    c->write->log = c->log;

    /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */

    u->writer.out = NULL;
    u->writer.last = &u->writer.out;
    u->writer.connection = c;
    u->writer.limit = 0;

    // A request was already sent on a previous attempt: reset upstream state.
    if (u->request_sent) {
        if (ngx_http_upstream_reinit(r, u) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }

    if (r->request_body
        && r->request_body->buf
        && r->request_body->temp_file
        && r == r->main)
    {
        /*
         * the r->request_body->buf can be reused for one request only,
         * the subrequests should allocate their own temporary bufs
         */

        u->output.free = ngx_alloc_chain_link(r->pool);
        if (u->output.free == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        u->output.free->buf = r->request_body->buf;
        u->output.free->next = NULL;
        u->output.allocated = 1;

        r->request_body->buf->pos = r->request_body->buf->start;
        r->request_body->buf->last = r->request_body->buf->start;
        r->request_body->buf->tag = u->output.tag;
    }

    u->request_sent = 0;
    u->request_body_sent = 0;

    if (rc == NGX_AGAIN) {
        // Author's note: the non-blocking connect() returned EINPROGRESS.
        // Arm the connect timeout on the write event; a successful connect
        // later shows up as a writable event.
        ngx_add_timer(c->write, u->conf->connect_timeout);
        return;
    }

#if (NGX_HTTP_SSL)

    if (u->ssl && c->ssl == NULL) {
        ngx_http_upstream_ssl_init_connection(r, u, c);
        return;
    }

#endif

    ngx_http_upstream_send_request(r, u, 1);
}
向上游发起连接请求(下面的代码删除了部分与 epoll 无关的内容)。
/*
 * Pick a peer via pc->get() and start a non-blocking connect() to it.
 *
 * Returns:
 *   the non-NGX_OK result of pc->get() unchanged (e.g. NGX_BUSY),
 *   NGX_OK        connected; the write event is marked ready,
 *   NGX_AGAIN     connect() is in progress (EINPROGRESS),
 *   NGX_DECLINED  connect() failed; the connection is closed,
 *   NGX_ERROR     socket/connection setup failed.
 *
 * On NGX_DECLINED and NGX_ERROR after the connection object was obtained,
 * pc->connection is reset to NULL.
 */
ngx_int_t
ngx_event_connect_peer(ngx_peer_connection_t *pc)
{
    int                rc, type;
    ngx_err_t          err;
    ngx_uint_t         level;
    ngx_socket_t       s;
    ngx_event_t       *rev, *wev;
    ngx_connection_t  *c;

    // For round-robin this is ngx_http_upstream_get_round_robin_peer, which
    // stores the chosen peer's address in pc->sockaddr.
    rc = pc->get(pc, pc->data);
    if (rc != NGX_OK) {
        return rc;
    }

    type = (pc->type ? pc->type : SOCK_STREAM);

    s = ngx_socket(pc->sockaddr->sa_family, type, 0);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "%s socket %d",
                   (type == SOCK_STREAM) ? "stream" : "dgram", s);

    if (s == (ngx_socket_t) -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_socket_n " failed");
        return NGX_ERROR;
    }

    c = ngx_get_connection(s, pc->log);

    if (c == NULL) {
        if (ngx_close_socket(s) == -1) {
            // The separating space keeps this message consistent with the
            // other "<call> failed" messages in this function.
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_close_socket_n " failed");
        }

        return NGX_ERROR;
    }

    c->type = type;

    if (pc->rcvbuf) {
        if (setsockopt(s, SOL_SOCKET, SO_RCVBUF,
                       (const void *) &pc->rcvbuf, sizeof(int)) == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          "setsockopt(SO_RCVBUF) failed");
            goto failed;
        }
    }

    if (ngx_nonblocking(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                      ngx_nonblocking_n " failed");

        goto failed;
    }

    // Optional local address to bind the outgoing socket to.
    if (pc->local) {
        if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) {
            ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                          "bind(%V) failed", &pc->local->name);

            goto failed;
        }
    }

    if (type == SOCK_STREAM) {
        c->recv = ngx_recv;
        c->send = ngx_send;
        c->recv_chain = ngx_recv_chain;
        c->send_chain = ngx_send_chain;

        c->sendfile = 1;

    } else { /* type == SOCK_DGRAM */
        c->recv = ngx_udp_recv;
        c->send = ngx_send;
        c->send_chain = ngx_udp_send_chain;
    }

    c->log_error = pc->log_error;

    rev = c->read;
    wev = c->write;

    rev->log = pc->log;
    wev->log = pc->log;

    pc->connection = c;

    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    if (ngx_add_conn) {
        if (ngx_add_conn(c) == NGX_ERROR) {
            goto failed;
        }
    }

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
                   "connect to %V, fd:%d #%uA", pc->name, s, c->number);

    rc = connect(s, pc->sockaddr, pc->socklen);

    if (rc == -1) {
        err = ngx_socket_errno;

        // EINPROGRESS (and EAGAIN on Windows) means "still connecting";
        // anything else is a real failure.
        if (err != NGX_EINPROGRESS
#if (NGX_WIN32)
            /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
            && err != NGX_EAGAIN
#endif
            )
        {
            if (err == NGX_ECONNREFUSED
#if (NGX_LINUX)
                /*
                 * Linux returns EAGAIN instead of ECONNREFUSED
                 * for unix sockets if listen queue is full
                 */
                || err == NGX_EAGAIN
#endif
                || err == NGX_ECONNRESET
                || err == NGX_ENETDOWN
                || err == NGX_ENETUNREACH
                || err == NGX_EHOSTDOWN
                || err == NGX_EHOSTUNREACH)
            {
                level = NGX_LOG_ERR;

            } else {
                level = NGX_LOG_CRIT;
            }

            ngx_log_error(level, c->log, err, "connect() to %V failed",
                          pc->name);

            ngx_close_connection(c);
            pc->connection = NULL;

            return NGX_DECLINED;
        }
    }

    if (ngx_add_conn) {
        if (rc == -1) {

            /* NGX_EINPROGRESS */

            return NGX_AGAIN;
        }

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");

        wev->ready = 1;

        return NGX_OK;
    }

    // NOTE(review): this excerpt has had the event registration for event
    // modules without ngx_add_conn removed, so when ngx_add_conn is NULL
    // control falls through into the error path below. Confirm against the
    // full source before relying on this behaviour.

failed:

    ngx_close_connection(c);
    pc->connection = NULL;

    return NGX_ERROR;
}


// ngx_http_upstream_round_robin.c

/*
 * peer.get callback for round-robin balancing.
 *
 * Chooses a peer from rrp->peers under the peers write lock and stores its
 * address and name in pc. If no primary peer is usable and a `next` list
 * exists, switches rrp->peers to it, clears the `tried` bitmap and retries.
 *
 * Returns NGX_OK when a peer was chosen, NGX_BUSY when none is available (or
 * whatever the retry on the `next` list returned, if not NGX_BUSY).
 */
ngx_int_t
ngx_http_upstream_get_round_robin_peer(ngx_peer_connection_t *pc, void *data)
{
    ngx_http_upstream_rr_peer_data_t  *rrp = data;

    ngx_int_t                      rc;
    ngx_uint_t                     i, n;
    ngx_http_upstream_rr_peer_t   *peer;
    ngx_http_upstream_rr_peers_t  *peers;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "get rr peer, try: %ui", pc->tries);

    pc->cached = 0;
    pc->connection = NULL;

    // peers refers to the upstream group's configured peer list.
    peers = rrp->peers;

    // Author's note: peers may be taken offline concurrently, hence the lock.
    ngx_http_upstream_rr_peers_wlock(peers);

    if (peers->single) {
        peer = peers->peer;

        if (peer->down) {
            goto failed;
        }

        if (peer->max_conns && peer->conns >= peer->max_conns) {
            goto failed;
        }

        rrp->current = peer;

    } else {

        /* there are several peers */

        peer = ngx_http_upstream_get_peer(rrp);

        if (peer == NULL) {
            goto failed;
        }

        ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                       "get rr peer, current: %p %i",
                       peer, peer->current_weight);
    }

    pc->sockaddr = peer->sockaddr;
    pc->socklen = peer->socklen;
    pc->name = &peer->name;

    peer->conns++;

    ngx_http_upstream_rr_peers_unlock(peers);

    return NGX_OK;

failed:

    if (peers->next) {

        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, "backup servers");

        rrp->peers = peers->next;

        // Number of bitmap words covering the `next` list.
        n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
                / (8 * sizeof(uintptr_t));

        for (i = 0; i < n; i++) {
            rrp->tried[i] = 0;
        }

        // Release the lock before recursing; the recursive call locks the
        // list it is working on.
        ngx_http_upstream_rr_peers_unlock(peers);

        rc = ngx_http_upstream_get_round_robin_peer(pc, rrp);

        if (rc != NGX_BUSY) {
            return rc;
        }

        // Re-take the lock so the unlock below is balanced.
        ngx_http_upstream_rr_peers_wlock(peers);
    }

    ngx_http_upstream_rr_peers_unlock(peers);

    pc->name = peers->name;

    return NGX_BUSY;
}
向上游发送请求信息
/*
 * Send the request to the upstream over u->peer.connection.
 *
 * Records the connect time on first entry, verifies the connection if
 * nothing was sent yet, sends the request via
 * ngx_http_upstream_send_request_body(), and once everything is written
 * arms the read timeout and, if data is already readable, processes the
 * response header immediately.
 */
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u,
    ngx_uint_t do_write)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream send request");

    if (u->state->connect_time == (ngx_msec_t) -1) {
        // Time spent connecting (response_time holds the start timestamp).
        u->state->connect_time = ngx_current_msec - u->state->response_time;
    }

    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    c->log->action = "sending request to upstream";

    // Send the request (head and body).
    rc = ngx_http_upstream_send_request_body(r, u, do_write);

    if (rc == NGX_ERROR) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
        ngx_http_upstream_finalize_request(r, u, rc);
        return;
    }

    if (rc == NGX_AGAIN) {
        if (!c->write->ready) {
            ngx_add_timer(c->write, u->conf->send_timeout);

        } else if (c->write->timer_set) {
            ngx_del_timer(c->write);
        }

        // Author's note: registers the write event with the event mechanism
        // (epoll).
        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

    /* rc == NGX_OK */

    // The request has been sent completely.
    u->request_body_sent = 1;

    if (c->write->timer_set) {
        // Drop the send timeout.
        ngx_del_timer(c->write);
    }

    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }

    // Nothing left to write: further write events go to a dummy handler.
    u->write_event_handler = ngx_http_upstream_dummy_handler;

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    // Arm the read timeout.
    ngx_add_timer(c->read, u->conf->read_timeout);

    if (c->read->ready) {
        // Response data is already available: process it now.
        ngx_http_upstream_process_header(r, u);
        return;
    }
}


/*
 * Write the request to the upstream.
 *
 * The content of u->request_bufs is passed to ngx_output_chain(). Author's
 * note: ngx_output_chain() in turn calls the u->output.output_filter
 * callback, which ngx_http_upstream_init_request sets up as
 *     u->output.output_filter = ngx_chain_writer;
 *     u->output.filter_ctx = &u->writer;
 *
 * Returns the ngx_output_chain() result (NGX_OK / NGX_AGAIN / NGX_ERROR), or
 * a value >= NGX_HTTP_SPECIAL_RESPONSE from reading the client body.
 */
static ngx_int_t
ngx_http_upstream_send_request_body(ngx_http_request_t *r,
    ngx_http_upstream_t *u, ngx_uint_t do_write)
{
    ngx_int_t                  rc;
    ngx_chain_t               *out, *cl, *ln;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http upstream send request body");

    // Is the request body buffered?
    if (!r->request_body_no_buffering) {

        // Author's note: the whole request has already been read.
        /* buffered request body */

        // First call: nothing has been sent upstream yet.
        if (!u->request_sent) {
            u->request_sent = 1;
            // The whole request, as built by create_request() in
            // ngx_http_upstream_init_request.
            out = u->request_bufs;

        } else {
            out = NULL;
        }

        rc = ngx_output_chain(&u->output, out);

        if (rc == NGX_AGAIN) {
            u->request_body_blocked = 1;

        } else {
            u->request_body_blocked = 0;
        }

        return rc;
    }

    if (!u->request_sent) {
        u->request_sent = 1;
        out = u->request_bufs;

        // Append the body buffers read so far to the end of the request chain.
        if (r->request_body->bufs) {
            for (cl = out; cl->next; cl = cl->next) { /* void */ }
            cl->next = r->request_body->bufs;
            r->request_body->bufs = NULL;
        }

        c = u->peer.connection;
        clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

        if (clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
            return NGX_ERROR;
        }

        r->read_event_handler = ngx_http_upstream_read_request_handler;

    } else {
        out = NULL;
    }

    // Unbuffered body: alternate between writing what we have and reading
    // more of the client body until there is nothing left to send.
    for ( ;; ) {

        if (do_write) {
            // Send to the upstream; out is the prepared content
            // (author's note: request line + headers + body).
            //
            rc = ngx_output_chain(&u->output, out);

            if (rc == NGX_ERROR) {
                return NGX_ERROR;
            }

            // The chain links have been handed over; release them.
            while (out) {
                ln = out;
                out = out->next;
                ngx_free_chain(r->pool, ln);
            }

            if (rc == NGX_AGAIN) {
                u->request_body_blocked = 1;

            } else {
                u->request_body_blocked = 0;
            }

            if (rc == NGX_OK && !r->reading_body) {
                break;
            }
        }

        if (r->reading_body) {
            /* read client request body */

            rc = ngx_http_read_unbuffered_request_body(r);

            if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
                return rc;
            }

            out = r->request_body->bufs;
            r->request_body->bufs = NULL;
        }

        /* stop if there is nothing to send */

        if (out == NULL) {
            rc = NGX_AGAIN;
            break;
        }

        do_write = 1;
    }

    // Body fully read: go back to watching for a prematurely closed client.
    if (!r->reading_body) {
        if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
            r->read_event_handler =
                                  ngx_http_upstream_rd_check_broken_connection;
        }
    }

    return rc;
}
上游响应处理函数
/*
 * Read and parse the upstream response header.
 *
 * Allocates the response buffer on first use, reads from the upstream
 * connection until u->process_header() stops returning NGX_AGAIN, then
 * decides whether to try another upstream, intercept the error, or pass the
 * response on (to the client, or into memory for an in-memory subrequest).
 */
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t            n;
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream process header");

    c->log->action = "reading response header from upstream";

    if (c->read->timedout) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }

    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    if (u->buffer.start == NULL) {
        // Buffer receiving the upstream response, sized by u->conf->buffer_size.
        // Author's note: this relates to proxy_buffer_size, defaults to one
        // memory page, and is worth raising.
        u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
        if (u->buffer.start == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;
        u->buffer.end = u->buffer.start + u->conf->buffer_size;
        u->buffer.temporary = 1;

        u->buffer.tag = u->output.tag;

        // Initialize the list of response headers.
        if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                          sizeof(ngx_table_elt_t))
            != NGX_OK)
        {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

#if (NGX_HTTP_CACHE)

        // Skip r->cache->header_start bytes at the front of the buffer.
        if (r->cache) {
            u->buffer.pos += r->cache->header_start;
            u->buffer.last = u->buffer.pos;
        }
#endif
    }

    for ( ;; ) {

        // Read the upstream response.
        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

        if (n == NGX_AGAIN) {
#if 0
            ngx_add_timer(rev, u->read_timeout);
#endif

            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }

        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "upstream prematurely closed connection");
        }

        if (n == NGX_ERROR || n == 0) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        u->state->bytes_received += n;

        u->buffer.last += n;

#if 0
        u->valid_header_in = 0;

        u->peer.cached = 0;
#endif

        // For the proxy module, u->process_header is initially
        // ngx_http_proxy_process_status_line, which parses the upstream
        // response.
        rc = u->process_header(r);

        if (rc == NGX_AGAIN) {

            // Buffer is full and the header is still incomplete.
            if (u->buffer.last == u->buffer.end) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream sent too big header");

                ngx_http_upstream_next(r, u,
                                       NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                return;
            }

            continue;
        }

        break;
    }

    if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
        return;
    }

    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* rc == NGX_OK */

    // The upstream response header has been parsed successfully.
    u->state->header_time = ngx_current_msec - u->state->response_time;

    if (u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE) {

        // Author's note: decides from the upstream status code whether another
        // upstream server should be tried (ends up in ngx_http_upstream_next).
        if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
            return;
        }

        // Author's note: whether upstream errors are intercepted depends on
        // proxy_intercept_errors; when on and error_page covers the status,
        // that error page is returned, otherwise the upstream's page is.
        if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
            return;
        }
    }

    // Author's note: handles header-driven redirects and hidden headers, and
    // copies the upstream response headers to the downstream response.
    if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
        return;
    }

    if (!r->subrequest_in_memory) {
        ngx_http_upstream_send_response(r, u);
        return;
    }

    /* subrequest content in memory */

    if (u->input_filter == NULL) {
        u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
        // Author's note: u->out_bufs
        u->input_filter = ngx_http_upstream_non_buffered_filter;
        u->input_filter_ctx = r;
    }

    if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    // Body bytes that arrived together with the header.
    n = u->buffer.last - u->buffer.pos;

    if (n) {
        u->buffer.last = u->buffer.pos;

        u->state->response_length += n;

        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
    }

    if (u->length == 0) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

    u->read_event_handler = ngx_http_upstream_process_body_in_memory;

    ngx_http_upstream_process_body_in_memory(r, u);
}


// ngx_http_proxy_module.c: handles the status line and headers returned by
// the upstream.

/*
 * Parse the status line of the upstream response.
 *
 * Returns NGX_AGAIN when more data is needed and NGX_ERROR on failure. When
 * the status line is not valid the response is treated as HTTP/0.9 and NGX_OK
 * is returned; otherwise the status is recorded, u->process_header is
 * switched to ngx_http_proxy_process_header and its result is returned.
 */
static ngx_int_t
ngx_http_proxy_process_status_line(ngx_http_request_t *r)
{
    size_t                 len;
    ngx_int_t              rc;
    ngx_http_upstream_t   *u;
    ngx_http_proxy_ctx_t  *ctx;

    ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);

    if (ctx == NULL) {
        return NGX_ERROR;
    }

    u = r->upstream;

    rc = ngx_http_parse_status_line(r, &u->buffer, &ctx->status);

    if (rc == NGX_AGAIN) {
        return rc;
    }

    if (rc == NGX_ERROR) {

#if (NGX_HTTP_CACHE)

        if (r->cache) {
            r->http_version = NGX_HTTP_VERSION_9;
            return NGX_OK;
        }

#endif

        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "upstream sent no valid HTTP/1.0 header");

#if 0
        if (u->accel) {
            return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }
#endif

        // No valid status line: treat the response as HTTP/0.9 with status 200.
        r->http_version = NGX_HTTP_VERSION_9;
        u->state->status = NGX_HTTP_OK;
        u->headers_in.connection_close = 1;

        return NGX_OK;
    }

    if (u->state && u->state->status == 0) {
        u->state->status = ctx->status.code;
    }

    u->headers_in.status_n = ctx->status.code;

    // Keep a copy of the status line text in the request pool.
    len = ctx->status.end - ctx->status.start;
    u->headers_in.status_line.len = len;

    u->headers_in.status_line.data = ngx_pnalloc(r->pool, len);
    if (u->headers_in.status_line.data == NULL) {
        return NGX_ERROR;
    }

    ngx_memcpy(u->headers_in.status_line.data, ctx->status.start, len);

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http proxy status %ui \"%V\"",
                   u->headers_in.status_n, &u->headers_in.status_line);

    if (ctx->status.http_version < NGX_HTTP_VERSION_11) {
        u->headers_in.connection_close = 1;
    }

    // From now on the header lines are handled by this callback.
    u->process_header = ngx_http_proxy_process_header;

    return ngx_http_proxy_process_header(r);
}


/*
 * Parse the upstream response header lines.
 *
 * Each parsed line is appended to r->upstream->headers_in.headers and passed
 * to its handler from umcf->headers_in_hash, if one is registered. Once the
 * whole header is parsed, empty "Server"/"Date" entries are added if absent
 * and the keepalive/upgrade flags are set.
 *
 * Returns NGX_OK, NGX_AGAIN (need more data), NGX_ERROR, or
 * NGX_HTTP_UPSTREAM_INVALID_HEADER.
 */
static ngx_int_t
ngx_http_proxy_process_header(ngx_http_request_t *r)
{
    ngx_int_t                       rc;
    ngx_table_elt_t                *h;
    ngx_http_upstream_t            *u;
    ngx_http_proxy_ctx_t           *ctx;
    ngx_http_upstream_header_t     *hh;
    ngx_http_upstream_main_conf_t  *umcf;

    umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

    for ( ;; ) {

        rc = ngx_http_parse_header_line(r, &r->upstream->buffer, 1);

        if (rc == NGX_OK) {

            /* a header line has been parsed successfully */

            h = ngx_list_push(&r->upstream->headers_in.headers);
            if (h == NULL) {
                return NGX_ERROR;
            }

            h->hash = r->header_hash;

            h->key.len = r->header_name_end - r->header_name_start;
            h->value.len = r->header_end - r->header_start;

            // One allocation laid out as: key '\0' value '\0' lowcase_key.
            h->key.data = ngx_pnalloc(r->pool,
                               h->key.len + 1 + h->value.len + 1 + h->key.len);
            if (h->key.data == NULL) {
                h->hash = 0;
                return NGX_ERROR;
            }

            h->value.data = h->key.data + h->key.len + 1;
            h->lowcase_key = h->key.data + h->key.len + 1 + h->value.len + 1;

            ngx_memcpy(h->key.data, r->header_name_start, h->key.len);
            h->key.data[h->key.len] = '\0';
            ngx_memcpy(h->value.data, r->header_start, h->value.len);
            h->value.data[h->value.len] = '\0';

            // Reuse the parser's lowercased name when it covers the whole
            // key; otherwise lowercase the key here.
            if (h->key.len == r->lowcase_index) {
                ngx_memcpy(h->lowcase_key, r->lowcase_header, h->key.len);

            } else {
                ngx_strlow(h->lowcase_key, h->key.data, h->key.len);
            }

            hh = ngx_hash_find(&umcf->headers_in_hash, h->hash,
                               h->lowcase_key, h->key.len);

            if (hh && hh->handler(r, h, hh->offset) != NGX_OK) {
                return NGX_ERROR;
            }

            ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                           "http proxy header: \"%V: %V\"",
                           &h->key, &h->value);

            continue;
        }

        if (rc == NGX_HTTP_PARSE_HEADER_DONE) {

            /* a whole header has been parsed successfully */

            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                           "http proxy header done");

            /*
             * if no "Server" and "Date" in header line,
             * then add the special empty headers
             */

            if (r->upstream->headers_in.server == NULL) {
                h = ngx_list_push(&r->upstream->headers_in.headers);
                if (h == NULL) {
                    return NGX_ERROR;
                }

                h->hash = ngx_hash(ngx_hash(ngx_hash(ngx_hash(
                                    ngx_hash('s', 'e'), 'r'), 'v'), 'e'), 'r');

                ngx_str_set(&h->key, "Server");
                ngx_str_null(&h->value);
                h->lowcase_key = (u_char *) "server";
            }

            if (r->upstream->headers_in.date == NULL) {
                h = ngx_list_push(&r->upstream->headers_in.headers);
                if (h == NULL) {
                    return NGX_ERROR;
                }

                h->hash = ngx_hash(ngx_hash(ngx_hash('d', 'a'), 't'), 'e');

                ngx_str_set(&h->key, "Date");
                ngx_str_null(&h->value);
                h->lowcase_key = (u_char *) "date";
            }

            /* clear content length if response is chunked */

            u = r->upstream;

            if (u->headers_in.chunked) {
                u->headers_in.content_length_n = -1;
            }

            /*
             * set u->keepalive if response has no body; this allows to keep
             * connections alive in case of r->header_only or X-Accel-Redirect
             */

            ctx = ngx_http_get_module_ctx(r, ngx_http_proxy_module);

            if (u->headers_in.status_n == NGX_HTTP_NO_CONTENT
                || u->headers_in.status_n == NGX_HTTP_NOT_MODIFIED
                || ctx->head
                || (!u->headers_in.chunked
                    && u->headers_in.content_length_n == 0))
            {
                u->keepalive = !u->headers_in.connection_close;
            }

            // Status 101 from the upstream means a protocol switch.
            if (u->headers_in.status_n == NGX_HTTP_SWITCHING_PROTOCOLS) {
                u->keepalive = 0;

                if (r->headers_in.upgrade) {
                    u->upgrade = 1;
                }
            }

            return NGX_OK;
        }

        if (rc == NGX_AGAIN) {
            return NGX_AGAIN;
        }

        /* there was error while a header line parsing */

        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "upstream sent invalid header");

        return NGX_HTTP_UPSTREAM_INVALID_HEADER;
    }
}
解析上游响应(resp)的 header,并将其复制到下游请求 r 的 headers_out 中。
/*
 * Copy the header lines parsed from the upstream response (u->headers_in)
 * into the downstream response (r->headers_out).
 *
 * If the upstream sent X-Accel-Redirect and that header is not listed in
 * u->conf->ignore_headers, the upstream request is finalized and the client
 * request is redirected internally instead of copying the response.
 *
 * Returns NGX_OK when the headers were copied, or NGX_DONE when the request
 * was redirected or finalized inside this function.
 */
static ngx_int_t
ngx_http_upstream_process_headers(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_str_t                       uri, args;
    ngx_uint_t                      i, flags;
    ngx_list_part_t                *part;
    ngx_table_elt_t                *h;
    ngx_http_upstream_header_t     *hh;
    ngx_http_upstream_main_conf_t  *umcf;

    umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

    if (u->headers_in.x_accel_redirect
        && !(u->conf->ignore_headers & NGX_HTTP_UPSTREAM_IGN_XA_REDIRECT))
    {
        // handle the X-Accel-Redirect header: perform an internal redirect
        ngx_http_upstream_finalize_request(r, u, NGX_DECLINED);

        /* walk every part of the upstream header list */
        part = &u->headers_in.headers.part;
        h = part->elts;

        for (i = 0; /* void */; i++) {

            if (i >= part->nelts) {
                if (part->next == NULL) {
                    break;
                }

                part = part->next;
                h = part->elts;
                i = 0;
            }

            hh = ngx_hash_find(&umcf->headers_in_hash, h[i].hash,
                               h[i].lowcase_key, h[i].key.len);

            /* only headers whose table entry has "redirect" set are copied */
            if (hh && hh->redirect) {
                if (hh->copy_handler(r, &h[i], hh->conf) != NGX_OK) {
                    ngx_http_finalize_request(r,
                                              NGX_HTTP_INTERNAL_SERVER_ERROR);
                    return NGX_DONE;
                }
            }
        }

        uri = u->headers_in.x_accel_redirect->value;

        /* a value starting with '@' is passed to ngx_http_named_location() */
        if (uri.data[0] == '@') {
            ngx_http_named_location(r, &uri);

        } else {
            ngx_str_null(&args);
            flags = NGX_HTTP_LOG_UNSAFE;

            if (ngx_http_parse_unsafe_uri(r, &uri, &args, &flags) != NGX_OK) {
                ngx_http_finalize_request(r, NGX_HTTP_NOT_FOUND);
                return NGX_DONE;
            }

            /* every method except HEAD is turned into GET for the redirect */
            if (r->method != NGX_HTTP_HEAD) {
                r->method = NGX_HTTP_GET;
                r->method_name = ngx_http_core_get_method;
            }

            ngx_http_internal_redirect(r, &uri, &args);
        }

        ngx_http_finalize_request(r, NGX_DONE);
        return NGX_DONE;
    }

    part = &u->headers_in.headers.part;
    h = part->elts;

    for (i = 0; /* void */; i++) {

        if (i >= part->nelts) {
            if (part->next == NULL) {
                break;
            }

            part = part->next;
            h = part->elts;
            i = 0;
        }

        /* headers found in hide_headers_hash are not passed downstream */
        if (ngx_hash_find(&u->conf->hide_headers_hash, h[i].hash,
                          h[i].lowcase_key, h[i].key.len))
        {
            continue;
        }

        hh = ngx_hash_find(&umcf->headers_in_hash, h[i].hash,
                           h[i].lowcase_key, h[i].key.len);

        /* headers with a table entry are copied by their own copy_handler */
        if (hh) {
            if (hh->copy_handler(r, &h[i], hh->conf) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return NGX_DONE;
            }

            continue;
        }

        /* all remaining headers are copied as plain header lines */
        if (ngx_http_upstream_copy_header_line(r, &h[i], 0) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return NGX_DONE;
        }
    }

    /*
     * "Server"/"Date" entries with a NULL value get hash = 0
     * NOTE(review): presumably this keeps the empty placeholder entries
     * from being sent - confirm against the header filter
     */
    if (r->headers_out.server && r->headers_out.server->value.data == NULL) {
        r->headers_out.server->hash = 0;
    }

    if (r->headers_out.date && r->headers_out.date->value.data == NULL) {
        r->headers_out.date->hash = 0;
    }

    r->headers_out.status = u->headers_in.status_n;
    r->headers_out.status_line = u->headers_in.status_line;

    r->headers_out.content_length_n = u->headers_in.content_length_n;

    r->disable_not_modified = !u->cacheable;

    if (u->conf->force_ranges) {
        r->allow_ranges = 1;
        r->single_range = 1;

#if (NGX_HTTP_CACHE)
        if (r->cached) {
            r->single_range = 0;
        }
#endif
    }

    u->length = -1;

    return NGX_OK;
}


// send the response to the downstream client
/*
 * Sends the response header with ngx_http_send_header() and then takes one
 * of three paths for the body:
 *   - u->upgrade:      hand the connection over to ngx_http_upstream_upgrade();
 *   - !u->buffering:   relay the body through u->input_filter without
 *                      buffering;
 *   - otherwise:       configure u->pipe (ngx_event_pipe_t) and start it via
 *                      ngx_http_upstream_process_upstream().
 * Every failure finalizes the upstream request and returns.
 */
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t                    n;
    ngx_int_t                  rc;
    ngx_event_pipe_t          *p;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

    rc = ngx_http_send_header(r);

    if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
        ngx_http_upstream_finalize_request(r, u, rc);
        return;
    }

    u->header_sent = 1;

    if (u->upgrade) {

#if (NGX_HTTP_CACHE)

        if (r->cache) {
            ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
        }

#endif

        // protocol upgrade support (e.g. WebSocket) ...
        ngx_http_upstream_upgrade(r, u);
        return;
    }

    c = r->connection;

    if (r->header_only) {
        // only the header has to be sent downstream
        // (HEAD request, 204 or 304 response status)

        if (!u->buffering) {
            ngx_http_upstream_finalize_request(r, u, rc);
            return;
        }

        if (!u->cacheable && !u->store) {
            ngx_http_upstream_finalize_request(r, u, rc);
            return;
        }

        /*
         * the response is still read because it is cacheable or stored
         * NOTE(review): downstream_error presumably stops the pipe from
         * writing to the client - confirm in ngx_event_pipe.c
         */
        u->pipe->downstream_error = 1;
    }

    if (r->request_body && r->request_body->temp_file
        && r == r->main && !r->preserve_body)
    {
        // the request body was saved to a temporary file and does not have
        // to be kept (preserve_body is presumably the "keep the body" flag):
        // clean up the temporary file that holds the body
        ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
        r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
    }

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    if (!u->buffering) {
        // proxy_buffering off;

#if (NGX_HTTP_CACHE)

        if (r->cache) {
            ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
        }

#endif

        /* fall back to the default filter if the module did not set one */
        if (u->input_filter == NULL) {
            u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
            u->input_filter = ngx_http_upstream_non_buffered_filter;
            u->input_filter_ctx = r;
        }

        u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
        r->write_event_handler =
                             ngx_http_upstream_process_non_buffered_downstream;

        r->limit_rate = 0;

        if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        if (clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /* bytes left in u->buffer after the header, i.e. preread body */
        n = u->buffer.last - u->buffer.pos;

        if (n) {
            u->buffer.last = u->buffer.pos;

            u->state->response_length += n;

            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            ngx_http_upstream_process_non_buffered_downstream(r);

        } else {
            u->buffer.pos = u->buffer.start;
            u->buffer.last = u->buffer.start;

            if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

            if (u->peer.connection->read->ready || u->length == 0) {
                ngx_http_upstream_process_non_buffered_upstream(r, u);
            }
        }

        return;
    }

    /* TODO: preallocate event_pipe bufs, look "Content-Length" */

#if (NGX_HTTP_CACHE)

    if (r->cache && r->cache->file.fd != NGX_INVALID_FILE) {
        ngx_pool_run_cleanup_file(r->pool, r->cache->file.fd);
        r->cache->file.fd = NGX_INVALID_FILE;
    }

    switch (ngx_http_test_predicates(r, u->conf->no_cache)) {

    case NGX_ERROR:
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;

    case NGX_DECLINED:
        u->cacheable = 0;
        break;

    default: /* NGX_OK */

        if (u->cache_status == NGX_HTTP_CACHE_BYPASS) {

            /* create cache if previously bypassed */

            if (ngx_http_file_cache_create(r) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }
        }

        break;
    }

    if (u->cacheable) {
        time_t  now, valid;

        now = ngx_time();

        valid = r->cache->valid_sec;

        /* no validity set yet: look it up by response status */
        if (valid == 0) {
            valid = ngx_http_file_cache_valid(u->conf->cache_valid,
                                              u->headers_in.status_n);
            if (valid) {
                r->cache->valid_sec = now + valid;
            }
        }

        if (valid) {
            r->cache->date = now;
            r->cache->body_start = (u_short) (u->buffer.pos - u->buffer.start);

            if (u->headers_in.status_n == NGX_HTTP_OK
                || u->headers_in.status_n == NGX_HTTP_PARTIAL_CONTENT)
            {
                r->cache->last_modified = u->headers_in.last_modified_time;

                if (u->headers_in.etag) {
                    r->cache->etag = u->headers_in.etag->value;

                } else {
                    ngx_str_null(&r->cache->etag);
                }

            } else {
                r->cache->last_modified = -1;
                ngx_str_null(&r->cache->etag);
            }

            if (ngx_http_file_cache_set_header(r, u->buffer.start) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
                return;
            }

        } else {
            u->cacheable = 0;
        }
    }

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http cacheable: %d", u->cacheable);

    if (u->cacheable == 0 && r->cache) {
        ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
    }

    if (r->header_only && !u->cacheable && !u->store) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }

#endif

    // buffering is enabled (proxy_buffering on)
    p = u->pipe;

    // ngx_http_upstream_output_filter: runs the body filters when the
    // response body is sent downstream
    p->output_filter = ngx_http_upstream_output_filter;
    p->output_ctx = r;
    p->tag = u->output.tag;
    p->bufs = u->conf->bufs;  // size and number of buffers used to read the response
    p->busy_size = u->conf->busy_buffers_size;
    p->upstream = u->peer.connection;  // connection to the upstream (origin) server
    p->downstream = c;  // connection to the downstream, i.e. the client
    p->pool = r->pool;
    p->log = c->log;
    p->limit_rate = u->conf->limit_rate;
    p->start_sec = ngx_time();

    p->cacheable = u->cacheable || u->store;

    p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
    if (p->temp_file == NULL) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    p->temp_file->file.fd = NGX_INVALID_FILE;
    p->temp_file->file.log = c->log;
    p->temp_file->path = u->conf->temp_path;
    p->temp_file->pool = r->pool;

    if (p->cacheable) {
        p->temp_file->persistent = 1;

#if (NGX_HTTP_CACHE)
        if (r->cache && !r->cache->file_cache->use_temp_path) {
            p->temp_file->path = r->cache->file_cache->path;
            p->temp_file->file.name = r->cache->file.name;
        }
#endif

    } else {
        p->temp_file->log_level = NGX_LOG_WARN;
        p->temp_file->warn = "an upstream response is buffered "
                             "to a temporary file";
    }

    p->max_temp_file_size = u->conf->max_temp_file_size;
    p->temp_file_write_size = u->conf->temp_file_write_size;

#if (NGX_THREADS)
    if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
        p->thread_handler = ngx_http_upstream_thread_handler;
        p->thread_ctx = r;
    }
#endif

    p->preread_bufs = ngx_alloc_chain_link(r->pool);
    if (p->preread_bufs == NULL) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    p->preread_bufs->buf = &u->buffer;  // note: this is a pointer to u->buffer
    p->preread_bufs->next = NULL;
    u->buffer.recycled = 1;

    // size of the response body that was already read together with the
    // response header
    p->preread_size = u->buffer.last - u->buffer.pos;

    if (u->cacheable) {

        p->buf_to_file = ngx_calloc_buf(r->pool);
        if (p->buf_to_file == NULL) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

        /* covers u->buffer from its start up to the current position */
        p->buf_to_file->start = u->buffer.start;
        p->buf_to_file->pos = u->buffer.start;
        p->buf_to_file->last = u->buffer.pos;
        p->buf_to_file->temporary = 1;
    }

    if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
        /* the posted aio operation may corrupt a shadow buffer */
        p->single_buf = 1;
    }

    /* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
    p->free_bufs = 1;

    /*
     * event_pipe would do u->buffer.last += p->preread_size
     * as though these bytes were read
     */
    // note: this also changes what p->preread_bufs sees (it points at
    // u->buffer); without knowing this ngx_event_pipe_read_upstream is hard
    // to follow
    u->buffer.last = u->buffer.pos;

    if (u->conf->cyclic_temp_file) {

        /*
         * we need to disable the use of sendfile() if we use cyclic temp file
         * because the writing a new data may interfere with sendfile()
         * that uses the same kernel file pages (at least on FreeBSD)
         */

        p->cyclic_temp_file = 1;
        c->sendfile = 0;

    } else {
        p->cyclic_temp_file = 0;
    }

    p->read_timeout = u->conf->read_timeout;
    p->send_timeout = clcf->send_timeout;
    p->send_lowat = clcf->send_lowat;

    p->length = -1;

    // the proxy module sets this to ngx_http_proxy_input_filter_init;
    // depending on whether the response status needs no body and whether the
    // response is chunked, it sets
    // u->pipe->input_filter = ngx_http_proxy_chunked_filter;
    // or u->pipe->length and u->length
    if (u->input_filter_init
        && u->input_filter_init(p->input_ctx) != NGX_OK)
    {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    // both handlers mainly call ngx_event_pipe()
    // ngx_event_pipe(p, 0)
    u->read_event_handler = ngx_http_upstream_process_upstream;
    // ngx_event_pipe(p, 1)
    r->write_event_handler = ngx_http_upstream_process_downstream;

    ngx_http_upstream_process_upstream(r, u);
}


// pick another upstream server and connect to it
```c
/*
 * Called after the attempt with the current upstream server failed;
 * ft_type is the NGX_HTTP_UPSTREAM_FT_* failure type.
 *
 * Releases the current peer, maps the failure type to an HTTP status and
 * then either finalizes the request (no tries left, failure type not
 * enabled in u->conf->next_upstream, unbuffered request body already sent,
 * or next_upstream_timeout exceeded) or closes the current upstream
 * connection and calls ngx_http_upstream_connect() again.
 */
static void
ngx_http_upstream_next(ngx_http_request_t *r, ngx_http_upstream_t *u,
    ngx_uint_t ft_type)
{
    ngx_msec_t  timeout;
    ngx_uint_t  status, state;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http next upstream, %xi", ft_type);

    if (u->peer.sockaddr) {
        // the upstream server on which the failure happened

        /* 403/404 only move on to the next peer, anything else is a failure */
        if (ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_403
            || ft_type == NGX_HTTP_UPSTREAM_FT_HTTP_404)
        {
            state = NGX_PEER_NEXT;

        } else {
            state = NGX_PEER_FAILED;
        }

        // with round robin: ngx_http_upstream_free_round_robin_peer
        u->peer.free(&u->peer, u->peer.data, state);
        u->peer.sockaddr = NULL;
    }

    if (ft_type == NGX_HTTP_UPSTREAM_FT_TIMEOUT) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, NGX_ETIMEDOUT,
                      "upstream timed out");
    }

    if (u->peer.cached && ft_type == NGX_HTTP_UPSTREAM_FT_ERROR) {
        /* TODO: inform balancer instead */
        u->peer.tries++;
    }

    /* status reported to the client if no other server is tried */
    switch (ft_type) {

    case NGX_HTTP_UPSTREAM_FT_TIMEOUT:
    case NGX_HTTP_UPSTREAM_FT_HTTP_504:
        status = NGX_HTTP_GATEWAY_TIME_OUT;
        break;

    case NGX_HTTP_UPSTREAM_FT_HTTP_500:
        status = NGX_HTTP_INTERNAL_SERVER_ERROR;
        break;

    case NGX_HTTP_UPSTREAM_FT_HTTP_503:
        status = NGX_HTTP_SERVICE_UNAVAILABLE;
        break;

    case NGX_HTTP_UPSTREAM_FT_HTTP_403:
        status = NGX_HTTP_FORBIDDEN;
        break;

    case NGX_HTTP_UPSTREAM_FT_HTTP_404:
        status = NGX_HTTP_NOT_FOUND;
        break;

    case NGX_HTTP_UPSTREAM_FT_HTTP_429:
        status = NGX_HTTP_TOO_MANY_REQUESTS;
        break;

    /*
     * NGX_HTTP_UPSTREAM_FT_BUSY_LOCK and NGX_HTTP_UPSTREAM_FT_MAX_WAITING
     * never reach here
     */

    default:
        status = NGX_HTTP_BAD_GATEWAY;
    }

    if (r->connection->error) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
        return;
    }

    u->state->status = status;

    timeout = u->conf->next_upstream_timeout;

    // the request was already sent upstream and the method is POST
    // (or LOCK / PATCH)
    if (u->request_sent
        && (r->method & (NGX_HTTP_POST|NGX_HTTP_LOCK|NGX_HTTP_PATCH)))
    {
        // the request is not idempotent
        ft_type |= NGX_HTTP_UPSTREAM_FT_NON_IDEMPOTENT;
    }

    if (u->peer.tries == 0
        || ((u->conf->next_upstream & ft_type) != ft_type)
        || (u->request_sent && r->request_body_no_buffering)
        || (timeout && ngx_current_msec - u->peer.start_time >= timeout))
    {
#if (NGX_HTTP_CACHE)

        /* try to answer from the expired cache entry before giving up */
        if (u->cache_status == NGX_HTTP_CACHE_EXPIRED
            && ((u->conf->cache_use_stale & ft_type) || r->cache->stale_error))
        {
            ngx_int_t  rc;

            rc = u->reinit_request(r);

            if (rc != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u, rc);
                return;
            }

            u->cache_status = NGX_HTTP_CACHE_STALE;
            rc = ngx_http_upstream_cache_send(r, u);

            if (rc == NGX_DONE) {
                return;
            }

            if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
                rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
            }

            ngx_http_upstream_finalize_request(r, u, rc);
            return;
        }
#endif

        ngx_http_upstream_finalize_request(r, u, status);
        return;
    }

    /* close the connection to the failed server before reconnecting */
    if (u->peer.connection) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "close http upstream connection: %d",
                       u->peer.connection->fd);
#if (NGX_HTTP_SSL)

        if (u->peer.connection->ssl) {
            u->peer.connection->ssl->no_wait_shutdown = 1;
            u->peer.connection->ssl->no_send_shutdown = 1;

            (void) ngx_ssl_shutdown(u->peer.connection);
        }
#endif

        if (u->peer.connection->pool) {
            ngx_destroy_pool(u->peer.connection->pool);
        }

        ngx_close_connection(u->peer.connection);
        u->peer.connection = NULL;
    }

    ngx_http_upstream_connect(r, u);
}
```c
结束上游请求
/*
 * Finish the upstream part of request r.
 *
 * In order: releases the resolver context, updates the timing/byte counters
 * in u->state, calls the module's u->finalize_request() handler, frees the
 * peer, closes the upstream connection, removes the temporary file of a
 * stored response, releases the cache entry, and finally finalizes the
 * client request according to rc.
 *
 * rc == NGX_DECLINED performs the cleanup only and returns without
 * finalizing the client request.
 */
static void
ngx_http_upstream_finalize_request(ngx_http_request_t *r,
    ngx_http_upstream_t *u, ngx_int_t rc)
{
    ngx_uint_t  flush;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "finalize http upstream request: %i", rc);

    if (u->cleanup == NULL) {
        // the upstream request has already been finished
        /* the request was already finalized */
        ngx_http_finalize_request(r, NGX_DONE);
        return;
    }

    /* clearing u->cleanup marks this upstream as finalized (checked above) */
    *u->cleanup = NULL;
    u->cleanup = NULL;

    if (u->resolved && u->resolved->ctx) {
        ngx_resolve_name_done(u->resolved->ctx);
        u->resolved->ctx = NULL;
    }

    if (u->state && u->state->response_time) {
        /*
         * NOTE(review): response_time presumably held the start timestamp
         * until now and becomes the elapsed time here - confirm
         */
        u->state->response_time = ngx_current_msec - u->state->response_time;

        if (u->pipe && u->pipe->read_length) {
            u->state->bytes_received += u->pipe->read_length
                                        - u->pipe->preread_size;
            u->state->response_length = u->pipe->read_length;
        }
    }

    // for the proxy module this calls ngx_http_proxy_finalize_request
    u->finalize_request(r, rc);

    if (u->peer.free && u->peer.sockaddr) {
        u->peer.free(&u->peer, u->peer.data, 0);
        u->peer.sockaddr = NULL;
    }

    if (u->peer.connection) {

#if (NGX_HTTP_SSL)

        /* TODO: do not shutdown persistent connection */

        if (u->peer.connection->ssl) {

            /*
             * We send the "close notify" shutdown alert to the upstream only
             * and do not wait its "close notify" shutdown alert.
             * It is acceptable according to the TLS standard.
             */

            u->peer.connection->ssl->no_wait_shutdown = 1;

            (void) ngx_ssl_shutdown(u->peer.connection);
        }
#endif

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "close http upstream connection: %d",
                       u->peer.connection->fd);

        if (u->peer.connection->pool) {
            ngx_destroy_pool(u->peer.connection->pool);
        }

        // close the upstream connection
        ngx_close_connection(u->peer.connection);
    }

    u->peer.connection = NULL;

    if (u->pipe && u->pipe->temp_file) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http upstream temp fd: %d",
                       u->pipe->temp_file->file.fd);
    }

    /* a stored response whose temp file is still open: delete the file */
    if (u->store && u->pipe && u->pipe->temp_file
        && u->pipe->temp_file->file.fd != NGX_INVALID_FILE)
    {
        if (ngx_delete_file(u->pipe->temp_file->file.name.data)
            == NGX_FILE_ERROR)
        {
            ngx_log_error(NGX_LOG_CRIT, r->connection->log, ngx_errno,
                          ngx_delete_file_n " \"%s\" failed",
                          u->pipe->temp_file->file.name.data);
        }
    }

#if (NGX_HTTP_CACHE)

    if (r->cache) {

        if (u->cacheable) {

            /* 502/504 may get a validity time of their own via cache_valid */
            if (rc == NGX_HTTP_BAD_GATEWAY || rc == NGX_HTTP_GATEWAY_TIME_OUT) {
                time_t  valid;

                valid = ngx_http_file_cache_valid(u->conf->cache_valid, rc);

                if (valid) {
                    r->cache->valid_sec = ngx_time() + valid;
                    r->cache->error = rc;
                }
            }
        }

        ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
    }

#endif

    /* in-memory subrequest with a special response: drop what is buffered */
    if (r->subrequest_in_memory
        && u->headers_in.status_n >= NGX_HTTP_SPECIAL_RESPONSE)
    {
        u->buffer.last = u->buffer.pos;
    }

    r->read_event_handler = ngx_http_block_reading;

    /* NGX_DECLINED: cleanup only, the client request is left to the caller */
    if (rc == NGX_DECLINED) {
        return;
    }

    r->connection->log->action = "sending to client";

    /*
     * header not sent yet, client-side timeout/close, or a downstream
     * error: pass rc straight to ngx_http_finalize_request()
     */
    if (!u->header_sent
        || rc == NGX_HTTP_REQUEST_TIME_OUT
        || rc == NGX_HTTP_CLIENT_CLOSED_REQUEST
        || (u->pipe && u->pipe->downstream_error))
    {
        ngx_http_finalize_request(r, rc);
        return;
    }

    flush = 0;

    /* the header is already sent, so a special response becomes NGX_ERROR */
    if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
        rc = NGX_ERROR;
        flush = 1;
    }

    if (r->header_only) {
        ngx_http_finalize_request(r, rc);
        return;
    }

    if (rc == 0) {
        /* success: send the last-buffer marker */
        rc = ngx_http_send_special(r, NGX_HTTP_LAST);

    } else if (flush) {
        /* error after the header: flush and disable keepalive */
        r->keepalive = 0;
        rc = ngx_http_send_special(r, NGX_HTTP_FLUSH);
    }

    ngx_http_finalize_request(r, rc);
}
ngx_event_pipe.c
/*
 * Drive the pipe p between its upstream and downstream connections.
 *
 * When do_write is non-zero, data is first written downstream; then the
 * upstream is read.  The loop repeats (always writing from the second
 * pass on) for as long as the read step leaves p->read or
 * p->upstream_blocked set.  Afterwards the upstream read event and the
 * downstream write event are re-registered and their timers updated.
 *
 * Returns NGX_ABORT when the write step or the read step returns NGX_ABORT
 * or when an event cannot be registered; otherwise NGX_OK (including the
 * case where the write step returns NGX_BUSY).
 */
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
    ngx_int_t     rc;
    ngx_uint_t    flags;
    ngx_event_t  *rev, *wev;

    for ( ;; ) {
        if (do_write) {
            p->log->action = "sending to client";

            rc = ngx_event_pipe_write_to_downstream(p);

            if (rc == NGX_ABORT) {
                return NGX_ABORT;
            }

            /* NGX_BUSY: stop here without reading or re-arming events */
            if (rc == NGX_BUSY) {
                return NGX_OK;
            }
        }

        /*
         * reset before reading
         * NOTE(review): both flags are presumably set by
         * ngx_event_pipe_read_upstream() - confirm
         */
        p->read = 0;
        p->upstream_blocked = 0;

        p->log->action = "reading upstream";

        if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
            return NGX_ABORT;
        }

        /* neither flag set after the read step: leave the loop */
        if (!p->read && !p->upstream_blocked) {
            break;
        }

        do_write = 1;
    }

    if (p->upstream->fd != (ngx_socket_t) -1) {
        rev = p->upstream->read;

        /* on eof or error the read event is handled with NGX_CLOSE_EVENT */
        flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;

        if (ngx_handle_read_event(rev, flags) != NGX_OK) {
            return NGX_ABORT;
        }

        if (!rev->delayed) {
            /* still waiting for data: arm read timer, else drop a set one */
            if (rev->active && !rev->ready) {
                ngx_add_timer(rev, p->read_timeout);

            } else if (rev->timer_set) {
                ngx_del_timer(rev);
            }
        }
    }

    /* only when the downstream connection's data is this pipe's output_ctx */
    if (p->downstream->fd != (ngx_socket_t) -1
        && p->downstream->data == p->output_ctx)
    {
        wev = p->downstream->write;
        if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
            return NGX_ABORT;
        }

        if (!wev->delayed) {
            /* still waiting to write: arm send timer, else drop a set one */
            if (wev->active && !wev->ready) {
                ngx_add_timer(wev, p->send_timeout);

            } else if (wev->timer_set) {
                ngx_del_timer(wev);
            }
        }
    }

    return NGX_OK;
}
u->pipe是resp body传输的核心,定义也非常贴切,就相当于一根接通上下游的单向管道,把上游的body转到下游,其中input_filter是从上游读回resp body的处理函数,output_filter是把body发送到下游的处理函数。 容易混淆的还有u->output 和 u->input_filter, 其中u->output 是把请求发送到上游的回调结构体,和 u->pipe 没关系。从ngx_http_read_client_request_body(r, ngx_http_upstream_init)开始,读取完body,调用 ngx_http_upstream_init -> ngx_http_upstream_init_request -> ngx_http_upstream_connect -> ngx_http_upstream_send_request -> ngx_http_upstream_send_request_body -> ngx_output_chain -> u->output.output_filter = ngx_chain_writer。 而u->input_filter 是和u->pipe 有关系的, u->input_filter 也要区别于p->input_filter,这两个回调函数作用类似,都是处理上游resp body,前者在没有缓冲区(proxy_buffering off;)时调用,后者在有缓冲区(proxy_buffering on;)时调用。
u->pipe
input_filter
output_filter
u->output
u->input_filter
ngx_http_read_client_request_body(r, ngx_http_upstream_init)
ngx_http_upstream_init
ngx_http_upstream_init_request
ngx_http_upstream_connect
ngx_http_upstream_send_request
ngx_http_upstream_send_request_body
ngx_output_chain
u->output.output_filter = ngx_chain_writer
p->input_filter
附录:关键函数
ngx_http_top_request_body_filter = ngx_http_request_body_save_filter; ngx_http_init_connection rev->handler = ngx_http_wait_request_handler; c->write->handler = ngx_http_empty_handler; rev->handler = ngx_http_process_request_line; rev->handler = ngx_http_process_request_headers; c->read->handler = ngx_http_request_handler; c->write->handler = ngx_http_request_handler; r->read_event_handler = ngx_http_block_reading; r->write_event_handler = ngx_http_core_run_phases; ngx_http_core_run_phases(r); ngx_http_core_content_phase r->write_event_handler = ngx_http_request_empty_handler; ngx_http_proxy_handler u->pipe->input_filter = ngx_http_proxy_copy_filter; u->pipe->input_ctx = r; u->input_filter_init = ngx_http_proxy_input_filter_init; u->input_filter = ngx_http_proxy_non_buffered_copy_filter; u->input_filter_ctx = r; r->read_event_handler = ngx_http_read_client_request_body_handler; r->write_event_handler = ngx_http_request_empty_handler; rc = ngx_http_do_read_client_request_body(r); upstream: c->write->handler = ngx_http_upstream_handler; c->read->handler = ngx_http_upstream_handler; u->write_event_handler = ngx_http_upstream_send_request_handler; u->read_event_handler = ngx_http_upstream_process_header; ngx_http_upstream_send_request ngx_http_upstream_send_request_body ngx_http_upstream_send_response p = u->pipe; p->output_filter = ngx_http_upstream_output_filter; p->output_ctx = r; p->upstream = u->peer.connection; p->downstream = c; u->read_event_handler = ngx_http_upstream_process_upstream; r->write_event_handler = ngx_http_upstream_process_downstream; ngx_http_upstream_process_upstream(r, u);
概述
upstream是nginx向上游发起tcp请求的一种机制。在nginx中有很多有用的模块都用到了该机制,例如proxy模块,memcache模块等。
upstream模块提供了两个配置指令:upstream和server来指定上游服务器地址。
指令解析
upstream机制的实现是在ngx_http_upstream.h|c, ngx_http_upstream_round_robin.h|c 这几个文件中。 同时使用upstream的模块(如proxy)也会初始化和调用ups模块的一些变量和函数,先从程序启动解析配置指令开始。调用逻辑在ngx_http_block函数中。
默认的round robin负载均衡策略初始化函数。
向上游发起链接请求,删除了部分epoll无关的代码。
向上游发送请求信息
上游响应处理函数
解析上游resp的header copy到下游request的headers_out
结束上游请求
ngx_event_pipe.c
总结
u->pipe
是resp body传输的核心,定义也非常贴切,就相当于一根接通上下游的单向管道,把上游的body转到下游,其中input_filter
是从上游读回resp body处理函数,output_filter
是把body发送到下游的处理函数。 容易混淆的还有u->output
和u->input_filter
, 其中u->output
是把请求发送到上游的回调结构体,和u->pipe
没关系。从ngx_http_read_client_request_body(r, ngx_http_upstream_init)
开始,读取完body,调用ngx_http_upstream_init
->ngx_http_upstream_init_request
->ngx_http_upstream_connect
->ngx_http_upstream_send_request
->ngx_http_upstream_send_request_body
->ngx_output_chain
->u->output.output_filter = ngx_chain_writer
而u->input_filter
是和u->pipe
有关系的,u->input_filter
也要区别于p->input_filter
,这两个回调函数作用类似,都是处理上游resp body,前者在没有缓冲区(proxy_buffering off;)时调用,后者在有缓冲区(proxy_buffering on;)时调用。附录:关键函数