Permalink: 2014-04-23 16:18:23 by ning in misc tags: all

icomet是ideawu的作品, 基于libevent 的evhttp框架做的comet服务.

https://github.com/ideawu/icomet

1   代码量

ning@ning-laptop:~/test/icomet/src$ find . -name '*.cpp' | xargs cat | wc -l
2116
ning@ning-laptop:~/test/icomet/src$ find . -name '*.h' | xargs cat | wc -l
982

2   utils

2.1   config

  • 支持名字空间的config

  • 支持保存到文件(dump)
    • 注释也会读入, 并记录行号, dump的时候可以保留注释, 这不错.
  • 是否支持默认值和覆盖?
    • 不支持.
  • 支持数组, 但需要在解析的时候特别处理 (同一个 key 重复出现多次):
    • allow xx
    • allow xxx

注释里说用法类似 C 的 cfg 库, 像这样:

// Usage example quoted from the config header's comments (C-style cfg API).
struct config *cfg, *c;

cfg = cfg_load_file("cfg_test.conf");
if(!cfg){
    // cfg_load_file returned NULL — presumably the file could not be read/parsed.
    return 0;
}
printf("proxy.php.host = %s\n", cfg_getstr(cfg, "proxy.php.host"));
printf("proxy.php.port = %d\n", cfg_getnum(cfg, "proxy.php.port"));

实际上, 格式较怪异:

# 有效行以 \t* 开头
proxy :
    php =
        host = 127.0.0.1
        port = 8088
    py :
        host = 127.0.0.1
        port = 8080

实际用起来还是有点费劲, 例如读取 admin 下的多个 allow 项:

{
    // Feed every "allow" child of the "admin" section into ip_filter.
    // Repeated keys act as an array, so the children are scanned by key name.
    Config *cc = (Config *)conf->get("admin");
    if(cc != NULL){
        std::vector<Config *> *children = &cc->children;
        std::vector<Config *>::iterator it;
        for(it = children->begin(); it != children->end(); it++){
            if((*it)->key != "allow"){
                continue; // skip children that are not "allow" entries
            }
            const char *ip = (*it)->str();
            log_info("    allow %s", ip);
            ip_filter->add_allow(ip);
        }
    }
}

2.2   log

和自己习惯不一样:

// Log levels: smaller values are more severe (FATAL = 0 ... TRACE = 5).
// LEVEL_MIN / LEVEL_MAX bound the valid range; LEVEL_NONE is -1.
static const int LEVEL_NONE     = (-1);
static const int LEVEL_MIN      = 0;
static const int LEVEL_FATAL    = 0;
static const int LEVEL_ERROR    = 1;
static const int LEVEL_WARN     = 2;
static const int LEVEL_INFO     = 3;
static const int LEVEL_DEBUG    = 4;
static const int LEVEL_TRACE    = 5;
static const int LEVEL_MAX      = 5;

支持按照文件大小切分日志 (如果能按小时切分就更好了).

可以设置写的时候加锁:

// Optional lock: when this->mutex is set, the write, flush, stats update
// and possible rotation all happen while holding it.
if(this->mutex){
    pthread_mutex_lock(this->mutex);
}
fwrite(buf, len, 1, this->fp);
fflush(this->fp);

stats.w_curr += len;
stats.w_total += len;
// Rotate once the bytes written to the current file exceed rotate_size;
// rotation is skipped entirely when rotate_size <= 0.
if(rotate_size > 0 && stats.w_curr > rotate_size){
    this->rotate();
}
if(this->mutex){
    pthread_mutex_unlock(this->mutex);
}

因为这里用的是带缓冲的 FILE * (fwrite + fflush), 而不是直接对 fd 调用 write() 追加写, 所以多线程写入时需要加锁.

2.3   其它

  • daemon.h
  • file.h
  • ip_filter.h: IP 黑白名单
  • list.h: 链表. C++ 已经有 std::list 了, 还需要自己实现这个么?:

// Hand-written template linked list from list.h (definition truncated in this excerpt).
template <class T>
class LinkedList{

  • strings.h: 提供 trim(), hexmem(), parse_ip_port() 等函数.
  • objpool.h: 用泛型 (模板) 实现的对象池, 很有用, 可惜 C 用不了; icomet 里面没有用到.

3   comet逻辑

3.1   comet-server.cpp main()

Server *serv = NULL;
// /pub handler on the admin HTTP server. CHECK_AUTH() presumably rejects
// unauthorized requests before the message is published.
void pub_handler(struct evhttp_request *req, void *arg){
    CHECK_AUTH();
    serv->pub(req, true);
}

// Timer callback that drives Server::check_timeout().
// NOTE(review): rand()'s result is discarded; its purpose is not visible in this excerpt.
void timer_cb(evutil_socket_t sig, short events, void *user_data){ // Fires once per second.
    rand();
    serv->check_timeout();
}

// Excerpt of main(): setup code is omitted; only the route registration is shown.
int main(int argc, char **argv){
    // Read the configuration
    serv = new Server();
    // /pub?cname=abc&content=hi
    evhttp_set_cb(admin_http, "/pub", pub_handler, NULL);

    // /sub?cname=abc&cb=jsonp&token=&seq=123&noop=123
    evhttp_set_cb(front_http, "/sub", poll_handler, NULL);

}

为什么把 /pub 放在 admin_http 上, 而不是 front_http 上?

3.2   server.cpp server.h

