注册 登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

偶有所得,记录在此

有分享交流才有进步,永远不要固步自封

 
 
 

日志

 
 

[Nginx 源码分析] Fastcgi 模块(下)  

2011-01-25 15:33:40|  分类: 默认分类 |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
本文主要分析从upstream 读取数据、数据报文解析和向客户端输出这三部分功能。

从 upstream 读取的 FASTCGI 报文主要有三种:
  • upstream 处理正常的话,返回 FCGI_STDOUT 报文
  • 有错误,则返回 FASTCGI_STDERR  报文。
  • 结尾会有个 FCGI_END_REQUEST 报文

将 upstream 请求报文发送出去后,Nginx 会随即绑定 read event,等待处理 upstream 的返回。
u->read_event_handler = ngx_http_upstream_process_header

Http Header 部分处理流程大概如下:
1)  调用 c->recv 从 upstream 读取数据。
2)  调用 u->process_header 处理 Fastcgi 报文。process_header 为函数指针,指向 ngx_http_fastcgi_process_header。
3)  ngx_http_fastcgi_process_header 解析完报文,调用 ngx_http_parse_header_line 解析 HTTP 报文头。

首先是ngx_http_upstream_process_header

static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    // ...
    for ( ;; ) {

        // 从 upstream 读取数据
        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);

        if (n == NGX_AGAIN) {
#if 0
            ngx_add_timer(rev, u->read_timeout);
#endif

            // 未完成,待下次继续读取
            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }

        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "upstream prematurely closed connection");
        }

        if (n == NGX_ERROR || n == 0) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        u->buffer.last += n;

#if 0
        u->valid_header_in = 0;

        u->peer.cached = 0;
#endif

        // 处理 fastcgi 报文
        // 这里实际调用的是 ngx_http_fastcgi_process_header
        rc = u->process_header(r);

        // Fastcgi 报文未完,继续读
        if (rc == NGX_AGAIN) {

            if (u->buffer.pos == u->buffer.end) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream sent too big header");

                ngx_http_upstream_next(r, u,
                                       NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                return;
            }

            continue;
        }

        break;
    }
    // ....
   
    // copy http header 到 headers_out 域
    if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
        return;
    }
    // ...
    // 解析完 HTTP 报文头,开始向客户端发送 response,后面详细分析
    if (!r->subrequest_in_memory) {
        ngx_http_upstream_send_response(r, u);
        return;
    }


    // ...

处理 FASTCGI 报文的状态机
该状态机有以下几种状态:
typedef enum {
    ngx_http_fastcgi_st_version = 0,
    ngx_http_fastcgi_st_type,
    ngx_http_fastcgi_st_request_id_hi,
    ngx_http_fastcgi_st_request_id_lo,
    ngx_http_fastcgi_st_content_length_hi,
    ngx_http_fastcgi_st_content_length_lo,
    ngx_http_fastcgi_st_padding_length,
    ngx_http_fastcgi_st_reserved,
    ngx_http_fastcgi_st_data,
    ngx_http_fastcgi_st_padding
} ngx_http_fastcgi_state_e;
FASTCGI Header 报文的每个字段都算一个状态。

该状态机具体实现如下:
static ngx_int_t
ngx_http_fastcgi_process_record(ngx_http_request_t *r,
    ngx_http_fastcgi_ctx_t *f)
{
    u_char                     ch, *p;
    ngx_http_fastcgi_state_e   state;

    state = f->state;
    for (p = f->pos; p < f->last; p++) {

        ch = *p;
        switch (state) {

        case ngx_http_fastcgi_st_version:
            if (ch != 1) {
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "upstream sent unsupported FastCGI "
                              "protocol version: %d", ch);
                return NGX_ERROR;
            }
            // 转向 ngx_http_fastcgi_st_type 状态
            state = ngx_http_fastcgi_st_type;
            break;

        case ngx_http_fastcgi_st_type:
            switch (ch) {
            case NGX_HTTP_FASTCGI_STDOUT:
            case NGX_HTTP_FASTCGI_STDERR:
            case NGX_HTTP_FASTCGI_END_REQUEST:
                 // 保存下报文类型
                 f->type = (ngx_uint_t) ch;
                 break;
            default:
                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                              "upstream sent invalid FastCGI "
                              "record type: %d", ch);
                return NGX_ERROR;

            }
            // 进入 ngx_http_fastcgi_st_request_id_hi 状态
            state = ngx_http_fastcgi_st_request_id_hi;
            break;
         // ...
        
         case ngx_http_fastcgi_st_reserved:
            state = ngx_http_fastcgi_st_data;

            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                           "http fastcgi record length: %z", f->length);

            // 如果到 ngx_http_fastcgi_st_reserved 状态,则下一个是 data 域,返回 ok。
            f->pos = p + 1;
            f->state = state;

            return NGX_OK;
