Table of Contents
错误处理是一个系统里面重要而容易忽略的环节, 我们读代码比写代码简单, 很大程度上就是因为读代码时基本不用考虑错误处理逻辑.
twemproxy中, 主要的错误有:
两种错误处理:
我们先看conn上的几个回调
client: conn->recv = msg_recv; conn->recv_next = req_recv_next; conn->recv_done = req_recv_done; conn->send = msg_send; conn->send_next = rsp_send_next; conn->send_done = rsp_send_done; conn->close = client_close; conn->active = client_active; server: conn->recv = msg_recv; conn->recv_next = rsp_recv_next; conn->recv_done = rsp_recv_done; conn->send = msg_send; conn->send_next = req_send_next; conn->send_done = req_send_done; conn->close = server_close; conn->active = server_active;
最外层, 如果core_recv/core_send返回 非NC_Ok, 或者设置了 conn->done , conn->err , 直接关连接:
rstatus_t core_core(void *arg, uint32_t events) { if (events & EVENT_ERR) { core_error(ctx, conn); return NC_ERROR; } if (events & EVENT_READ) { status = core_recv(ctx, conn); if (status != NC_OK || conn->done || conn->err) { core_close(ctx, conn); return NC_ERROR; } } if (events & EVENT_WRITE) { status = core_send(ctx, conn); if (status != NC_OK || conn->done || conn->err) { core_close(ctx, conn); return NC_ERROR; } } return NC_OK; }
这里:
client_close_stats
conn->unref (暂时不管)
如果 conn->rmsg 存在, 说明连接的读缓冲区有内容未处理, 把这个rmsg 丢掉
conn->smsg, conn->imsg_q 应该是为空的 conn 用rmsg做读缓冲, 读到了就放到server_conn->omsg_q 里面, 所以clietn_conn->smsg没用, client_conn->imsg_q 也是完全没用 参考 |filename|/notes/redis/twemproxy.rst
如果 conn->omsg_q 里面有消息, 这些消息已经发给后端了, 正在等待后端响应:
for (msg = TAILQ_FIRST(&conn->omsg_q); msg != NULL; msg = nmsg) { nmsg = TAILQ_NEXT(msg, c_tqe); conn->dequeue_outq(ctx, conn, msg); if (msg->done) { //日志: 这个消息的rsp被我丢了. req_put(msg); } else { msg->swallow = 1; ASSERT(msg->request); ASSERT(msg->peer == NULL); //响应还没回来 //日志: 这个消息被我标记为 swallow log_debug(LOG_INFO, "close c %d schedule swallow of req %"PRIu64" " "len %"PRIu32" type %d", conn->sd, msg->id, msg->mlen, msg->type); } }
至于 swallow, 英语意思是 吞没, 只有在上面这种情况下, 才会设置swallow标志, 对于这样的消息, 它的rsp回来之后, 就不应该转给client了, 而直接丢掉就行:
static bool rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg) { pmsg = TAILQ_FIRST(&conn->omsg_q); //这是req, 至于为什么用 TAILQ_FIRST, 此时还没有设置msg->peer. ASSERT(pmsg->peer == NULL); ASSERT(pmsg->request && !pmsg->done); if (pmsg->swallow) { conn->dequeue_outq(ctx, conn, pmsg); pmsg->done = 1; log_debug(LOG_INFO, "swallow rsp %"PRIu64" len %"PRIu32" of req " "%"PRIu64" on s %d", msg->id, msg->mlen, pmsg->id, conn->sd); rsp_put(msg); req_put(pmsg); return true; } }
另外在server_close的时候, 对server_conn->imsg_q 和server_conn->omsg_q, 如果有swallow req, 也直接丢掉(见server_close).
server_close_stats
conn->unref(conn)
把server_conn中 的imsg_q 和omsg_q 中的消息 设置
msg->done = 1 msg->error = 1 msg->err = conn->err;
检查当前 c_conn上有没有完成的req, 给返回(因为这里对一些req设置了msg->done, 所以有可能有完成的req):
if (raeq_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { event_add_out(ctx->evb, msg->owner); }
参看这个神图:
* * Client+ Proxy Server+ * (nutcracker) * . * msg_recv {read event} . msg_recv {read event} * + . + * | . | * \ . / * req_recv_next . rsp_recv_next * + . + * | . | Rsp * req_recv_done . rsp_recv_done <=== * + . + * | . | * Req \ . / * ===> req_filter* . *rsp_filter * + . + * | . | * \ . / * req_forward-// (a) . (c) \\-rsp_forward * . * . * msg_send {write event} . msg_send {write event} * + . + * | . | * Rsp' \ . / Req' * <=== rsp_send_next . req_send_next ===> * + . + * | . | * \ . / * rsp_send_done-// (d) . (b) //-req_send_done * * * (a) -> (b) -> (c) -> (d) is the normal flow of transaction consisting * of a single request response, where (a) and (b) handle request from * client, while (c) and (d) handle the corresponding response from the * server.
我们看core_recv 的调用链, 可能从什么地方返回这个错误码.
core_recv()
conn->recv(), 即msg_recv
如果解析到一条消息(MSG_PARSE_OK), 返回msg_parsed()
如果parser出错, 函数里面同时设置 conn->err, 并且返回 NC_ERROR
static rstatus_t msg_parse(struct context *ctx, struct conn *conn, struct msg *msg) { msg->parser(msg); switch (msg->result) { case MSG_PARSE_OK: status = msg_parsed(ctx, conn, msg); break; case MSG_PARSE_REPAIR: status = msg_repair(ctx, conn, msg); break; case MSG_PARSE_AGAIN: status = NC_OK; break; default: status = NC_ERROR; conn->err = errno; break; } return conn->err != 0 ? NC_ERROR : status; }
所以 msg->parser() 函数里面, 如果要返回错误, 是通过msg->result设置为一个错误码做的:
r->result = MSG_PARSE_ERROR;
接下来的重点是 msg_parsed() , 他的返回值也会直接返回到core_core:
static rstatus_t msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) { conn->recv_done(ctx, conn, msg, nmsg); return NC_OK; } 这里挺重要, 这个函数虽然调用了conn->recv_done, 但是直接返回NC_OK, 而没有管conn->recv_done的返回值, 实际上, conn->recv_done() 没有返回值:: conn_recv_done_t recv_done; /* read done handler */ typedef void (*conn_recv_done_t)(struct context *, struct conn *, struct msg *, struct msg *);
到这里return链断了, 下面只能通过设置conn->err来表示错误 .
req_forward 如果出错, 是通过 req_forward_error 来告诉客户端的:
static void req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg) { s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen); if (s_conn == NULL) { req_forward_error(ctx, c_conn, msg); //这里并不会设置c_conn->err, 这时如果设置c_conn->err, 就会直接关连接, //这里的处理方法是: 设置msg->err, 于是后面会构造一个ERR rsp返回给客户端, 这并不算一个连接错误. return; } c_conn->enqueue_outq(ctx, c_conn, msg); //简单队列操作. s_conn->enqueue_inq(ctx, s_conn, msg); if (TAILQ_EMPTY(&s_conn->imsg_q)) { status = event_add_out(ctx->evb, s_conn); if (status != NC_OK) { req_forward_error(ctx, c_conn, msg); //这里会告诉客户端出错. s_conn->err = errno; //这里设置 s_conn->err, 关掉后端连接(什么时候处理 这里的s_conn->err????? 还有机会有事件么?) return; } }
req_forward_error 需要标记msg->err, 此时消息未被转发到后端, 此时为了告诉客户端出错, 需要:
- 在c_conn上开始 ev_out 事件
- 可写时, 构造一个ERR rsp返回给客户端.
具体过程如下:
static void req_forward_error(struct context *ctx, struct conn *conn, struct msg *msg) { rstatus_t status; ASSERT(conn->client && !conn->proxy); msg->done = 1; msg->error = 1; msg->err = errno; //(如果内存分配失败, errno会被设为12, 对应错误消息'Cannot allocate memory') if (req_done(conn, TAILQ_FIRST(&conn->omsg_q))) { status = event_add_out(ctx->evb, conn); if (status != NC_OK) { conn->err = errno; } } }
到这里, 这个cliet->proxy 的过程所有的出错路径都排查了.
这是另一个recv路径, 前半部分和(a)路径基本一样:
core_recv()
conn->recv(), 即msg_recv
msg_recv_chain(),
msg_parse(), 调用msg->parser(msg),
msg_parsed()
到这里return链断了, 下面只能通过设置conn->err来表示错误 .
rsp_recv_done() 和rsp_forward():
static void rsp_forward(struct context *ctx, struct conn *s_conn, struct msg *msg) { pmsg->peer = msg; msg->peer = pmsg; msg->pre_coalesce(msg); //void函数. c_conn = pmsg->owner; ASSERT(c_conn->client && !c_conn->proxy); if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { //这里调用req_done. status = event_add_out(ctx->evb, c_conn); if (status != NC_OK) { c_conn->err = errno; } } }
这里rsp_forward比req_forward复杂, req_forward只是简单的dequeue_outq, enqueue_inq, event_add_out, 并没有太多的寄回设置conn->err.
rsp_forward则可能在pre_coalesce(), req_done(), post_coalesce() 函数里面设置.
pre_coalesce():
void redis_pre_coalesce(struct msg *r) { pr->frag_owner->nfrag_done++; switch (r->type) { case MSG_RSP_REDIS_INTEGER: xxx; case MSG_RSP_REDIS_MULTIBULK: xxx; case MSG_RSP_REDIS_STATUS: xxx; default: mbuf = STAILQ_FIRST(&r->mhdr); log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment " "with unknown type %d", r->type); pr->error = 1; pr->err = EINVAL; //这里都只是设置了msg->err. break; } }
在mget-improve之前, req_done() post_coalesce 都不会做什么设置. TODO: 这里的post_coalesce我需要处理错误, 处理方法应该类似pre_coalesce, 设置msg->error, msg->err
接(c), rsp_forward 会在c_conn上开启out事件:
status = event_add_out(ctx->evb, c_conn);
当c_conn可写时, 会触发 msg_send->rsp_send_next->rsp_send_done 路径.
这里前几步和 (a) 也一样, 出错了直接通过返回值返回:
msg_send 可能调用msg_send_chain 和rsp_send_next.
rsp_send_next:
if (req_error(conn, pmsg)) { //这里也不会设置conn->err. 所以如果在msg层没有内存, 也会返回一个错误给客户端. 但是如果在rsp_make_error的时候再次没内存, 就只能关闭连接了. msg = rsp_make_error(ctx, conn, pmsg); }
msg_send_chain 可能通过返回值告诉最上层发生了错误 (直接调用conn_sendv):
static rstatus_t msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg) { conn->smsg = NULL; n = conn_sendv(conn, &sendv, nsend); conn->send_done(ctx, conn, msg); if (n >= 0) { return NC_OK; } return (n == NC_EAGAIN) ? NC_OK : NC_ERROR; } - 这里可能调用rsp_send_done, 单此时已经不能设置什么错误了
因为总的能逻辑就是 core_core 里面的这段逻辑:
status = core_recv(ctx, conn); if (status != NC_OK || conn->done || conn->err) { core_close(ctx, conn); return NC_ERROR; }
在处理请求中, 比如读msg的时候, 发现没有内存了, 或者解析出错了, 有两个方法返回错误:
如果在forward到后端时, 后端连接失败, 则可以通过 req_forward_error 函数, 设置 msg->err, 这时在 rsp_send_next 中会用rsp_make_error生成一个err response返回给客户端.
static rstatus_t msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) { ... nmsg = msg_get(msg->owner, msg->request, conn->redis); if (nmsg == NULL) { mbuf_put(nbuf); return NC_ENOMEM; }
msg->ferror #是否有frag错误. msg->error #错误标志, 0/1 msg->err = errno; #errno.
比如memcache_pre_coalesce:
void redis_pre_coalesce(struct msg *r) { pr->frag_owner->nfrag_done++; switch (r->type) { case MSG_RSP_REDIS_INTEGER: xxx; case MSG_RSP_REDIS_MULTIBULK: if (pr->first_fragment) { mbuf = mbuf_get(); if (mbuf == NULL) { pr->error = 1; pr->err = EINVAL; return; } STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next); } case MSG_RSP_REDIS_STATUS: xxx; default: mbuf = STAILQ_FIRST(&r->mhdr); log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment " "with unknown type %d", r->type); pr->error = 1; pr->err = EINVAL; //这里都只是设置了msg->err. break; } }
void redis_pre_coalesce(struct msg *r) { if (pr->first_fragment) { mbuf = mbuf_get(); if (mbuf == NULL) { pr->error = 1; pr->err = EINVAL; return; } STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next); } ... }
从前的msg_fragment是在msg_parsed里面, 还能通过return 返回错误. 因为作者认为在msg_parsed 之后, 消息必然在recv_done里面传给后端了, 不会出错了. 所以recv_done等函数都是void函数:
static rstatus_t msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg) { conn->recv_done(ctx, conn, msg, nmsg); return NC_OK; }
TODO 3: 我的fragement也得放到这里的msg_parsed里面, 否则的话, 就得修改 recv_done的返回值了, 这个动作更大些.