  • used_channels: 所有已使用的 channel 组成的链表.
  • cname_channels: 每个 channel 有一个名字, 用 map 保存 name -> channel 的映射.

// Create a channel named cname, give it a token, mark its presence as online and
// register it in both used_channels and cname_channels.
// Returns NULL when ServerConfig::max_channels channels are already in use.
Channel* Server::new_channel(const std::string &cname){
    if(used_channels.size >= ServerConfig::max_channels){
        return NULL;
    }
    log_debug("new channel: %s", cname.c_str());

    Channel *channel = new Channel();
    channel->serv = this;
    channel->name = cname;
    channel->create_token();

    add_presence(PresenceOnline, channel->name);

    used_channels.push_back(channel);
    cname_channels[channel->name] = channel;

    return channel;
}

check_timeout() 每秒执行一次: 对每个 channel 上的每个 sub, 检查它的空闲计数是否超过 polling_idles; 如果超过, 就调用 noop() 回复这个 sub 没有更新. 另外, 没有 sub 的 channel 的空闲计数减到 0 以下时会被释放:

// Periodic sweep (called from timer_cb). For every used channel:
//  - no subscribers: decrement channel->idle and free the channel once it drops below 0;
//  - otherwise: raise channel->idle back to at least channel_idles, and for each
//    subscriber whose idle counter exceeds polling_idles, reset it and send noop().
// Every call walks all channels and all subscribers.
// NOTE(review): free_channel() runs while iterating used_channels; this is only safe
// if LinkedList's iterator has already advanced past `channel` — confirm.
int Server::check_timeout(){
    //log_debug("<");
    LinkedList<Channel *>::Iterator it = used_channels.iterator();
    while(Channel *channel = it.next()){
        if(channel->subs.size == 0){
            if(--channel->idle < 0){
                this->free_channel(channel);
            }
            continue;
        }
        if(channel->idle < ServerConfig::channel_idles){
            channel->idle = ServerConfig::channel_idles;
        }

        LinkedList<Subscriber *>::Iterator it2 = channel->subs.iterator();
        while(Subscriber *sub = it2.next()){
            if(++sub->idle <= ServerConfig::polling_idles){
                continue;
            }
            sub->idle = 0;
            sub->noop();
        }
    }
    //log_debug(">");
    return 0;
}

这个方法效率极低: 每秒都要遍历所有 channel 上的所有 sub, 开销与连接总数成正比, 连接数很大时 (比如 c1000k) 代价很高.

3.2.1   pub

// Excerpt: reply {"type":"ok"} to the publisher first, then push the content
// to the channel's subscribers via Channel::send().
int Server::pub(struct evhttp_request *req, bool encoded){
    // (excerpt: declarations of channel/cname/content and input checks are omitted)
    // NOTE(review): if get_channel_by_name() can return NULL, the omitted code must
    // handle it before channel->send() below.
    channel = this->get_channel_by_name(cname);
    // response to publisher
    evhttp_add_header(req->output_headers, "Content-Type", "text/javascript; charset=utf-8");
    struct evbuffer *buf = evbuffer_new();

    evbuffer_add_printf(buf, "{\"type\":\"ok\"}");

    evhttp_send_reply(req, 200, "OK", buf);
    evbuffer_free(buf);

    // push to subscribers
    channel->send("data", content, encoded);
    return 0;
}

3.2.2   sub

// Register the request as a subscriber of channel `cname`. seq, noop and cb are
// read from the query string (defaults 0, 0, "").
int Server::sub(struct evhttp_request *req, Subscriber::Type sub_type){
    HttpQuery query(req);
    int seq = query.get_int("seq", 0);
    int noop = query.get_int("noop", 0);
    const char *cb = query.get_str("cb", "");
    const char *token = query.get_str("token", "");
    std::string cname = query.get_str("cname", "");

    Channel *channel = this->get_channel_by_name(cname);

    ... //check

    Subscriber *sub = new Subscriber();
    sub->req = req;
    sub->type = sub_type;
    sub->idle = 0;
    sub->seq_next = seq;
    sub->seq_noop = noop;
    sub->callback = cb;

    channel->add_subscriber(sub); // Add it to the channel.
    subscribers ++;
    sub->start();

    return 0;
}

3.3   channel.cpp (pub端)

// Excerpt of the Channel declaration (some members, e.g. `serv`, are omitted).
// A channel holds its subscribers and a buffer of recent messages, which
// Channel::send() trims to the newest channel_buffer_size entries.
class Channel{
public:
    // Server accesses these members directly (channel->subs, channel->idle,
    // channel->name), so they must be public.
    LinkedList<Subscriber *> subs;
    int idle;
    int seq_next;
    std::string name;
    std::string token;
    std::vector<std::string> msg_list; // Messages are kept in a vector
    void add_subscriber(Subscriber *sub);
    void send(const char *type, const char *content, bool encoded=true);
};

这是 pub 的逻辑:

// Append content to msg_list, bump seq_next, trim the buffer if needed,
// then push the message to every subscriber of this channel.
void Channel::send(const char *type, const char *content, bool encoded){
    ...
    msg_list.push_back(content);
    seq_next ++;
    // Once the buffer reaches 1.5x channel_buffer_size, keep only the newest
    // channel_buffer_size messages, so trimming happens in batches rather than on every send.
    if(msg_list.size() >= ServerConfig::channel_buffer_size * 1.5){
        std::vector<std::string>::iterator it;
        it = msg_list.end() - ServerConfig::channel_buffer_size;
        msg_list.assign(it, msg_list.end());
        log_trace("resize msg_list to %d, seq_next: %d", msg_list.size(), seq_next);
    }
    // new_content is presumably built in the omitted part above.
    LinkedList<Subscriber *>::Iterator it = subs.iterator();
    while(Subscriber *sub = it.next()){
        sub->send_chunk(this->seq_next, type, new_content.c_str());
    }
}

sub->send_chunk() 负责向 sub 端推送数据; 而前面 serv->check_timeout() 里的 sub->noop() 负责回复 sub 端 "没有新消息".

3.4   subscriber.cpp subscriber.h

// Excerpt from the subscriber: keep the connection alive and send data as an HTTP chunk.
evhttp_add_header(req->output_headers, "Connection", "keep-alive");
evhttp_send_reply_chunk(this->req, buf);

evhttp_send_reply_chunk() 就是用 HTTP chunked 编码 (Transfer-Encoding: chunked) 分块回复数据.

4   问题

