Table of Contents
错误处理是一个系统里面重要而容易忽略的环节, 我们读代码比写代码简单, 很大程度上就是因为读代码时基本不用考虑错误处理逻辑.
twemproxy中, 主要的错误有:
两种错误处理:
我们先看conn上的几个回调
client:
conn->recv = msg_recv;
conn->recv_next = req_recv_next;
conn->recv_done = req_recv_done;
conn->send = msg_send;
conn->send_next = rsp_send_next;
conn->send_done = rsp_send_done;
conn->close = client_close;
conn->active = client_active;
server:
conn->recv = msg_recv;
conn->recv_next = rsp_recv_next;
conn->recv_done = rsp_recv_done;
conn->send = msg_send;
conn->send_next = req_send_next;
conn->send_done = req_send_done;
conn->close = server_close;
conn->active = server_active;
最外层, 如果core_recv/core_send返回 非NC_Ok, 或者设置了 conn->done , conn->err , 直接关连接:
rstatus_t
core_core(void *arg, uint32_t events)
{
if (events & EVENT_ERR) {
core_error(ctx, conn);
return NC_ERROR;
}
if (events & EVENT_READ) {
status = core_recv(ctx, conn);
if (status != NC_OK || conn->done || conn->err) {
core_close(ctx, conn);
return NC_ERROR;
}
}
if (events & EVENT_WRITE) {
status = core_send(ctx, conn);
if (status != NC_OK || conn->done || conn->err) {
core_close(ctx, conn);
return NC_ERROR;
}
}
return NC_OK;
}
这里:
client_close_stats
conn->unref (暂时不管)
如果 conn->rmsg 存在, 说明连接的读缓冲区有内容未处理, 把这个rmsg 丢掉
conn->smsg, conn->imsg_q 应该是为空的 conn 用rmsg做读缓冲, 读到了就放到server_conn->omsg_q 里面, 所以clietn_conn->smsg没用, client_conn->imsg_q 也是完全没用 参考 |filename|/notes/redis/twemproxy.rst
如果 conn->omsg_q 里面有消息, 这些消息已经发给后端了, 正在等待后端响应:
for (msg = TAILQ_FIRST(&conn->omsg_q); msg != NULL; msg = nmsg) {
nmsg = TAILQ_NEXT(msg, c_tqe);
conn->dequeue_outq(ctx, conn, msg);
if (msg->done) {
//日志: 这个消息的rsp被我丢了.
req_put(msg);
} else {
msg->swallow = 1;
ASSERT(msg->request);
ASSERT(msg->peer == NULL); //响应还没回来
//日志: 这个消息被我标记为 swallow
log_debug(LOG_INFO, "close c %d schedule swallow of req %"PRIu64" "
"len %"PRIu32" type %d", conn->sd, msg->id, msg->mlen,
msg->type);
}
}
至于 swallow, 英语意思是 吞没, 只有在上面这种情况下, 才会设置swallow标志, 对于这样的消息, 它的rsp回来之后, 就不应该转给client了, 而直接丢掉就行:
static bool
rsp_filter(struct context *ctx, struct conn *conn, struct msg *msg)
{
pmsg = TAILQ_FIRST(&conn->omsg_q); //这是req, 至于为什么用 TAILQ_FIRST, 此时还没有设置msg->peer.
ASSERT(pmsg->peer == NULL);
ASSERT(pmsg->request && !pmsg->done);
if (pmsg->swallow) {
conn->dequeue_outq(ctx, conn, pmsg);
pmsg->done = 1;
log_debug(LOG_INFO, "swallow rsp %"PRIu64" len %"PRIu32" of req "
"%"PRIu64" on s %d", msg->id, msg->mlen, pmsg->id,
conn->sd);
rsp_put(msg);
req_put(pmsg);
return true;
}
}
另外在server_close的时候, 对server_conn->imsg_q 和server_conn->omsg_q, 如果有swallow req, 也直接丢掉(见server_close).
server_close_stats
conn->unref(conn)
把server_conn中 的imsg_q 和omsg_q 中的消息 设置
msg->done = 1 msg->error = 1 msg->err = conn->err;
检查当前 c_conn上有没有完成的req, 给返回(因为这里对一些req设置了msg->done, 所以有可能有完成的req):
if (raeq_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) {
event_add_out(ctx->evb, msg->owner);
}
参看这个神图:
*
* Client+ Proxy Server+
* (nutcracker)
* .
* msg_recv {read event} . msg_recv {read event}
* + . +
* | . |
* \ . /
* req_recv_next . rsp_recv_next
* + . +
* | . | Rsp
* req_recv_done . rsp_recv_done <===
* + . +
* | . |
* Req \ . /
* ===> req_filter* . *rsp_filter
* + . +
* | . |
* \ . /
* req_forward-// (a) . (c) \\-rsp_forward
* .
* .
* msg_send {write event} . msg_send {write event}
* + . +
* | . |
* Rsp' \ . / Req'
* <=== rsp_send_next . req_send_next ===>
* + . +
* | . |
* \ . /
* rsp_send_done-// (d) . (b) //-req_send_done
*
*
* (a) -> (b) -> (c) -> (d) is the normal flow of transaction consisting
* of a single request response, where (a) and (b) handle request from
* client, while (c) and (d) handle the corresponding response from the
* server.
我们看core_recv 的调用链, 可能从什么地方返回这个错误码.
core_recv()
conn->recv(), 即msg_recv
如果解析到一条消息(MSG_PARSE_OK), 返回msg_parsed()
如果parser出错, 函数里面同时设置 conn->err, 并且返回 NC_ERROR
static rstatus_t
msg_parse(struct context *ctx, struct conn *conn, struct msg *msg)
{
msg->parser(msg);
switch (msg->result) {
case MSG_PARSE_OK:
status = msg_parsed(ctx, conn, msg);
break;
case MSG_PARSE_REPAIR:
status = msg_repair(ctx, conn, msg);
break;
case MSG_PARSE_AGAIN:
status = NC_OK;
break;
default:
status = NC_ERROR;
conn->err = errno;
break;
}
return conn->err != 0 ? NC_ERROR : status;
}
所以 msg->parser() 函数里面, 如果要返回错误, 是通过msg->result设置为一个错误码做的:
r->result = MSG_PARSE_ERROR;
接下来的重点是 msg_parsed() , 他的返回值也会直接返回到core_core:
static rstatus_t
msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
{
conn->recv_done(ctx, conn, msg, nmsg);
return NC_OK;
}
这里挺重要, 这个函数虽然调用了conn->recv_done, 但是直接返回NC_OK, 而没有管conn->recv_done的返回值, 实际上, conn->recv_done() 没有返回值::
conn_recv_done_t recv_done; /* read done handler */
typedef void (*conn_recv_done_t)(struct context *, struct conn *, struct msg *, struct msg *);
到这里return链断了, 下面只能通过设置conn->err来表示错误 .
req_forward 如果出错, 是通过 req_forward_error 来告诉客户端的:
static void
req_forward(struct context *ctx, struct conn *c_conn, struct msg *msg)
{
s_conn = server_pool_conn(ctx, c_conn->owner, key, keylen);
if (s_conn == NULL) {
req_forward_error(ctx, c_conn, msg); //这里并不会设置c_conn->err, 这时如果设置c_conn->err, 就会直接关连接,
//这里的处理方法是: 设置msg->err, 于是后面会构造一个ERR rsp返回给客户端, 这并不算一个连接错误.
return;
}
c_conn->enqueue_outq(ctx, c_conn, msg); //简单队列操作.
s_conn->enqueue_inq(ctx, s_conn, msg);
if (TAILQ_EMPTY(&s_conn->imsg_q)) {
status = event_add_out(ctx->evb, s_conn);
if (status != NC_OK) {
req_forward_error(ctx, c_conn, msg); //这里会告诉客户端出错.
s_conn->err = errno; //这里设置 s_conn->err, 关掉后端连接(什么时候处理 这里的s_conn->err????? 还有机会有事件么?)
return;
}
}
req_forward_error 需要标记msg->err, 此时消息未被转发到后端, 此时为了告诉客户端出错, 需要:
- 在c_conn上开始 ev_out 事件
- 可写时, 构造一个ERR rsp返回给客户端.
具体过程如下:
static void
req_forward_error(struct context *ctx, struct conn *conn, struct msg *msg)
{
rstatus_t status;
ASSERT(conn->client && !conn->proxy);
msg->done = 1;
msg->error = 1;
msg->err = errno; //(如果内存分配失败, errno会被设为12, 对应错误消息'Cannot allocate memory')
if (req_done(conn, TAILQ_FIRST(&conn->omsg_q))) {
status = event_add_out(ctx->evb, conn);
if (status != NC_OK) {
conn->err = errno;
}
}
}
到这里, 这个cliet->proxy 的过程所有的出错路径都排查了.
这是另一个recv路径, 前半部分和(a)路径基本一样:
core_recv()
conn->recv(), 即msg_recv
msg_recv_chain(),
msg_parse(), 调用msg->parser(msg),
msg_parsed()
到这里return链断了, 下面只能通过设置conn->err来表示错误 .
rsp_recv_done() 和rsp_forward():
static void
rsp_forward(struct context *ctx, struct conn *s_conn, struct msg *msg)
{
pmsg->peer = msg;
msg->peer = pmsg;
msg->pre_coalesce(msg); //void函数.
c_conn = pmsg->owner;
ASSERT(c_conn->client && !c_conn->proxy);
if (req_done(c_conn, TAILQ_FIRST(&c_conn->omsg_q))) { //这里调用req_done.
status = event_add_out(ctx->evb, c_conn);
if (status != NC_OK) {
c_conn->err = errno;
}
}
}
这里rsp_forward比req_forward复杂, req_forward只是简单的dequeue_outq, enqueue_inq, event_add_out, 并没有太多的寄回设置conn->err.
rsp_forward则可能在pre_coalesce(), req_done(), post_coalesce() 函数里面设置.
pre_coalesce():
void
redis_pre_coalesce(struct msg *r)
{
pr->frag_owner->nfrag_done++;
switch (r->type) {
case MSG_RSP_REDIS_INTEGER:
xxx;
case MSG_RSP_REDIS_MULTIBULK:
xxx;
case MSG_RSP_REDIS_STATUS:
xxx;
default:
mbuf = STAILQ_FIRST(&r->mhdr);
log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment "
"with unknown type %d", r->type);
pr->error = 1;
pr->err = EINVAL; //这里都只是设置了msg->err.
break;
}
}
在mget-improve之前, req_done() post_coalesce 都不会做什么设置. TODO: 这里的post_coalesce我需要处理错误, 处理方法应该类似pre_coalesce, 设置msg->error, msg->err
接(c), rsp_forward 会在c_conn上开启out事件:
status = event_add_out(ctx->evb, c_conn);
当c_conn可写时, 会触发 msg_send->rsp_send_next->rsp_send_done 路径.
这里前几步和 (a) 也一样, 出错了直接通过返回值返回:
msg_send 可能调用msg_send_chain 和rsp_send_next.
rsp_send_next:
if (req_error(conn, pmsg)) { //这里也不会设置conn->err. 所以如果在msg层没有内存, 也会返回一个错误给客户端. 但是如果在rsp_make_error的时候再次没内存, 就只能关闭连接了.
msg = rsp_make_error(ctx, conn, pmsg);
}
msg_send_chain 可能通过返回值告诉最上层发生了错误 (直接调用conn_sendv):
static rstatus_t
msg_send_chain(struct context *ctx, struct conn *conn, struct msg *msg)
{
conn->smsg = NULL;
n = conn_sendv(conn, &sendv, nsend);
conn->send_done(ctx, conn, msg);
if (n >= 0) {
return NC_OK;
}
return (n == NC_EAGAIN) ? NC_OK : NC_ERROR;
}
- 这里可能调用rsp_send_done, 单此时已经不能设置什么错误了
因为总的能逻辑就是 core_core 里面的这段逻辑:
status = core_recv(ctx, conn);
if (status != NC_OK || conn->done || conn->err) {
core_close(ctx, conn);
return NC_ERROR;
}
在处理请求中, 比如读msg的时候, 发现没有内存了, 或者解析出错了, 有两个方法返回错误:
如果在forward到后端时, 后端连接失败, 则可以通过 req_forward_error 函数, 设置 msg->err, 这时在 rsp_send_next 中会用rsp_make_error生成一个err response返回给客户端.
static rstatus_t
msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
{
...
nmsg = msg_get(msg->owner, msg->request, conn->redis);
if (nmsg == NULL) {
mbuf_put(nbuf);
return NC_ENOMEM;
}
msg->ferror #是否有frag错误. msg->error #错误标志, 0/1 msg->err = errno; #errno.
比如memcache_pre_coalesce:
void
redis_pre_coalesce(struct msg *r)
{
pr->frag_owner->nfrag_done++;
switch (r->type) {
case MSG_RSP_REDIS_INTEGER:
xxx;
case MSG_RSP_REDIS_MULTIBULK:
if (pr->first_fragment) {
mbuf = mbuf_get();
if (mbuf == NULL) {
pr->error = 1;
pr->err = EINVAL;
return;
}
STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next);
}
case MSG_RSP_REDIS_STATUS:
xxx;
default:
mbuf = STAILQ_FIRST(&r->mhdr);
log_hexdump(LOG_ERR, mbuf->pos, mbuf_length(mbuf), "rsp fragment "
"with unknown type %d", r->type);
pr->error = 1;
pr->err = EINVAL; //这里都只是设置了msg->err.
break;
}
}
void
redis_pre_coalesce(struct msg *r)
{
if (pr->first_fragment) {
mbuf = mbuf_get();
if (mbuf == NULL) {
pr->error = 1;
pr->err = EINVAL;
return;
}
STAILQ_INSERT_HEAD(&r->mhdr, mbuf, next);
}
...
}
从前的msg_fragment是在msg_parsed里面, 还能通过return 返回错误. 因为作者认为在msg_parsed 之后, 消息必然在recv_done里面传给后端了, 不会出错了. 所以recv_done等函数都是void函数:
static rstatus_t
msg_parsed(struct context *ctx, struct conn *conn, struct msg *msg)
{
conn->recv_done(ctx, conn, msg, nmsg);
return NC_OK;
}
TODO 3: 我的fragement也得放到这里的msg_parsed里面, 否则的话, 就得修改 recv_done的返回值了, 这个动作更大些.