Permalink: 2014-07-23 09:37:30 by ning in redis tags: all

错误处理是一个系统里面重要而容易忽略的环节, 我们读代码比写代码简单, 很大程度上就是因为读代码时基本不用考虑错误处理逻辑.

twemproxy中, 主要的错误有:

  • 超内存 需要处理的地方最多
  • 连接后端错误
  • server/cliet连接断掉
  • msg 超时
  • 后端返回错误(E)

1   如何处理错误

两种错误处理:

  1. 关连接:
    • 比如内存不够.
  2. 向客户读返回一个 Err response:
    • 能识别的逻辑错误, 比如连后端失败.

我们先看conn上的几个回调

client:

    conn->recv = msg_recv;
    conn->recv_next = req_recv_next;
    conn->recv_done = req_recv_done;

    conn->send = msg_send;
    conn->send_next = rsp_send_next;
    conn->send_done = rsp_send_done;
    conn->close = client_close;
    conn->active = client_active;

server:

    conn->recv = msg_recv;
    conn->recv_next = rsp_recv_next;
    conn->recv_done = rsp_recv_done;

    conn->send = msg_send;
    conn->send_next = req_send_next;
    conn->send_done = req_send_done;

    conn->close = server_close;
    conn->active = server_active;

最外层, 如果core_recv/core_send返回 非NC_Ok, 或者设置了 conn->done , conn->err , 直接关连接:

rstatus_t
core_core(void *arg, uint32_t events)
{
    if (events & EVENT_ERR) {
        core_error(ctx, conn);
        return NC_ERROR;
    }

    if (events & EVENT_READ) {
        status = core_recv(ctx, conn);
        if (status != NC_OK || conn->done || conn->err) {
            core_close(ctx, conn);
            return NC_ERROR;
        }
    }

    if (events & EVENT_WRITE) {
        status = core_send(ctx, conn);
        if (status != NC_OK || conn->done || conn->err) {
            core_close(ctx, conn);
            return NC_ERROR;
        }
    }

    return NC_OK;
}

这里:

  • core_recv 就是 conn->recv
  • core_send 就是 conn->send
  • core_close 会调用 server_close 或者 client_close

1.1   client_close

  1. client_close_stats

  2. conn->unref (暂时不管)

  3. 如果 conn->rmsg 存在, 说明连接的读缓冲区有内容未处理, 把这个rmsg 丢掉

  4. conn->smsg, conn->imsg_q 应该是为空的 conn 用rmsg做读缓冲, 读到了就放到server_conn->omsg_q 里面, 所以clietn_conn->smsg没用, client_conn->imsg_q 也是完全没用 参考 |filename|/notes/redis/twemproxy.rst

  5. 如果 conn->omsg_q 里面有消息, 这些消息已经发给后端了, 正在等待后端响应:

    for (msg = TAILQ_FIRST(&conn->omsg_q); msg != NULL; msg = nmsg) {
        nmsg = TAILQ_NEXT(msg, c_tqe);
    
        conn->dequeue_outq(ctx, conn, msg);
    
        if (msg->done) {
            //日志: 这个消息的rsp被我丢了.
            req_put(msg);
        } else {
            msg->swallow = 1;
    
            ASSERT(msg->request);
            ASSERT(msg->peer == NULL); //响应还没回来
    
            //日志: 这个消息被我标记为 swallow
            log_debug(LOG_INFO, "close c %d schedule swallow of req %"PRIu64" "
                      "len %"PRIu32" type %d", conn->sd, msg->id, msg->mlen,
                      msg->type);
        }
    }
    

至于 swallow, 英语意思是 吞没, 只有在上面这种情况下, 才会设置swallow标志, 对于这样的消息, 它的rsp回来之后, 就不应该转给client了, 而直接丢掉就行:

static bool
rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
{
    pmsg = TAILQ_FIRST(&conn->omsg_q); //这是req, 至于为什么用 TAILQ_FIRST, 此时还没有设置msg->peer.

    ASSERT(pmsg->peer == NULL);
    ASSERT(pmsg->request && !pmsg->done);
    if (pmsg->swallow) {
        conn->dequeue_outq(ctx, conn, pmsg);
        pmsg->done = 1;

        log_debug(LOG_INFO, "swallow rsp %"PRIu64" len %"PRIu32" of req "
                  "%"PRIu64" on s %d", msg->id, msg->mlen, pmsg->id,
                  conn->sd);

        rsp_put(msg);
        req_put(pmsg);
        return true;
    }
}

