Table of Contents
代码量大约9000行:
ning@ning-laptop:~/test/ssdb/src$ cat *.cpp *.h | wc -l 7223 ning@ning-laptop:~/test/ssdb/src$ cat util/*.cpp util/*.h | wc -l 2404
协议解析:
link.cpp link.h link_redis.cpp link_redis.h
数据类型:
t_hash.cpp t_hash.h t_kv.cpp t_kv.h t_queue.cpp t_queue.h t_zset.cpp t_zset.h proc_hash.cpp proc_kv.cpp proc_queue.cpp proc_zset.cpp #ttl 也相当于是一种数据类型. ttl.cpp ttl.h
主从同步:
binlog.cpp binlog.h slave.cpp slave.h #backend_dump.cpp #backend_dump.h backend_sync.cpp backend_sync.h
服务框架:
serv.cpp serv.h ssdb.cpp ssdb.h ssdb-server.cpp
其它:
include.h version.h iterator.cpp iterator.h
和icomet一样:
config.cpp config.h daemon.h file.h ip_filter.h log.cpp log.h strings.h
epoll:
fde.cpp fde_epoll.cpp fde.h fde_select.cpp
bytes.cpp bytes.h Bytes, Buffer 用于实现string的一些操作.
sorted_set.cpp sorted_set.h
SelectableQueue: 提供一个基于管道实现的Queue, 从而使得这个Queue 可以做Select.
class Link{ int sock; bool noblock_; static int min_recv_buf; static int min_send_buf; std::vector<Bytes> recv_data; //recv_data 是解析后的bulk链表. Buffer *input; Buffer *output; double create_time; double active_time; int read(); //从网络读到input int write(); //从output写到网络 int flush(); //调用wirte. const std::vector<Bytes>* recv(); //从input解析到recv_data int send(const Bytes &s1); //写到output int send(const Bytes &s1, const Bytes &s2); int send(const Bytes &s1, const Bytes &s2, const Bytes &s3); ... const std::vector<Bytes>* response(); //通过read() 和recv() 读取response. const std::vector<Bytes>* request(const Bytes &s1); //铜鼓send() 和flush() 写request. const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2); ... }
这里:
两个函数, 处理对象是input, output:
send_resp(Buffer *output, const std::vector<std::string> &resp) //发送redis格式的响应, resp是一个 string 构成的vector. parse_req(Buffer *input) //读redis格式的请求 recv_req(Buffer *input) //调用parse_req 之后调用convert 转为内部cmd格式.
parse_req 对input的处理是, 如果一次没有解析到一个完整的命令, 那么input buffer不动, 下次继续.
命令表:
#define PROC(c, f) {#c, f, 0, proc_##c, 0, 0, 0} static Command commands[] = { PROC(get, "r"), PROC(set, "wt"), //t表示在线程池里面run ... PROC(dump, "b"), //b表示在后台run }
全局变量:
static proc_map_t proc_map; //启动时初始化 这个map for(Command *cmd=commands; cmd->name; cmd++){ proc_map[cmd->name] = cmd; }
收到请求后, Server::proc函数会在proc_map里面查找
ssdb-server.cpp
全局:
Config *conf = NULL; SSDB *ssdb = NULL; Link *serv_link = NULL; IpFilter *ip_filter = NULL; typedef std::vector<Link *> ready_list_t; volatile bool quit = false; volatile uint32_t g_ticks = 0;
main:
main(){ init(); //load cfg, daemon, listen, init ipfilter, init SSDB, signal run(argc, argv); }
run():
run(){ Fdevents fdes; fdes.set(serv_link->fd(), FDEVENT_IN, 0, serv_link); //监听socket fdes.set(serv.reader->fd(), FDEVENT_IN, 0, serv.reader); //reader.SelectableQueue.fd fdes.set(serv.writer->fd(), FDEVENT_IN, 0, serv.writer); while(!quit){ ready_list.clear(); ready_list_2.clear(); events = fdes.wait(50); for(int i=0; i<(int)events->size(); i++){ if(fde->data.ptr == serv_link){ //do accept }else if(fde->data.ptr == serv.reader || fde->data.ptr == serv.writer){ //从子进程那里收结果, 这里比较复杂 proc_result(job, fdes, ready_list_2); }else{ if(fde->events & FDEVENT_IN){ int len = link->read(); //用read ready_list.push_back(link); //放到ready_list }else if(fde->events & FDEVENT_OUT){ int len = link->write(); //用write ready_list.push_back(link); //放到ready_list } } } for(it = ready_list.begin(); it != ready_list.end(); it ++){ Link *link = *it; const Request *req = link->recv(); ProcJob job; job.link = link; serv.proc(&job); //这里面可能把Job发到一个线程去. if(job.result == PROC_THREAD){ fdes.del(link->fd()); continue; } if(job.result == PROC_BACKEND){ fdes.del(link->fd()); link_count --; continue; } //这里是直接处理的情况. if(proc_result(job, fdes, ready_list_2) == PROC_ERROR){ link_count --; } } ready_list.swap(ready_list_2); } }
所以请求可以在主线程里面处理, 也可能在线程池里面处理, 如果在线程里面处理, 就会返回PROC_THREAD或者PROC_BACKEND, 此时fd被摘掉, 等请求处理完, 发送response后, 再把fd加入epoll.
TODO: 为什么最后把ready_list_2 里面的元素放到ready_list里面去了, 但是到下一个循环一开始就 ready_list.clear();, 这个clear() 是不是不应该有?
这个函数会根据command_table里的标记, 决定是在主线程处理, 还是在线程池处理, 或者新开一个线程处理:
void Server::proc(ProcJob *job){ proc_map_t::iterator it = proc_map.find(req->at(0)); if(it == proc_map.end()){ resp.push_back("client_error"); }else{ Command *cmd = it->second; job->cmd = cmd; if(cmd->flags & Command::FLAG_THREAD){ //标记为thread的cmd, 会被分到2个线程池里面去跑 if(cmd->flags & Command::FLAG_WRITE){ job->result = PROC_THREAD; writer->push(*job); return; ///// }else if(cmd->flags & Command::FLAG_READ){ job->result = PROC_THREAD; reader->push(*job); return; ///// }else{ log_error("bad command config: %s", cmd->name); } } proc_t p = cmd->proc; job->time_wait = 1000 *(millitime() - job->stime); job->result = (*p)(this, job->link, *req, &resp); //直接在主线程处理. job->time_proc = 1000 *(millitime() - job->stime); } }
这里调用cmd->proc, 就是prox_xxx 函数, 它们都是然后调用 SSDB 类的相应函数:
static int proc_hdel(Server *serv, Link *link, const Request &req, Response *resp){ int ret = serv->ssdb->hdel(req[1], req[2]); }
thread.h 提供线程池(WorkerPool), ssdb里面一个writer 线程池, 一个reader 线程池:
WorkerPool<ProcWorker, ProcJob> *writer; WorkerPool<ProcWorker, ProcJob> *reader; writer = new WorkerPool<ProcWorker, ProcJob>("writer"); writer->start(WRITER_THREADS); //1 reader = new WorkerPool<ProcWorker, ProcJob>("reader"); reader->start(READER_THREADS); //10
每个WorkerPool有两个 Queue:
Queue<JOB> jobs; SelectableQueue<JOB> results; template<class W, class JOB> int WorkerPool<W, JOB>::push(JOB job){ return this->jobs.push(job); } template<class W, class JOB> int WorkerPool<W, JOB>::pop(JOB *job){ return this->results.pop(job); } template<class W, class JOB> void* WorkerPool<W, JOB>::_run_worker(void *arg){ while(1){ JOB job; tp->jobs.pop(&job); worker->proc(&job); tp->results.push(job); } }
SelectableQueue的fd, 在前面epoll_初始化的时候, 已经把这个fd加入监听, 每次一个job处理完成, 就会向SelectableQueue的fd上写一个字节, 这样主线程就能知道这个Job处理完了
worker->porc(&job) 的逻辑:
int Server::ProcWorker::proc(ProcJob *job){ const Request *req = job->link->last_recv(); Response resp; double stime = millitime(); proc_t p = job->cmd->proc; job->result = (*p)(job->serv, job->link, *req, &resp); double etime = millitime(); job->time_wait = 1000 * (stime - job->stime); job->time_proc = 1000 *(etime - stime); if(job->link->send(resp) == -1){ job->result = PROC_ERROR; }else{ log_debug("w:%.3f,p:%.3f, req: %s, resp: %s", job->time_wait, job->time_proc, serialize_req(*req).c_str(), serialize_req(resp).c_str()); } return 0; }
redis | ssdb |
---|---|
string | string |
hash | hash |
list | queue(不等于list) |
set | 无 |
zset | zset |
hash的hset, hget 可以O(n)实现, list 的rpush/lpop也可以O(n) 实现, 但是list的 LINSERT, LINDEX是O(n), 所以ssdb没有实现, 只是实现了可以保持O(n)操作的queue
如何在kv上实现hash/zset/queue
kv:
DEF_PROC(get); DEF_PROC(set);
hash:
DEF_PROC(hget); DEF_PROC(hset);
zset:
DEF_PROC(zrank); DEF_PROC(zrrank); DEF_PROC(zrange);
queue:
DEF_PROC(qsize); DEF_PROC(qfront); DEF_PROC(qback); DEF_PROC(qpush);
get:
static int proc_get(Server *serv, Link *link, const Request &req, Response *resp){ if(req.size() < 2){ resp->push_back("client_error"); }else{ std::string val; int ret = serv->ssdb->get(req[1], &val); if(ret == 1){ resp->push_back("ok"); //找到 这里是ok这个字符串, 后面会转为redis协议. resp->push_back(val); }else if(ret == 0){ resp->push_back("not_found"); }else{ log_error("fail"); resp->push_back("fail"); } } return 0; } typedef std::vector<std::string> Response;
实现了:
DEF_PROC(hsize); // O(1) DEF_PROC(hget); // O(1) DEF_PROC(hset); // O(1) DEF_PROC(hdel); // O(1) DEF_PROC(hincr); // O(1) DEF_PROC(hdecr); // O(1) DEF_PROC(hexists); //O(1) DEF_PROC(hclear); // O(n) DEF_PROC(hscan); // O(n) DEF_PROC(hrscan); // O(n) DEF_PROC(hkeys); // O(n) DEF_PROC(hvals); // O(n)
hkey: {a: b} 存储结构:
_hkey : 1 #size = 1 _hkey_a : b # hkey.a = b
hset_one时, 组一个新的key:
// returns the number of newly added items static int hset_one(const SSDB *ssdb, const Bytes &name, const Bytes &key, const Bytes &val, char log_type){ ... int ret = 0; std::string dbval; if(ssdb->hget(name, key, &dbval) == 0){ // not found std::string hkey = encode_hash_key(name, key); ///////////////////////////////组新key. ssdb->binlogs->Put(hkey, val.Slice()); ssdb->binlogs->add(log_type, BinlogCommand::HSET, hkey); ret = 1; } } inline static std::string encode_hash_key(const Bytes &name, const Bytes &key){ std::string buf; buf.append(1, DataType::HASH); buf.append(1, (uint8_t)name.size()); buf.append(name.data(), name.size()); buf.append(1, '='); buf.append(key.data(), key.size()); return buf; }
专门存size的key, 参考:
static int incr_hsize(SSDB *ssdb, const Bytes &name, int64_t incr){
需要客户端传来, 从那个key scan到哪个key:
static int proc_hscan(Server *serv, Link *link, const Request &req, Response *resp){ uint64_t limit = req[4].Uint64(); HIterator *it = serv->ssdb->hscan(req[1], req[2], req[3], limit); resp->push_back("ok"); while(it->next()){ resp->push_back(it->key); resp->push_back(it->val); } }
用法应该是:
hscan hkey a '' 100
只有ssdb协议支持hscan, 不支持redis的hscan.
实现了这些命令:
{STRATEGY_AUTO, "lpush", "qpush_front", REPLY_STATUS}, {STRATEGY_AUTO, "rpush", "qpush_back", REPLY_STATUS}, {STRATEGY_AUTO, "lpop", "qpop_front", REPLY_BULK}, {STRATEGY_AUTO, "rpop", "qpop_back", REPLY_BULK}, {STRATEGY_AUTO, "llen", "qsize", REPLY_INT},
只能在端上操作, 不能在list中间插入/删除等.
static uint64_t QFRONT_SEQ = 2; static uint64_t QBACK_SEQ = 3; static uint64_t QITEM_MIN_SEQ = 10000; static uint64_t QITEM_MAX_SEQ = 9223372036854775807ULL; static uint64_t QITEM_SEQ_INIT = QITEM_MAX_SEQ/2; //4611686018427387903
rpush qkey msg 后, 存储结构如下:
qkey_2: 4611686018427387903 //front下标 qkey_3: 4611686018427387904 //end下标 qkey_4611686018427387904 //msg
这里 qkey_4611686018427387904 后面这个数字在leveldb key里面是直接存binary格式, 8个字节.
每个zset中的元素对应2个key: zset_key, zscore_key:
k2 = encode_zscore_key(name, key, new_score); k0 = encode_zset_key(name, key);
如下操作:
127.0.0.1:8888> Zadd zkey 3 a (integer) 1 127.0.0.1:8888> ZSCORE zkey a "3"
存储结构:
zkey: 1 #size zkey_a_3: '' #zscore_key zkey_a: 3 #zset_key
每次zset, 先用zset_key 取的score, 构造zscore_key, 删除老记录.
再写新的zset_key和zscore_key.
利用zscore_key, 遍历:
int64_t SSDB::zrank(const Bytes &name, const Bytes &key) const{ ZIterator *it = ziterator(this, name, "", "", "", INT_MAX, Iterator::FORWARD); uint64_t ret = 0; while(true){ if(it->next() == false){ ret = -1; break; } if(key == it->key){ break; } ret ++; } delete it; return ret; }
zrange类似
LevelDB里面, 每种key都有一个前缀:
class DataType{ public: static const char SYNCLOG = 1; static const char KV = 'k'; static const char HASH = 'h'; // hashmap(sorted by key) static const char HSIZE = 'H'; static const char ZSET = 's'; // key => score static const char ZSCORE = 'z'; // key|score => "" static const char ZSIZE = 'Z'; static const char QUEUE = 'q'; static const char QSIZE = 'Q'; static const char MIN_PREFIX = HASH; static const char MAX_PREFIX = ZSET; };
class Binlog class BinlogQueue class Transaction
class Transaction{ private: BinlogQueue *logs; public: Transaction(BinlogQueue *logs){ this->logs = logs; logs->mutex.lock(); logs->begin(); } ~Transaction(){ // it is safe to call rollback after commit logs->rollback(); logs->mutex.unlock(); } };
整个SSDB只有一个BinlogQueue, 而且和数据存放在同一个leveldb里面:
ssdb->binlogs = new BinlogQueue(ssdb->db);
启动SSDB时, 申请一个BinlogQueue对象, seek 到最后一条binlog, (最后一条是用 encode_seq_key(UINT64_MAX) ) 然后启动一个线程, 来删Binlog:
int err = pthread_create(&tid, NULL, &BinlogQueue::log_clean_thread_func, this);
具体写操作的时候:
int SSDB::set(const Bytes &key, const Bytes &val, char log_type){ Transaction trans(binlogs); //这里开始加锁 std::string buf = encode_kv_key(key); binlogs->Put(buf, val.Slice()); //这里是真正的写操作. binlogs->add(log_type, BinlogCommand::KSET, buf); //这里是记录一条日志, 说我对这个key, 做了一次set操作 (没记录value, 难道同步的时候再去取value?) leveldb::Status s = binlogs->commit(); //两个操作一起写 }
其实这里 ssdb->binlogs 相当于存储层, 所有 set/del leveldb 读写操作都是通过 ssdb->binlog 进行的, 但是Get操作却不是通过ssdb->binlog() 操作的:
int64_t SSDB::qsize(const Bytes &name){ std::string key = encode_qsize_key(name); std::string val; leveldb::Status s; s = db->Get(leveldb::ReadOptions(), key, &val); }
这就不太统一, 不方便换下面的存储引擎.
有多种类型:
class BinlogCommand{ public: static const char NONE = 0; static const char KSET = 1; static const char KDEL = 2; static const char HSET = 3; static const char HDEL = 4; static const char ZSET = 5; static const char ZDEL = 6; static const char BEGIN = 7; static const char END = 8; };
都是只记录key:
ssdb->binlogs->add(log_type, BinlogCommand::HSET, hkey);
由Master 主动向从数据, 一个Server 有一个BackendSync实例:
Server::Server(SSDB *ssdb){ this->ssdb = ssdb; backend_sync = new BackendSync(ssdb); ... }
void Slave::start(){ load_status(); log_debug("last_seq: %" PRIu64 ", last_key: %s", last_seq, hexmem(last_key.data(), last_key.size()).c_str()); thread_quit = false; int err = pthread_create(&run_thread_tid, NULL, &Slave::_run_thread, this); if(err != 0){ log_error("can't create thread: %s", strerror(err)); } }
启动时load last_seq , 然后连上master, 发一个 sync140 告诉服务器从哪里开始发binlog:
sprintf(seq_buf, "%" PRIu64 "", this->last_seq); const char *type = is_mirror? "mirror" : "sync"; link->send("sync140", seq_buf, this->last_key, type);
当Slave通过sync命令连上来, Master就会从这个socket把最新的更新发给Slave:
static int proc_sync140(Server *serv, Link *link, const Request &req, Response *resp){ serv->backend_sync->proc(link); return PROC_BACKEND; } int BackendSync::Client::sync(BinlogQueue *logs){ Binlog log; ret = logs->find_next(expect_seq, &log); switch(log.cmd()){ case BinlogCommand::KSET: case BinlogCommand::HSET: case BinlogCommand::ZSET: ret = backend->ssdb->raw_get(log.key(), &val); if(ret == -1){ log_error("fd: %d, raw_get error!", link->fd()); }else if(ret == 0){ //log_debug("%s", hexmem(log.key().data(), log.key().size()).c_str()); log_trace("fd: %d, skip not found: %s", link->fd(), log.dumps().c_str()); }else{ log_trace("fd: %d, %s", link->fd(), log.dumps().c_str()); link->send(log.repr(), val); }
因为binlog只记录了key, 所以这里会再查一次, 把value查出来一起发过去(需要一次读)
问题: 基准数据怎么过去呢? => 可以拷贝.
backend_dump.cpp backend_dump.h
有个dump命令:
PROC(dump, "b"),
相当于redis 的keys,
收到这个命令, 服务器新开一个线程, 把所有数据通过一个socket发过来, 问题:
ning@ning-laptop ~/idning-github/ndb$ printf '*1\r\n$4\r\ndump\r\n' | socat - TCP:localhost:8888,shut-close | xxd 0000000: 350a 6265 6769 6e0a 0a33 0a73 6574 0a39 5.begin..3.set.9 0000010: 0a01 0000 0000 001e 953a 0a32 370a 3a95 .........:.27.:. 0000020: 1e00 0000 0000 0101 6b6b 6579 3a30 3030 ........kkey:000 0000030: 3030 3234 3833 3631 320a 0a33 0a73 6574 002483612..3.set 0000040: 0a39 0a01 0000 0000 001e 953b 0a32 370a .9.........;.27. 0000050: 3b95 1e00 0000 0000 0101 6b6b 6579 3a30 ;.........kkey:0 0000060: 3030 3030 3738 3833 3133 310a 0a33 0a73 00007883131..3.s 0000070: 6574 0a39 0a01 0000 0000 001e 953c 0a32 et.9.........<.2 0000080: 370a 3c95 1e00 0000 0000 0101 6b6b 6579 7.<.........kkey 0000090: 3a30 3030 3030 3236 3435 3335 320a 0a33 :000002645352..3 00000a0: 0a73 6574 0a39 0a01 0000 0000 001e 953d .set.9.........= 00000b0: 0a32 370a 3d95 1e00 0000 0000 0101 6b6b .27.=.........kk 00000c0: 6579 3a30 3030 3030 3935 3033 3539 380a ey:000009503598. 00000d0: 0a33 0a73 6574 0a39 0a01 0000 0000 001e .3.set.9........ 00000e0: 953e 0a32 370a 3e95 1e00 0000 0000 0101 .>.27.>......... 00000f0: 6b6b 6579 3a30 3030 3030 3938 3038 3139 kkey:00000980819 0000100: 360a 0a33 0a73 6574 0a39 0a01 0000 0000 6..3.set.9...... 0000110: 001e 953f 0a32 370a 3f95 1e00 0000 0000 ...?.27.?....... 0000120: 0101 6b6b 6579 3a30 3030 3030 3238 3232 ..kkey:000002822 0000130: 3337 360a 0a33 0a73 6574 0a39 0a01 0000 376..3.set.9.... 0000140: 0000 001e 9540 0a32 370a 4095 1e00 0000 .....@.27.@..... 0000150: 0000 0101 6b6b 6579 3a30 3030 3030 3134 ....kkey:0000014 0000160: 3936 3935 320a 0a33 0a73 6574 0a39 0a01 96952..3.set.9.. 0000170: 0000 0000 001e 9541 0a32 370a 4195 1e00 .......A.27.A... 0000180: 0000 0000 0101 6b6b 6579 3a30 3030 3030 ......kkey:00000 0000190: 3432 3033 3036 370a 0a33 0a73 6574 0a39 4203067..3.set.9 00001a0: 0a01 0000 0000 001e 9542 0a32 370a 4295 .........B.27.B. 00001b0: 1e00 0000 0000 0101 6b6b 6579 3a30 3030 ........kkey:000 00001c0: 3030 3139 3133 3036 320a 0a33 0a73 6574 001913062..3.set 00001d0: 0a39 0a01 0000 0000 001e 9543 0a32 370a .9.........C.27. 00001e0: 4395 1e00 0000 0000 0101 6b6b 6579 3a30 C.........kkey:0 00001f0: 3030 3030 3637 3432 3136 310a 0a33 0a65 00006742161..3.e 0000200: 6e64 0a32 0a31 300a 0a nd.2.10..
通过Transaction上的锁实现.
ssdb为每个带有过期设置的key, 保存了2个结构:
ssdb 里面保留这个key:
#define EXPIRATION_LIST_KEY "\xff\xff\xff\xff\xff|EXPIRE_LIST|KV"
用这个key作为一个大zset(基于ssdb在leveldb上提供的zset), 当需要设置一个key的ttl时, 就向这个zset里面设置某个key的超时时间:
int ExpirationHandler::set_ttl(const Bytes &key, int ttl){ int64_t expired = time_ms() + ttl * 1000; char data[30]; int size = snprintf(data, sizeof(data), "%" PRId64, expired); if(size <= 0){ log_error("snprintf return error!"); return -1; } Locking l(&mutex); int ret = ssdb->zset(this->list_name, key, Bytes(data, size)); }
同时还会放到一个内存的sorted_set(expiration_keys)里面去:
expiration_keys.add(key.String(), expired);
ssdb启动后, 会有一个线程把所有 EXPIRATION_LIST_KEY 这个 zset 里面的所有key扫描出来, 放到内存的 expiration_keys 里.
回收时, 由一个线程从 expiration_keys 里面取出每个key, 把key和它对应的expire记录删掉:
if(handler->expiration_keys.front(&key, &score)){ int64_t now = time_ms(); if(score <= now){ log_debug("expired %s", key->c_str()); ssdb->del(*key); ssdb->zdel(handler->list_name, *key); handler->expiration_keys.pop_front(); continue; } }
问题是: 所有的key都要装到内存, 内存会比较大, 而读的时候又没有利用上这个内存.
benchmark结果: 100G数据 时, 读性能稳定在大约5000qps
貌似是一个命令. 需要人工调用.
如果有10亿条, 每条100字节, 就需要100G+内存.
因为leveldb 的接口, 删除实际上是一个写操作(写为空串). 所以删除接口不能返回这个key在是真的删了, 还是本来就不存在.
所以在删除一个不存在的key时 for redis:
ning@ning-laptop ~/idning-github/redis/src$ redis-cli -p 2000 del xxx (integer) 0
for ssdb:
ning@ning-laptop ~/idning-github/redis/src$ redis-cli -p 8888 del xxx (integer) 1
ssdb:
127.0.0.1:8888> ttl k (error) ERR
redis:
127.0.0.1:5527> ttl k (integer) -2
改为通过ttl命令设置expire.
一次在一个连接上吐回所有key, 而且要全放到内存resp里面再一次吐出:
static int proc_scan(Server *serv, Link *link, const Request &req, Response *resp){ if(req.size() < 4){ resp->push_back("client_error"); }else{ uint64_t limit = req[3].Uint64(); KIterator *it = serv->ssdb->scan(req[1], req[2], limit); resp->push_back("ok"); while(it->next()){ resp->push_back(it->key); resp->push_back(it->val); } delete it; } return 0; }