  • 消息存在内存中, 不能持久化.

  • 一个 channel 如果一直活跃, 消息会不会一直不删除、越堆越多? 不会, 每个 channel 有最大消息量: msg_list 达到 channel_buffer_size 的 1.5 倍时, 只保留最新的 channel_buffer_size 条:

    // Once msg_list reaches 1.5x channel_buffer_size, keep only the newest channel_buffer_size messages.
    if(msg_list.size() >= ServerConfig::channel_buffer_size * 1.5){
        std::vector<std::string>::iterator it;
        it = msg_list.end() - ServerConfig::channel_buffer_size;
        msg_list.assign(it, msg_list.end());
        log_trace("resize msg_list to %d, seq_next: %d", msg_list.size(), seq_next);
    }
    
  • 有 channel 超时 (timeout), 但没有消息超时: 消息只会因为超出 channel buffer 而被淘汰, 不会因为过期被删除.

4.1   1000K 连接的测试.

ideawu 提供了一个 c1000k 代码:

https://github.com/ideawu/c1000k

要测试 c1000k, 主要有两个问题:

  1. server 端: 每个 socket 大约需要占用 0.5M 内存 (内核 socket 缓冲区), 所以 1000k 连接总共需要 500G 内存.

  2. client 端: 一个 client ip 只能开大约 3w-5w 个本地端口, 所以对每个 server ip+port, 只能建立 3w-5w 个连接.
    • 因为相同的四元组 (源 ip, 源端口, 目的 ip, 目的端口) 只能存在一个.

ideawu 是这样解决的:

  1. server 端每 accept 一个连接, 都把它的发送/接收缓冲区大小设置为 5k:

     // Shrink the kernel send and receive buffers of each accepted socket to 5000 bytes.
     bufsize = 5000;
     setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
     setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));

这样每个连接的收发缓冲区共约 10k, 1000k 连接只需要约 10G 内存.
  2. server 端开 100 个 port, 这样每个客户端 ip 对每个 port 都能用约 3w 个本地端口, 一共就可以开 300w 个连接.

  3. 这时候, 唯一的限制就只剩 fd 个数限制了.

    启动的时候设置好 fd 个数的 limit 就可以:

    $ limit -n 1024000 ./server 6000
    $ limit -n 1024000 ./client 127.0.0.1 6000
    
    可以一直跑到:
    connections: 1023897
    error: Too many open files
    

5   小结

  • 对于comet, evhttp 应该是最佳选择, 好于nginx.
  • 赞ideawu.
  • 消息不持久化.
  • 单机模式, 如何扩展?
  • 每秒用 timer 遍历所有连接, 对单进程服务器来说, 连接数很大时应该不可接受.

Comments