另外在server_close的时候, 对server_conn->imsg_q 和server_conn->omsg_q, 如果有swallow req, 也直接丢掉(见server_close).

1.2   server_close

  1. server_close_stats

  2. conn->unref(conn)

  3. 把server_conn中 的imsg_q 和omsg_q 中的消息 设置

    msg->done = 1
    msg->error = 1
    msg->err = conn->err;
    
  4. 检查当前 c_conn上有没有完成的req, 给返回(因为这里对一些req设置了msg->done, 所以有可能有完成的req):

    if (raeq_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
        event_add_out(ctx->evb, msg->owner);
    }
    

2   各个回调中应该怎么返回错误和错误检查

参看这个神图:

*
*             Client+             Proxy           Server+
*                              (nutcracker)
*                                   .
*       msg_recv {read event}       .       msg_recv {read event}
*         +                         .                         +
*         |                         .                         |
*         \                         .                         /
*         req_recv_next             .             rsp_recv_next
*           +                       .                       +
*           |                       .                       |       Rsp
*           req_recv_done           .           rsp_recv_done      <===
*             +                     .                     +
*             |                     .                     |
*    Req      \                     .                     /
*    ===>     req_filter*           .           *rsp_filter
*               +                   .                   +
*               |                   .                   |
*               \                   .                   /
*               req_forward-//  (a) . (c)  \\-rsp_forward
*                                   .
*                                   .
*       msg_send {write event}      .      msg_send {write event}
*         +                         .                         +
*         |                         .                         |
*    Rsp' \                         .                         /     Req'
*   <===  rsp_send_next             .             req_send_next     ===>
*           +                       .                       +
*           |                       .                       |
*           \                       .                       /
*           rsp_send_done-//    (d) . (b)    //-req_send_done
*
*
* (a) -> (b) -> (c) -> (d) is the normal flow of transaction consisting
* of a single request response, where (a) and (b) handle request from
* client, while (c) and (d) handle the corresponding response from the
* server.

2.1   (a)client->proxy 路径

