vislee / leevis.com

Blog
87 stars 13 forks source link

nginx http pipeline 处理代码分析 #187

Open vislee opened 2 years ago

vislee commented 2 years ago

背景

近期遇到一个问题,源站接受请求后无响应或者响应时间非常长,客户端长连接测试。 一个连接发了多个请求,等不到响应时关闭连接。查看access日志时只有第一个请求的一条日志4,error日志也没详细的错误。 接下来就看看nginx是怎么处理http的pipeline请求的。

代码分析

先看个测试例子:


use Test::Nginx::Socket::Lua;
use Cwd qw(cwd);

log_level('debug');

repeat_each(1);
plan tests => repeat_each() * (7 * blocks());

no_long_string();

run_tests();

__DATA__

=== TEST 1: test

--- http_config
    variables_hash_max_size 2048;

    # server {
    #     listen 127.0.0.1:8082;

    #     access_log off;

    #     location / {
    #         content_by_lua_block {
    #             -- ngx.sleep(10)
    #             ngx.print("OK!")
    #             ngx.exit(ngx.HTTP_OK)
    #         }
    #     }
    # }

--- config
    location /t/ {
        proxy_read_timeout 500s;
        proxy_send_timeout 500s;
        proxy_pass http://127.0.0.1:8082;
    }

--- pipelined_requests eval
["GET /t/label/1", "GET /t/label/2", "GET /t/label/3"]
--- more_headers
Connection: keep-alive
--- no_error_log
[error]
--- timeout: 20

http请求到达时先调用ngx_http_init_connection函数初始化请求结构体, 接着接受请求行,调用ngx_http_process_request_line函数处理请求行。 接着接受请求头,调用ngx_http_process_request_headers函数处理请求头。注意到,请求头有个长连接标志的头Connection: keep-alive ,调用了下面的回调函数处理。


static ngx_int_t
ngx_http_process_connection(ngx_http_request_t *r, ngx_table_elt_t *h,
    ngx_uint_t offset)
{
    if (ngx_strcasestrn(h->value.data, "close", 5 - 1)) {
        r->headers_in.connection_type = NGX_HTTP_CONNECTION_CLOSE;

    } else if (ngx_strcasestrn(h->value.data, "keep-alive", 10 - 1)) {
        r->headers_in.connection_type = NGX_HTTP_CONNECTION_KEEP_ALIVE;
    }

    return NGX_OK;
}

处理完请求头后,调用ngx_http_process_request函数,在该函数中有如下代码:

    c->read->handler = ngx_http_request_handler;
    c->write->handler = ngx_http_request_handler;
    r->read_event_handler = ngx_http_block_reading;

可读函数应用层回调函数被赋值为ngx_http_block_reading,从字面意思就是阻止再读,就是不读请求内容了。 接着调用了ngx_http_handler函数,该函数中根据r->headers_in.connection_type的值赋值r->keepalive = 1; 接着调用了ngx_http_core_run_phases函数处理nginx的10个阶段,CONTENT阶段会调用ngx_http_core_content_phase回调函数,该函数有如下代码:

    if (r->content_handler) {
        r->write_event_handler = ngx_http_request_empty_handler;
        ngx_http_finalize_request(r, r->content_handler(r));
        return NGX_OK;
    }

而r->content_handler是在FIND_CONFIG阶段的回调函数ngx_http_core_find_config_phase中调用了ngx_http_update_location_config函数,该函数中会根据ua浏览器和配置决定是否保持长连接,并赋值r->content_handler = clcf->handler;反向代理模式,该handler被赋值为ngx_http_proxy_handler函数,该函数有下面调用:

    rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);

先调用ngx_http_read_client_request_body读取请求body,然后调用ngx_http_upstream_init初始化上游连接调用ngx_http_upstream_send_request发送请求,向上游发送完请求后,上游连接可读时,传输层回调函数ngx_http_upstream_handler被调用,从而应用层函数ngx_http_upstream_process_header被调用,处理完相应头后,调用 ngx_http_upstream_send_response发送响应。 发送完毕后返回到了ngx_http_core_content_phase函数中的,ngx_http_finalize_request(r, r->content_handler(r));代码处。 然后调用了ngx_http_finalize_connection,该函数有如下代码:

    if (!ngx_terminate
         && !ngx_exiting
         && r->keepalive
         && clcf->keepalive_timeout > 0)
    {
        ngx_http_set_keepalive(r);
        return;
    }

接下来,重点看下ngx_http_set_keepalive函数:


