icomet is a project by ideawu: a comet server built on libevent's evhttp framework.
https://github.com/ideawu/icomet
```sh
ning@ning-laptop:~/test/icomet/src$ find . -name '*.cpp' | xargs cat | wc -l
2116
ning@ning-laptop:~/test/icomet/src$ find . -name '*.h' | xargs cat | wc -l
982
```
Config with namespace support
The comments describe it as a C-style cfg, used like this:
```c
struct config *cfg, *c;
cfg = cfg_load_file("cfg_test.conf");
if(!cfg){
	return 0;
}
printf("proxy.php.host = %s\n", cfg_getstr(cfg, "proxy.php.host"));
printf("proxy.php.port = %d\n", cfg_getnum(cfg, "proxy.php.port"));
```
In practice, the file format is rather odd:
```
# valid lines start with \t*
proxy :
	php =
		host = 127.0.0.1
		port = 8088
	py :
		host = 127.0.0.1
		port = 8080
```
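Read this way, nesting depth is given by the number of leading tabs. A minimal sketch of flattening this format into dotted keys (`parse_cfg` and `trim` here are hypothetical helpers for illustration, not icomet's actual parser):

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <vector>

static std::string trim(const std::string &s){
    size_t b = s.find_first_not_of(" \t");
    size_t e = s.find_last_not_of(" \t");
    return (b == std::string::npos) ? "" : s.substr(b, e - b + 1);
}

// Hypothetical helper: flatten the tab-indented format into dotted keys.
// Depth = number of leading tabs; "k = v" sets a value; "k :" or "k =" with
// no value opens a nested section.
std::map<std::string, std::string> parse_cfg(const std::string &text){
    std::map<std::string, std::string> out;
    std::vector<std::string> path;
    std::istringstream in(text);
    std::string line;
    while(std::getline(in, line)){
        size_t depth = 0;
        while(depth < line.size() && line[depth] == '\t') depth++;
        std::string body = line.substr(depth);
        if(body.empty() || body[0] == '#') continue;   // skip blanks/comments
        size_t sep = body.find_first_of(":=");
        std::string key = trim(body.substr(0, sep));
        std::string val = (sep == std::string::npos)
                ? std::string("") : trim(body.substr(sep + 1));
        path.resize(depth);         // pop back to the current nesting level
        path.push_back(key);
        if(!val.empty()){
            std::string dotted;
            for(size_t i = 0; i < path.size(); i++){
                if(i) dotted += ".";
                dotted += path[i];
            }
            out[dotted] = val;
        }
    }
    return out;
}
```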
Actually using it is still a bit cumbersome:
```cpp
{
	Config *cc = (Config *)conf->get("admin");
	if(cc != NULL){
		std::vector<Config *> *children = &cc->children;
		std::vector<Config *>::iterator it;
		for(it = children->begin(); it != children->end(); it++){
			if((*it)->key != "allow"){
				continue;
			}
			const char *ip = (*it)->str();
			log_info("    allow %s", ip);
			ip_filter->add_allow(ip);
		}
	}
}
```
The log level constants differ from what I'm used to:
```cpp
static const int LEVEL_NONE  = (-1);
static const int LEVEL_MIN   = 0;
static const int LEVEL_FATAL = 0;
static const int LEVEL_ERROR = 1;
static const int LEVEL_WARN  = 2;
static const int LEVEL_INFO  = 3;
static const int LEVEL_DEBUG = 4;
static const int LEVEL_TRACE = 5;
static const int LEVEL_MAX   = 5;
```
It supports rotating logs by file size; rotating by the hour would be even better.
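Hourly rotation could be added by embedding the hour in the filename and reopening the file whenever the name changes. A sketch (`hourly_log_name` is a hypothetical helper, not icomet's API; UTC is used here only to keep the example deterministic):

```cpp
#include <cassert>
#include <ctime>
#include <string>

// Sketch: derive an hourly log filename, e.g. "icomet.log.2014-01-31_13".
// The writer would fclose the old file and fopen the new one whenever the
// computed name differs from the currently open one.
std::string hourly_log_name(const std::string &base, time_t now){
    char suffix[32];
    struct tm tm_buf;
    gmtime_r(&now, &tm_buf);                       // UTC for determinism
    strftime(suffix, sizeof(suffix), "%Y-%m-%d_%H", &tm_buf);
    return base + "." + std::string(suffix);
}
```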
Writes can optionally be protected by a lock:
```cpp
if(this->mutex){
	pthread_mutex_lock(this->mutex);
}
fwrite(buf, len, 1, this->fp);
fflush(this->fp);
stats.w_curr += len;
stats.w_total += len;
if(rotate_size > 0 && stats.w_curr > rotate_size){
	this->rotate();
}
if(this->mutex){
	pthread_mutex_unlock(this->mutex);
}
```
The lock is needed because a buffered FILE * is used here, which cannot simply append atomically.
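For comparison, a raw fd opened with O_APPEND gets an atomic seek-to-end on every write(), so concurrent appenders need no mutex for the append itself. A minimal sketch (`append_line` is a made-up helper, not icomet's code):

```cpp
#include <cassert>
#include <cstring>
#include <fcntl.h>
#include <unistd.h>

// Sketch: O_APPEND makes each write() an atomic seek-to-end plus write,
// which a buffered fwrite()/fflush() pair on a FILE * does not guarantee.
ssize_t append_line(const char *path, const char *line){
    int fd = open(path, O_WRONLY | O_CREAT | O_APPEND, 0644);
    if(fd < 0){
        return -1;
    }
    ssize_t n = write(fd, line, strlen(line));
    close(fd);
    return n;
}
```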
Other utilities: daemon.h, file.h, ip_filter.h (IP black/white lists), and list.h, a linked list. Does C++ really still need one of these?
```cpp
template <class T>
class LinkedList{
```
strings.h provides trim(), hexmem() and parse_ip_port(). objpool.h is a genuinely useful object pool thanks to templates (something plain C can't have), though icomet itself never uses it.
```cpp
Server *serv = NULL;

void pub_handler(struct evhttp_request *req, void *arg){
	CHECK_AUTH();
	serv->pub(req, true);
}

void timer_cb(evutil_socket_t sig, short events, void *user_data){
	// fires once per second
	rand();
	serv->check_timeout();
}

int main(int argc, char **argv){
	// load config
	serv = new Server();
	// /pub?cname=abc&content=hi
	evhttp_set_cb(admin_http, "/pub", pub_handler, NULL);
	// /sub?cname=abc&cb=jsonp&token=&seq=123&noop=123
	evhttp_set_cb(front_http, "/sub", poll_handler, NULL);
}
```
Why is /pub placed on admin_http? (Presumably so that only the backend, not the public front end, can publish.)
used_channels: a linked list of all channels in use. cname_channels: each channel has a name; this map stores the name -> channel relation.
```cpp
Channel* Server::new_channel(const std::string &cname){
	if(used_channels.size >= ServerConfig::max_channels){
		return NULL;
	}
	log_debug("new channel: %s", cname.c_str());
	Channel *channel = new Channel();
	channel->serv = this;
	channel->name = cname;
	channel->create_token();

	add_presence(PresenceOnline, channel->name);

	used_channels.push_back(channel);
	cname_channels[channel->name] = channel;
	return channel;
}
```
check_timeout runs every second: for each subscriber on each channel, it checks whether the subscriber has been idle for a while, and if so replies to that subscriber that there are no updates (the noop() method):
```cpp
int Server::check_timeout(){
	//log_debug("<");
	LinkedList<Channel *>::Iterator it = used_channels.iterator();
	while(Channel *channel = it.next()){
		if(channel->subs.size == 0){
			if(--channel->idle < 0){
				this->free_channel(channel);
			}
			continue;
		}
		if(channel->idle < ServerConfig::channel_idles){
			channel->idle = ServerConfig::channel_idles;
		}

		LinkedList<Subscriber *>::Iterator it2 = channel->subs.iterator();
		while(Subscriber *sub = it2.next()){
			if(++sub->idle <= ServerConfig::polling_idles){
				continue;
			}
			sub->idle = 0;
			sub->noop();
		}
	}
	//log_debug(">");
	return 0;
}
```
This method is extremely inefficient: once per second it walks every subscriber of every channel.
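One common alternative (not what icomet does) is to keep per-subscriber deadlines in a min-heap, so each tick only touches subscribers that have actually expired. A sketch with subscriber ids standing in for real objects:

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Sketch: a min-heap of (deadline, subscriber id). A timeout check pops only
// the entries whose deadline has passed, instead of scanning every
// subscriber of every channel once per second.
typedef std::pair<long, int> Deadline;   // (expire_time, sub_id)

class TimeoutQueue{
public:
    void schedule(long when, int sub_id){
        heap_.push(Deadline(when, sub_id));
    }
    // Collect all subscribers whose deadline is <= now, earliest first.
    std::vector<int> expired(long now){
        std::vector<int> out;
        while(!heap_.empty() && heap_.top().first <= now){
            out.push_back(heap_.top().second);
            heap_.pop();
        }
        return out;
    }
private:
    std::priority_queue<Deadline, std::vector<Deadline>,
                        std::greater<Deadline> > heap_;
};
```

A real version would also need to handle re-scheduling after each noop() and removal of closed subscribers, e.g. by lazily discarding stale heap entries.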
```cpp
int Server::pub(struct evhttp_request *req, bool encoded){
	channel = this->get_channel_by_name(cname);

	// response to publisher
	evhttp_add_header(req->output_headers, "Content-Type", "text/javascript; charset=utf-8");
	struct evbuffer *buf = evbuffer_new();
	evbuffer_add_printf(buf, "{\"type\":\"ok\"}");
	evhttp_send_reply(req, 200, "OK", buf);
	evbuffer_free(buf);

	// push to subscribers
	channel->send("data", content, encoded);
	return 0;
}
```
```cpp
int Server::sub(struct evhttp_request *req, Subscriber::Type sub_type){
	HttpQuery query(req);
	int seq = query.get_int("seq", 0);
	int noop = query.get_int("noop", 0);
	const char *cb = query.get_str("cb", "");
	const char *token = query.get_str("token", "");
	std::string cname = query.get_str("cname", "");

	Channel *channel = this->get_channel_by_name(cname);
	... // checks

	Subscriber *sub = new Subscriber();
	sub->req = req;
	sub->type = sub_type;
	sub->idle = 0;
	sub->seq_next = seq;
	sub->seq_noop = noop;
	sub->callback = cb;

	channel->add_subscriber(sub); // attach to the channel
	subscribers ++;
	sub->start();
	return 0;
}
```
```cpp
class Channel{
	LinkedList<Subscriber *> subs;
	int idle;
	int seq_next;
	std::string name;
	std::string token;
	std::vector<std::string> msg_list; // messages kept in a vector

	void add_subscriber(Subscriber *sub);
	void send(const char *type, const char *content, bool encoded=true);
};
```
This is the pub logic:
```cpp
void Channel::send(const char *type, const char *content, bool encoded){
	...
	msg_list.push_back(content);
	seq_next ++;
	if(msg_list.size() >= ServerConfig::channel_buffer_size * 1.5){
		std::vector<std::string>::iterator it;
		it = msg_list.end() - ServerConfig::channel_buffer_size;
		msg_list.assign(it, msg_list.end());
		log_trace("resize msg_list to %d, seq_next: %d", msg_list.size(), seq_next);
	}

	LinkedList<Subscriber *>::Iterator it = subs.iterator();
	while(Subscriber *sub = it.next()){
		sub->send_chunk(this->seq_next, type, new_content.c_str());
	}
}
```
sub->send_chunk is where data is pushed to the subscriber; sub->noop() in serv->check_timeout() above is where the subscriber is told there is no new message.
```cpp
evhttp_add_header(req->output_headers, "Connection", "keep-alive");
evhttp_send_reply_chunk(this->req, buf);
```
evhttp_send_reply_chunk replies using chunked transfer encoding.
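For reference, the wire format of one chunk is just the hex payload length, CRLF, the payload, CRLF; evhttp produces this framing for you. A sketch (`frame_chunk` is illustrative, not part of libevent):

```cpp
#include <cassert>
#include <cstdio>
#include <string>

// Sketch of HTTP/1.1 chunked framing: what each evhttp_send_reply_chunk call
// roughly puts on the wire is "<hex-len>\r\n<payload>\r\n". A zero-length
// chunk ("0\r\n\r\n") terminates the response.
std::string frame_chunk(const std::string &payload){
    char len[16];
    snprintf(len, sizeof(len), "%zx", payload.size());
    return std::string(len) + "\r\n" + payload + "\r\n";
}
```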
Messages are kept in memory; there is no persistence.
If a channel stays active forever, will its messages pile up without ever being deleted? (No: each channel has a maximum message count):
```cpp
if(msg_list.size() >= ServerConfig::channel_buffer_size * 1.5){
	std::vector<std::string>::iterator it;
	it = msg_list.end() - ServerConfig::channel_buffer_size;
	msg_list.assign(it, msg_list.end());
	log_trace("resize msg_list to %d, seq_next: %d", msg_list.size(), seq_next);
}
```
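The assign() above copies channel_buffer_size strings every time it fires; a std::deque with pop_front() drops the oldest message in O(1) instead. A sketch (`MsgBuffer` is illustrative, not icomet's code):

```cpp
#include <cassert>
#include <deque>
#include <string>

// Sketch: bounded message buffer that drops the oldest message on overflow,
// instead of periodically bulk-copying the tail of a vector.
class MsgBuffer{
public:
    explicit MsgBuffer(size_t cap) : cap_(cap) {}
    void push(const std::string &msg){
        if(msgs_.size() >= cap_){
            msgs_.pop_front();   // O(1), no bulk copy
        }
        msgs_.push_back(msg);
    }
    size_t size() const { return msgs_.size(); }
    const std::string &front() const { return msgs_.front(); }
private:
    size_t cap_;
    std::deque<std::string> msgs_;
};
```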
There is a channel timeout, but no per-message timeout.
ideawu also provides c1000k test code:
https://github.com/ideawu/c1000k
Testing c1000k runs into two main problems.

First, on the server side each socket takes roughly 0.5 MB of memory (kernel socket buffers), so 1M connections would need about 500 GB in total.
ideawu solves it like this:

1. For every connection the server accepts, set its socket buffers to 5 KB:

```c
int bufsize = 5000;
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
```

With this, 1000k connections need only about 10 GB of memory.
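One caveat worth knowing: the Linux kernel doubles the value passed to SO_SNDBUF/SO_RCVBUF (to account for bookkeeping overhead) and enforces a floor, so getsockopt() will report more than the 5000 requested. A quick check (`effective_rcvbuf` is a made-up helper):

```cpp
#include <cassert>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Sketch: request a small receive buffer as c1000k's server does, then read
// back what the kernel actually granted. On Linux the stored value is double
// the requested one (and clamped to a minimum), so expect more than 5000.
int effective_rcvbuf(int requested){
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if(sock < 0){
        return -1;
    }
    setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &requested, sizeof(requested));
    int actual = 0;
    socklen_t len = sizeof(actual);
    getsockopt(sock, SOL_SOCKET, SO_RCVBUF, &actual, &len);
    close(sock);
    return actual;
}
```

So the real per-connection cost is closer to 2 x 10 KB than 2 x 5 KB; the ~10 GB estimate above is the right order of magnitude either way.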
2. The server listens on 100 ports. With about 30k ephemeral ports available on the client per server port, a single client machine can then open 3M connections.
At that point, the only remaining limit is the number of file descriptors.
It is enough to raise the fd limit when launching:
```sh
$ ulimit -n 1024000
$ ./server 6000
```

```sh
$ ulimit -n 1024000
$ ./client 127.0.0.1 6000
```

The client keeps connecting until:

```
connections: 1023897
error: Too many open files
```