我们看core_recv 的调用链, 可能从什么地方返回这个错误码.

  • core_recv()

  • conn->recv(), 即msg_recv

  • msg_recv_chain(),
    • 无内存: return NC_ENOMEM;
    • 读返回EAGAIN: return NC_OK,
    • 读出错: return NC_ERROR
    • 返回msg_parse()
  • msg_parse(), 调用msg->parser(msg),
    • 如果解析到一条消息(MSG_PARSE_OK), 返回msg_parsed()

    • 如果需要REPAIR, 返回msg_repair()
      • msg_repair() 就是简单split一下, 可能返回NC_ENOMEM.
    • 如果parser出错, 函数里面同时设置 conn->err, 并且返回 NC_ERROR

      static rstatus_t
      msg_parse(struct context *ctx, struct conn *conn, struct msg *msg)
      {
          msg->parser(msg);
      
          switch (msg->result) {
          case MSG_PARSE_OK:
              status = msg_parsed(ctx, conn, msg);
              break;
      
          case MSG_PARSE_REPAIR:
              status = msg_repair(ctx, conn, msg);
              break;
      
          case MSG_PARSE_AGAIN:
              status = NC_OK;
              break;
      
          default:
              status = NC_ERROR;
              conn->err = errno;
              break;
          }
      
          return conn->err != 0 ? NC_ERROR : status;
      }
      
    • 所以 msg->parser() 函数里面, 如果要返回错误, 是通过msg->result设置为一个错误码做的:

      r->result = MSG_PARSE_ERROR;
      
  • 接下来的重点是 msg_parsed() , 他的返回值也会直接返回到core_core:

     static rstatus_t
     msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
     {
         conn->recv_done(ctx, conn, msg, nmsg);
    
         return NC_OK;
     }
    
    这里挺重要, 这个函数虽然调用了conn->recv_done, 但是直接返回NC_OK, 而没有管conn->recv_done的返回值, 实际上, conn->recv_done() 没有返回值::
    
     conn_recv_done_t   recv_done;     /* read done handler */
     typedef void (*conn_recv_done_t)(struct context *, struct conn *, struct msg *, struct msg *);
    
  • 到这里return链断了, 下面只能通过设置conn->err来表示错误 .

  • req_recv_done调用:
    • req_forward (同样是一个void函数) TODO: 这里在fragement里面, 我需要处理错误, 处理方法是设置c_conn->err, 这还不好处理..
  • req_forward 如果出错, 是通过 req_forward_error 来告诉客户端的:

    static void
    req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
    {
        s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
        if (s_conn == NULL) {
            req_forward_error(ctx, c_conn, msg);                //这里并不会设置c_conn->err, 这时如果设置c_conn->err, 就会直接关连接,
                                                                //这里的处理方法是: 设置msg->err, 于是后面会构造一个ERR rsp返回给客户端, 这并不算一个连接错误.
            return;
        }
    
        c_conn->enqueue_outq(ctx, c_conn, msg);                 //简单队列操作.
        s_conn->enqueue_inq(ctx, s_conn, msg);
    
        if (TAILQ_EMPTY(&s_conn->imsg_q)) {
            status = event_add_out(ctx->evb, s_conn);
            if (status != NC_OK) {
                req_forward_error(ctx, c_conn, msg);            //这里会告诉客户端出错.
                s_conn->err = errno;                            //这里设置 s_conn->err, 关掉后端连接(什么时候处理 这里的s_conn->err????? 还有机会有事件么?)
                return;
            }
        }
    
  • req_forward_error 需要标记msg->err, 此时消息未被转发到后端, 此时为了告诉客户端出错, 需要:

    1. 在c_conn上开始 ev_out 事件
    2. 可写时, 构造一个ERR rsp返回给客户端.

    具体过程如下:

    static void
    req_forward_error(struct context *ctx, struct conn *conn, struct msg *msg)
    {
        rstatus_t status;
    
        ASSERT(conn->client && !conn->proxy);
    
        msg->done = 1;
        msg->error = 1;
        msg->err = errno;  //(如果内存分配失败, errno会被设为12, 对应错误消息'Cannot allocate memory')
    
        if (req_done(conn, TAILQ_FIRST(&conn->omsg_q))) {
            status = event_add_out(ctx->evb, conn);
            if (status != NC_OK) {
                conn->err = errno;
            }
        }
    }
    

到这里, 这个cliet->proxy 的过程所有的出错路径都排查了.

2.3   (c)backend->proxy

这是另一个recv路径, 前半部分和(a)路径基本一样:

  • core_recv()

  • conn->recv(), 即msg_recv

  • msg_recv_chain(),

  • msg_parse(), 调用msg->parser(msg),

  • msg_parsed()

  • 到这里return链断了, 下面只能通过设置conn->err来表示错误 .

  • rsp_recv_done() 和rsp_forward():

    static void
    rsp_forward(struct context *ctx, struct conn *s_conn, struct msg *msg)
    {
        pmsg->peer = msg;
        msg->peer = pmsg;
    
        msg->pre_coalesce(msg);                                         //void函数.
    
        c_conn = pmsg->owner;
        ASSERT(c_conn->client && !c_conn->proxy);
    
        if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {           //这里调用req_done.
            status = event_add_out(ctx->evb, c_conn);
            if (status != NC_OK) {
                c_conn->err = errno;
            }
        }
    }
    
  • 这里rsp_forward比req_forward复杂, req_forward只是简单的dequeue_outq, enqueue_inq, event_add_out, 并没有太多的寄回设置conn->err.

    rsp_forward则可能在pre_coalesce(), req_done(), post_coalesce() 函数里面设置.

  • pre_coalesce():

    void
    redis_pre_coalesce(struct msg *r)
    {
        pr->frag_owner->nfrag_done++;
    
        switch (r->type) {
        case MSG_RSP_REDIS_INTEGER:
            xxx;
        case MSG_RSP_REDIS_MULTIBULK:
            xxx;
        case MSG_RSP_REDIS_STATUS:
            xxx;
        default:
            mbuf = STAILQ_FIRST(&r->mhdr);
            log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment "
                        "with unknown type %d", r->type);
            pr->error = 1;
            pr->err = EINVAL;       //这里都只是设置了msg->err.
            break;
        }
    }
    
  • 在mget-improve之前, req_done() post_coalesce 都不会做什么设置. TODO: 这里的post_coalesce我需要处理错误, 处理方法应该类似pre_coalesce, 设置msg->error, msg->err

2.4   (d)proxy->client

接(c), rsp_forward 会在c_conn上开启out事件:

