Table of Contents
icomet是ideawu的作品, 基于libevent 的evhttp框架做的comet服务.
https://github.com/ideawu/icomet
ning@ning-laptop:~/test/icomet/src$ find . -name '*.cpp' | xargs cat | wc -l 2116 ning@ning-laptop:~/test/icomet/src$ find . -name '*.h' | xargs cat | wc -l 982
支持名字空间的config
注释里面说是是c的cfg这样:
struct config *cfg, *c;
cfg = cfg_load_file("cfg_test.conf");
if(!cfg){
return 0;
}
printf("proxy.php.host = %s\n", cfg_getstr(cfg, "proxy.php.host"));
printf("proxy.php.port = %d\n", cfg_getnum(cfg, "proxy.php.port"));
实际上, 格式较怪异:
# 有效行以 \t* 开头
proxy :
php =
host = 127.0.0.1
port = 8088
py :
host = 127.0.0.1
port = 8080
实际用起来还是有点费劲:
{
Config *cc = (Config *)conf->get("admin");
if(cc != NULL){
std::vector<Config *> *children = &cc->children;
std::vector<Config *>::iterator it;
for(it = children->begin(); it != children->end(); it++){
if((*it)->key != "allow"){
continue;
}
const char *ip = (*it)->str();
log_info(" allow %s", ip);
ip_filter->add_allow(ip);
}
}
}
和自己习惯不一样:
static const int LEVEL_NONE = (-1); static const int LEVEL_MIN = 0; static const int LEVEL_FATAL = 0; static const int LEVEL_ERROR = 1; static const int LEVEL_WARN = 2; static const int LEVEL_INFO = 3; static const int LEVEL_DEBUG = 4; static const int LEVEL_TRACE = 5; static const int LEVEL_MAX = 5;
支持按照文件大小切日志-能按小时切最好
可以设置写的时候加锁:
if(this->mutex){
pthread_mutex_lock(this->mutex);
}
fwrite(buf, len, 1, this->fp);
fflush(this->fp);
stats.w_curr += len;
stats.w_total += len;
if(rotate_size > 0 && stats.w_curr > rotate_size){
this->rotate();
}
if(this->mutex){
pthread_mutex_unlock(this->mutex);
}
因为 这里用的是 FILE *, 不能直接append.
daemon.h file.h ip_filter.h 黑白名单, list.h 链表 c++还需要这个么?:
template <class T>
class LinkedList{
strings.h 提供trim(), hexmem() parse_ip_port() 函数 objpool.h 因为用范型, 所以很有用的对象池, 可惜c用不了, icomet里面没用.
Server *serv = NULL;
void pub_handler(struct evhttp_request *req, void *arg){
CHECK_AUTH();
serv->pub(req, true);
}
void timer_cb(evutil_socket_t sig, short events, void *user_data){ //每秒一次.
rand();
serv->check_timeout();
}
int main(int argc, char **argv){
//读取配置
serv = new Server();
// /pub?cname=abc&content=hi
evhttp_set_cb(admin_http, "/pub", pub_handler, NULL);
// /sub?cname=abc&cb=jsonp&token=&seq=123&noop=123
evhttp_set_cb(front_http, "/sub", poll_handler, NULL);
}
为啥把pub 放在admin_http上
used_channels 所有已用channel链表 cname_channels 每个channel有一个名字, 用map存name->channel的关系.
Channel* Server::new_channel(const std::string &cname){
if(used_channels.size >= ServerConfig::max_channels){
return NULL;
}
log_debug("new channel: %s", cname.c_str());
Channel *channel = new Channel();
channel->serv = this;
channel->name = cname;
channel->create_token();
add_presence(PresenceOnline, channel->name);
used_channels.push_back(channel);
cname_channels[channel->name] = channel;
return channel;
}
check_timeout 每秒对每个channel上的每个sub, 检查是否过了一段时间的idle, 如果是idle, 就回复这个sub没有更新(noop()方法):
int Server::check_timeout(){
//log_debug("<");
LinkedList<Channel *>::Iterator it = used_channels.iterator();
while(Channel *channel = it.next()){
if(channel->subs.size == 0){
if(--channel->idle < 0){
this->free_channel(channel);
}
continue;
}
if(channel->idle < ServerConfig::channel_idles){
channel->idle = ServerConfig::channel_idles;
}
LinkedList<Subscriber *>::Iterator it2 = channel->subs.iterator();
while(Subscriber *sub = it2.next()){
if(++sub->idle <= ServerConfig::polling_idles){
continue;
}
sub->idle = 0;
sub->noop();
}
}
//log_debug(">");
return 0;
}
这个方法效率极低.
int Server::pub(struct evhttp_request *req, bool encoded){
channel = this->get_channel_by_name(cname);
// response to publisher
evhttp_add_header(req->output_headers, "Content-Type", "text/javascript; charset=utf-8");
struct evbuffer *buf = evbuffer_new();
evbuffer_add_printf(buf, "{\"type\":\"ok\"}");
evhttp_send_reply(req, 200, "OK", buf);
evbuffer_free(buf);
// push to subscribers
channel->send("data", content, encoded);
return 0;
}
int Server::sub(struct evhttp_request *req, Subscriber::Type sub_type){
HttpQuery query(req);
int seq = query.get_int("seq", 0);
int noop = query.get_int("noop", 0);
const char *cb = query.get_str("cb", "");
const char *token = query.get_str("token", "");
std::string cname = query.get_str("cname", "");
Channel *channel = this->get_channel_by_name(cname);
... //check
Subscriber *sub = new Subscriber();
sub->req = req;
sub->type = sub_type;
sub->idle = 0;
sub->seq_next = seq;
sub->seq_noop = noop;
sub->callback = cb;
channel->add_subscriber(sub); //添加进去.
subscribers ++;
sub->start();
return 0;
}
class Channel{
LinkedList<Subscriber *> subs;
int idle;
int seq_next;
std::string name;
std::string token;
std::vector<std::string> msg_list; //用一个vector保存msg
void add_subscriber(Subscriber *sub);
void send(const char *type, const char *content, bool encoded=true);
}
这是pub逻辑,
void Channel::send(const char *type, const char *content, bool encoded){
...
msg_list.push_back(content);
seq_next ++;
if(msg_list.size() >= ServerConfig::channel_buffer_size * 1.5){
std::vector<std::string>::iterator it;
it = msg_list.end() - ServerConfig::channel_buffer_size;
msg_list.assign(it, msg_list.end());
log_trace("resize msg_list to %d, seq_next: %d", msg_list.size(), seq_next);
}
LinkedList<Subscriber *>::Iterator it = subs.iterator();
while(Subscriber *sub = it.next()){
sub->send_chunk(this->seq_next, type, new_content.c_str());
}
}
sub->send_chunk 是决定向sub端发数据, 前面 serv->check_timeout() 里面sub->noop() 是决定向sub端回复说 没有消息
evhttp_add_header(req->output_headers, "Connection", "keep-alive"); evhttp_send_reply_chunk(this->req, buf);
evhttp_send_reply_chunk, 就是用Chunked方式回复数据.
消息存在内存中, 不能持久化.
一个channel如果一直活跃, 消息就不会删除, 越堆越多?(一个channel有个最大消息量):
if(msg_list.size() >= ServerConfig::channel_buffer_size * 1.5){
std::vector<std::string>::iterator it;
it = msg_list.end() - ServerConfig::channel_buffer_size;
msg_list.assign(it, msg_list.end());
log_trace("resize msg_list to %d, seq_next: %d", msg_list.size(), seq_next);
}
有channel timeout, 没有msg timeout
ideawu 提供了一个 c1000k 代码:
https://github.com/ideawu/c1000k
要测试c1000k 主要两个问题:
是server 端, 每个sokcet 大约需要占用0.5M内存(内核sokcet缓冲区内存), 所以总共需要500G内存.
ideawu是这样解决的: 1. server 端每accept一个连接, 都设置它的缓冲区大小为5k:
bufsize = 5000;
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize));
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize));
这样1000k连接只需要10G内存.
server 端开100个port, 这样每个客户端3w端口, 就可以开300w个连接.
这时候, 限制就只有fd个数限制了.
我启动的时候设置好limit就可以
$ limit -n 1024000 ./server 6000 $ limit -n 1024000 ./client 127.0.0.1 6000 可以一直跑到: connections: 1023897 error: Too many open files