该自动机处理完成后,可以得到该 FASTCGI 报文的类型,data 域的长度,data 域的开始位置。

ngx_http_fastcgi_process_header 函数
static ngx_int_t
ngx_http_fastcgi_process_header(ngx_http_request_t *r)
{
    //............
    f = ngx_http_get_module_ctx(r, ngx_http_fastcgi_module);

    umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

    u = r->upstream;

    for ( ;; ) {

        // 调用上面的状态机分析 Fastcgi Header,最终得到报文类型、长度、data 域的起始位置
        if (f->state < ngx_http_fastcgi_st_data) {

            f->pos = u->buffer.pos;
            f->last = u->buffer.last;

            rc = ngx_http_fastcgi_process_record(r, f);

            u->buffer.pos = f->pos;
            u->buffer.last = f->last;

            if (rc == NGX_AGAIN) {
                return NGX_AGAIN;
            }
            //....
         }
         // ...
         // 忽略 FASTCGI_STDERR 报文处理,只分析 FASTCGI_STDOUT
        f->fastcgi_stdout = 1;

        start = u->buffer.pos;

        // 下面英文注释写的很清楚
        if (u->buffer.pos + f->length < u->buffer.last) {

            /*
             * set u->buffer.last to the end of the FastCGI record data
             * for ngx_http_parse_header_line()
             */

            last = u->buffer.last;
            u->buffer.last = u->buffer.pos + f->length;

        } else {
            last = NULL;
        }

        //开始解析 HTTP header
        for ( ;; ) {

            part_start = u->buffer.pos;
            part_end = u->buffer.last;

            // ngx_http_parse_header_line 又是一个状态机
            // 解析出 Header 的 name 和 value 这两部分的 pos 和 length
            rc = ngx_http_parse_header_line(r, &u->buffer, 1);
            if (rc == NGX_AGAIN) {
                break;
            }

            // 解析 HTTP Header 成功
            if (rc == NGX_OK) {

                /* a header line has been parsed successfully */

                // 创建一个 ngx_table_elt_t 来存储解析的 Header
                h = ngx_list_push(&u->headers_in.headers);
                if (h == NULL) {
                    return NGX_ERROR;
                }
                // ...
                    //... 设置 header key 和 value 的值
                    h->key.len = r->header_name_end - r->header_name_start;
                    h->value.len = r->header_end - r->header_start;

                    // 为 key 和 value 分配存储空间
                    // key 和 value 连续存储
                    h->key.data = ngx_pnalloc(r->pool,
                                              h->key.len + 1 + h->value.len + 1
                                              + h->key.len);
                    if (h->key.data == NULL) {
                        return NGX_ERROR;
                    }

                    h->value.data = h->key.data + h->key.len + 1;
                    h->lowcase_key = h->key.data + h->key.len + 1
                                     + h->value.len + 1;

                    // 复制 key 和 value 数据
                    ngx_cpystrn(h->key.data, r->header_name_start,
                                h->key.len + 1);
                    ngx_cpystrn(h->value.data, r->header_start,
                                h->value.len + 1);
                    //...
                   
                 // ...
                hh = ngx_hash_find(&umcf->headers_in_hash, h->hash,
                        h->lowcase_key, h->key.len);

                // 设置 r->headers_in 中该 field(key) 域的值
                // hh->handler 函数指针,具体每个 header 的实现,在 ngx_http_upstream.c 中
                if (hh && hh->handler(r, h, hh->offset) != NGX_OK) {
                    return NGX_ERROR;
                }
                //...
            }
          
            // 解析 HTTP Header 结束
            // 根据结果设置 http status 和 status_line
            if (rc == NGX_HTTP_PARSE_HEADER_DONE) {

                /* a whole header has been parsed successfully */

                ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                               "http fastcgi header done");

                // 根据结果设置 http status 和 status_line
                if (u->headers_in.status) {
                    status_line = &u->headers_in.status->value;

                    status = ngx_atoi(status_line->data, 3);

                    if (status == NGX_ERROR) {
                        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                                      "upstream sent invalid status \"%V\"",
                                      status_line);
                        return NGX_HTTP_UPSTREAM_INVALID_HEADER;
                    }

                    u->headers_in.status_n = status;
                    u->headers_in.status_line = *status_line;

                } else if (u->headers_in.location) {
                    u->headers_in.status_n = 302;
                    ngx_str_set(&u->headers_in.status_line,
                                "302 Moved Temporarily");

                } else {
                    u->headers_in.status_n = 200;
                    ngx_str_set(&u->headers_in.status_line, "200 OK");
                }

                if (u->state) {
                    u->state->status = u->headers_in.status_n;
                }

                break;
            }
        }
        //....
       
        // 设置 last 的值
        if (last) {
            u->buffer.last = last;
        }

        f->length -= u->buffer.pos - start;

        if (f->length == 0) {
            if (f->padding) {
                f->state = ngx_http_fastcgi_st_padding;
            } else {
                f->state = ngx_http_fastcgi_st_version;
            }
        }

        // 如果解析 HTTP Header 结束,直接返回
        if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
            return NGX_OK;
        }

        if (rc == NGX_OK) {
            continue;
        }