status = event_add_out(ctx->evb, c_conn);

当c_conn可写时, 会触发 msg_send->rsp_send_next->rsp_send_done 路径.

这里前几步和 (a) 也一样, 出错了直接通过返回值返回:

  • msg_send 可能调用msg_send_chain 和rsp_send_next.

  • rsp_send_next:

    if (req_error(conn, pmsg)) { //这里也不会设置conn->err. 所以如果在msg层没有内存, 也会返回一个错误给客户端. 但是如果在rsp_make_error的时候再次没内存, 就只能关闭连接了.
        msg = rsp_make_error(ctx, conn, pmsg);
    }
    
  • msg_send_chain 可能通过返回值告诉最上层发生了错误 (直接调用conn_sendv):

    static rstatus_t
    msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
    {
        conn->smsg = NULL;
        n = conn_sendv(conn, &sendv, nsend);
    
        conn->send_done(ctx, conn, msg);
    
        if (n >= 0) {
            return NC_OK;
        }
        return (n == NC_EAGAIN) ? NC_OK : NC_ERROR;
    }
    - 这里可能调用rsp_send_done, 单此时已经不能设置什么错误了
    

3   小结

因为总的能逻辑就是 core_core 里面的这段逻辑:

status = core_recv(ctx, conn);
if (status != NC_OK || conn->done || conn->err) {
    core_close(ctx, conn);
    return NC_ERROR;
}

在处理请求中, 比如读msg的时候, 发现没有内存了, 或者解析出错了, 有两个方法返回错误:

  • 通过返回值在core_recv 返回一个非NC_OK值.
  • 设置conn->err 即可,

如果在forward到后端时, 后端连接失败, 则可以通过 req_forward_error 函数, 设置 msg->err, 这时在 rsp_send_next 中会用rsp_make_error生成一个err response返回给客户端.

  • 我们在forward时遇到内存不够, 调用req_forward_error就够了.
  • 收到response时遇到内存不够, 手动设置 msg->err就行.

3.1   可能设置conn->err的地方

  • parse失败.
  • 调nc_read() nc_writev(), server_connect() 出错.
  • 调用event_add_xx(), event_del_xx()
  • req_get()/rsp_get() 调用msg_get() 无内存返回NULL
  • rsp_make_error 无内存返回NULL
  • 发生timeout

3.2   可能通过return 方式返回错误

static rstatus_t
msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
{
    ...
    nmsg = msg_get(msg->owner, msg->request, conn->redis);
    if (nmsg == NULL) {
        mbuf_put(nbuf);
        return NC_ENOMEM;
    }

3.3   可能设置msg->err的地方

msg->ferror         #是否有frag错误.
msg->error          #错误标志, 0/1
msg->err = errno;   #errno.

比如memcache_pre_coalesce:

void
redis_pre_coalesce(struct msg *r)
{
    pr->frag_owner->nfrag_done++;

    switch (r->type) {
    case MSG_RSP_REDIS_INTEGER:
        xxx;
    case MSG_RSP_REDIS_MULTIBULK:
    if (pr->first_fragment) {
        mbuf = mbuf_get();
        if (mbuf == NULL) {
            pr->error = 1;
            pr->err = EINVAL;
            return;
        }
        STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next);
    }
    case MSG_RSP_REDIS_STATUS:
        xxx;
    default:
        mbuf = STAILQ_FIRST(&r->mhdr);
        log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment "
                    "with unknown type %d", r->type);
        pr->error = 1;
        pr->err = EINVAL;       //这里都只是设置了msg->err.
        break;
    }
}
void
redis_pre_coalesce(struct msg *r)
{
    if (pr->first_fragment) {
        mbuf = mbuf_get();
        if (mbuf == NULL) {
            pr->error = 1;
            pr->err = EINVAL;
            return;
        }
        STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next);
    }
    ...
}

4   what i do

从前的msg_fragment是在msg_parsed里面, 还能通过return 返回错误. 因为作者认为在msg_parsed 之后, 消息必然在recv_done里面传给后端了, 不会出错了. 所以recv_done等函数都是void函数:

static rstatus_t
msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
{
    conn->recv_done(ctx, conn, msg, nmsg);

    return NC_OK;
}

TODO 3: 我的fragement也得放到这里的msg_parsed里面, 否则的话, 就得修改 recv_done的返回值了, 这个动作更大些.

Comments