static void
ngx_http_set_keepalive(ngx_http_request_t *r)
{
    int                        tcp_nodelay;
    ngx_buf_t                 *b, *f;
    ngx_chain_t               *cl, *ln;
    ngx_event_t               *rev, *wev;
    ngx_connection_t          *c;
    ngx_http_connection_t     *hc;
    ngx_http_core_loc_conf_t  *clcf;

    c = r->connection;
    rev = c->read;

    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "set http keepalive handler");

    if (r->discard_body) {
        r->write_event_handler = ngx_http_request_empty_handler;
        r->lingering_time = ngx_time() + (time_t) (clcf->lingering_time / 1000);
        ngx_add_timer(rev, clcf->lingering_timeout);
        return;
    }

    c->log->action = "closing request";

    hc = r->http_connection;
    b = r->header_in;

    // 读回到pipeline下一个请求的内容
    if (b->pos < b->last) {

        /* the pipelined request */

        if (b != c->buffer) {  // 处理当前请求时重新分配过内存

            /*
             * If the large header buffers were allocated while the previous
             * request processing then we do not use c->buffer for
             * the pipelined request (see ngx_http_create_request()).
             *
             * Now we would move the large header buffers to the free list.
             */

            for (cl = hc->busy; cl; /* void */) {
                ln = cl;
                cl = cl->next;

                if (ln->buf == b) {
                    ngx_free_chain(c->pool, ln);
                    continue;
                }

                f = ln->buf;
                f->pos = f->start;
                f->last = f->start;

                ln->next = hc->free;
                hc->free = ln;
            }

            cl = ngx_alloc_chain_link(c->pool);
            if (cl == NULL) {
                ngx_http_close_request(r, 0);
                return;
            }

            cl->buf = b;
            cl->next = NULL;

            hc->busy = cl;
            hc->nbusy = 1;
        }
    }

    /* guard against recursive call from ngx_http_finalize_connection() */
    r->keepalive = 0;

    ngx_http_free_request(r, 0);

    c->data = hc;

    // 传输层可读事件加入epoll监听起来
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }

    wev = c->write;
    wev->handler = ngx_http_empty_handler;

    if (b->pos < b->last) {
        // 当前已读到了下个请求的内容了
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "pipelined request");

        c->log->action = "reading client pipelined request line";

        r = ngx_http_create_request(c);
        if (r == NULL) {
            ngx_http_close_connection(c);
            return;
        }

        r->pipeline = 1;

        c->data = r;

        c->sent = 0;
        c->destroyed = 0;

        if (rev->timer_set) {
            ngx_del_timer(rev);
        }

        rev->handler = ngx_http_process_request_line;
        ngx_post_event(rev, &ngx_posted_events);
        return;
    }

    /*
     * To keep a memory footprint as small as possible for an idle keepalive
     * connection we try to free c->buffer's memory if it was allocated outside
     * the c->pool.  The large header buffers are always allocated outside the
     * c->pool and are freed too.
     */

    b = c->buffer;

    if (ngx_pfree(c->pool, b->start) == NGX_OK) {

        /*
         * the special note for ngx_http_keepalive_handler() that
         * c->buffer's memory was freed
         */

        b->pos = NULL;

    } else {
        b->pos = b->start;
        b->last = b->start;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0, "hc free: %p",
                   hc->free);

    if (hc->free) {
        for (cl = hc->free; cl; /* void */) {
            ln = cl;
            cl = cl->next;
            ngx_pfree(c->pool, ln->buf->start);
            ngx_free_chain(c->pool, ln);
        }

        hc->free = NULL;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0, "hc busy: %p %i",
                   hc->busy, hc->nbusy);

    if (hc->busy) {
        for (cl = hc->busy; cl; /* void */) {
            ln = cl;
            cl = cl->next;
            ngx_pfree(c->pool, ln->buf->start);
            ngx_free_chain(c->pool, ln);
        }

        hc->busy = NULL;
        hc->nbusy = 0;
    }

#if (NGX_HTTP_SSL)
    if (c->ssl) {
        ngx_ssl_free_buffer(c);
    }
#endif

    // 下个请求还没收到
    rev->handler = ngx_http_keepalive_handler;

    if (wev->active && (ngx_event_flags & NGX_USE_LEVEL_EVENT)) {
        if (ngx_del_event(wev, NGX_WRITE_EVENT, 0) != NGX_OK) {
            ngx_http_close_connection(c);
            return;
        }
    }

    c->log->action = "keepalive";

    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == -1) {
            ngx_connection_error(c, ngx_socket_errno, ngx_tcp_push_n " failed");
            ngx_http_close_connection(c);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
        tcp_nodelay = ngx_tcp_nodelay_and_tcp_nopush ? 1 : 0;

    } else {
        tcp_nodelay = 1;
    }

    if (tcp_nodelay && clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
        ngx_http_close_connection(c);
        return;
    }

#if 0
    /* if ngx_http_request_t was freed then we need some other place */
    r->http_state = NGX_HTTP_KEEPALIVE_STATE;
#endif

    c->idle = 1;
    ngx_reusable_connection(c, 1);

    // 和客户端的长连接保持的时间,keepalive_timeout配置指定。
    ngx_add_timer(rev, clcf->keepalive_timeout);

    if (rev->ready) {
        ngx_post_event(rev, &ngx_posted_events);
    }
}

总结

一条tcp连接上,发送多个请求,只有在当前请求处理完毕正确返回后,保持住长连接,才会处理下一个请求。