Header 解析部分到此结束。

向客户端发送 Response 部分
上面分析过的 ngx_http_upstream_process_header 函数最后,调用 ngx_http_upstream_send_response 开始向客户端发送 Response。

Response 的发送,分两部分进行。首先调用 ngx_http_send_header 发送 http header;然后建立一个 nginx event pipe,一边从 upstream 读取数据,一边调用 output filter ,向客户端写入。

先看 ngx_http_upstream_send_response 函数
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    //...
    // 先发送 http header,进入 ngx_http_top_header_filter 链
    rc = ngx_http_send_header(r);
    // ...
    u->header_sent = 1;
    // ...
    p = u->pipe;
   
    // 设置好 ngx_http_output_filter 链,准备处理 upstream body
    p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
    p->output_ctx = r;
    p->tag = u->output.tag;
    p->bufs = u->conf->bufs;
    p->busy_size = u->conf->busy_buffers_size;
    p->upstream = u->peer.connection;
    p->downstream = c;
    p->pool = r->pool;
    p->log = c->log;
    // ...
   
    // 设置 read event 来读取 upstream 数据
    u->read_event_handler = ngx_http_upstream_process_upstream;
   
    // 设置 write event 来处理 upstream 返回的数据,并进行 output_filter 系列处理输出
    r->write_event_handler = ngx_http_upstream_process_downstream;

    // 先从 upstream 读取数据
    ngx_http_upstream_process_upstream(r, u);
}

ngx_http_upstream_process_upstream 和
ngx_http_upstream_process_downstream 最终都是调用 ngx_event_pipe 这个函数进行读取和写入的,下面分析下 ngx_event_pipe。
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
    u_int         flags;
    ngx_int_t     rc;
    ngx_event_t  *rev, *wev;

    for ( ;; ) {
        // do_write 为 1,处理写事件
        if (do_write) {
            p->log->action = "sending to client";

            // 进入 output_filter 链,向客户端写数据
            rc = ngx_event_pipe_write_to_downstream(p);

            if (rc == NGX_ABORT) {
                return NGX_ABORT;
            }

            if (rc == NGX_BUSY) {
                return NGX_OK;
            }
        }

        // do_write 为 0,处理读事件,从 upstream 读数据
        p->read = 0;
        p->upstream_blocked = 0;

        p->log->action = "reading upstream";

        // 从 upstream 读数据
        if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
            return NGX_ABORT;
        }

        if (!p->read && !p->upstream_blocked) {
            break;
        }

        do_write = 1;
    }

    // 如果 upstream 未准备好,设置 read event,待下次读
    if (p->upstream->fd != -1) {
        rev = p->upstream->read;

        flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;

        if (ngx_handle_read_event(rev, flags) != NGX_OK) {
            return NGX_ABORT;
        }

        if (rev->active && !rev->ready) {
            ngx_add_timer(rev, p->read_timeout);

        } else if (rev->timer_set) {
            ngx_del_timer(rev);
        }
    }

    // 设置 write event
    if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
        wev = p->downstream->write;
        if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
            return NGX_ABORT;
        }

        if (!wev->delayed) {
            if (wev->active && !wev->ready) {
                ngx_add_timer(wev, p->send_timeout);

            } else if (wev->timer_set) {
                ngx_del_timer(wev);
            }
        }
    }

    return NGX_OK;
}

ngx_event_pipe_read_upstream 做很多 buffer 分配和管理工作,最终调用下面函数完成读取
n = p->upstream->recv_chain(p->upstream, chain);

ngx_event_pipe_write_to_downstream 最终调用 output_filter,进入 filter 链,进行输出
rc = p->output_filter(p->output_ctx, out);

----
感谢代码发芽网提供的语法高亮服务。
  评论这张
 
阅读(4230)| 评论(0)
推荐 转载

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2017