Table of Contents
代码量大约9000行:
ning@ning-laptop:~/test/ssdb/src$ cat *.cpp *.h | wc -l 7223 ning@ning-laptop:~/test/ssdb/src$ cat util/*.cpp util/*.h | wc -l 2404
协议解析:
link.cpp link.h link_redis.cpp link_redis.h
数据类型:
t_hash.cpp t_hash.h t_kv.cpp t_kv.h t_queue.cpp t_queue.h t_zset.cpp t_zset.h proc_hash.cpp proc_kv.cpp proc_queue.cpp proc_zset.cpp #ttl 也相当于是一种数据类型. ttl.cpp ttl.h
主从同步:
binlog.cpp binlog.h slave.cpp slave.h #backend_dump.cpp #backend_dump.h backend_sync.cpp backend_sync.h
服务框架:
serv.cpp serv.h ssdb.cpp ssdb.h ssdb-server.cpp
其它:
include.h version.h iterator.cpp iterator.h
和icomet一样:
config.cpp config.h daemon.h file.h ip_filter.h log.cpp log.h strings.h
epoll:
fde.cpp fde_epoll.cpp fde.h fde_select.cpp
bytes.cpp bytes.h Bytes, Buffer 用于实现string的一些操作.
sorted_set.cpp sorted_set.h
SelectableQueue: 提供一个基于管道实现的Queue, 从而使得这个Queue 可以做Select.
class Link{
int sock;
bool noblock_;
static int min_recv_buf;
static int min_send_buf;
std::vector<Bytes> recv_data; //recv_data 是解析后的bulk链表.
Buffer *input;
Buffer *output;
double create_time;
double active_time;
int read(); //从网络读到input
int write(); //从output写到网络
int flush(); //调用wirte.
const std::vector<Bytes>* recv(); //从input解析到recv_data
int send(const Bytes &s1); //写到output
int send(const Bytes &s1, const Bytes &s2);
int send(const Bytes &s1, const Bytes &s2, const Bytes &s3);
...
const std::vector<Bytes>* response(); //通过read() 和recv() 读取response.
const std::vector<Bytes>* request(const Bytes &s1); //铜鼓send() 和flush() 写request.
const std::vector<Bytes>* request(const Bytes &s1, const Bytes &s2);
...
}
这里:
两个函数, 处理对象是input, output:
send_resp(Buffer *output, const std::vector<std::string> &resp) //发送redis格式的响应, resp是一个 string 构成的vector. parse_req(Buffer *input) //读redis格式的请求 recv_req(Buffer *input) //调用parse_req 之后调用convert 转为内部cmd格式.
parse_req 对input的处理是, 如果一次没有解析到一个完整的命令, 那么input buffer不动, 下次继续.
命令表:
#define PROC(c, f) {#c, f, 0, proc_##c, 0, 0, 0}
static Command commands[] = {
PROC(get, "r"),
PROC(set, "wt"), //t表示在线程池里面run
...
PROC(dump, "b"), //b表示在后台run
}
全局变量:
static proc_map_t proc_map;
//启动时初始化 这个map
for(Command *cmd=commands; cmd->name; cmd++){
proc_map[cmd->name] = cmd;
}
收到请求后, Server::proc函数会在proc_map里面查找
ssdb-server.cpp
全局:
Config *conf = NULL; SSDB *ssdb = NULL; Link *serv_link = NULL; IpFilter *ip_filter = NULL; typedef std::vector<Link *> ready_list_t; volatile bool quit = false; volatile uint32_t g_ticks = 0;
main:
main(){
init(); //load cfg, daemon, listen, init ipfilter, init SSDB, signal
run(argc, argv);
}
run():
run(){
Fdevents fdes;
fdes.set(serv_link->fd(), FDEVENT_IN, 0, serv_link); //监听socket
fdes.set(serv.reader->fd(), FDEVENT_IN, 0, serv.reader); //reader.SelectableQueue.fd
fdes.set(serv.writer->fd(), FDEVENT_IN, 0, serv.writer);
while(!quit){
ready_list.clear();
ready_list_2.clear();
events = fdes.wait(50);
for(int i=0; i<(int)events->size(); i++){
if(fde->data.ptr == serv_link){
//do accept
}else if(fde->data.ptr == serv.reader || fde->data.ptr == serv.writer){
//从子进程那里收结果, 这里比较复杂
proc_result(job, fdes, ready_list_2);
}else{
if(fde->events & FDEVENT_IN){
int len = link->read(); //用read
ready_list.push_back(link); //放到ready_list
}else if(fde->events & FDEVENT_OUT){
int len = link->write(); //用write
ready_list.push_back(link); //放到ready_list
}
}
}
for(it = ready_list.begin(); it != ready_list.end(); it ++){
Link *link = *it;
const Request *req = link->recv();
ProcJob job;
job.link = link;
serv.proc(&job); //这里面可能把Job发到一个线程去.
if(job.result == PROC_THREAD){
fdes.del(link->fd());
continue;
}
if(job.result == PROC_BACKEND){
fdes.del(link->fd());
link_count --;
continue;
}
//这里是直接处理的情况.
if(proc_result(job, fdes, ready_list_2) == PROC_ERROR){
link_count --;
}
}
ready_list.swap(ready_list_2);
}
}
所以请求可以在主线程里面处理, 也可能在线程池里面处理, 如果在线程里面处理, 就会返回PROC_THREAD或者PROC_BACKEND, 此时fd被摘掉, 等请求处理完, 发送response后, 再把fd加入epoll.
TODO: 为什么最后把ready_list_2 里面的元素放到ready_list里面去了, 但是到下一个循环一开始就 ready_list.clear();, 这个clear() 是不是不应该有?
这个函数会根据command_table里的标记, 决定是在主线程处理, 还是在线程池处理, 或者新开一个线程处理:
void Server::proc(ProcJob *job){
proc_map_t::iterator it = proc_map.find(req->at(0));
if(it == proc_map.end()){
resp.push_back("client_error");
}else{
Command *cmd = it->second;
job->cmd = cmd;
if(cmd->flags & Command::FLAG_THREAD){ //标记为thread的cmd, 会被分到2个线程池里面去跑
if(cmd->flags & Command::FLAG_WRITE){
job->result = PROC_THREAD;
writer->push(*job);
return; /////
}else if(cmd->flags & Command::FLAG_READ){
job->result = PROC_THREAD;
reader->push(*job);
return; /////
}else{
log_error("bad command config: %s", cmd->name);
}
}
proc_t p = cmd->proc;
job->time_wait = 1000 *(millitime() - job->stime);
job->result = (*p)(this, job->link, *req, &resp); //直接在主线程处理.
job->time_proc = 1000 *(millitime() - job->stime);
}
}
这里调用cmd->proc, 就是prox_xxx 函数, 它们都是然后调用 SSDB 类的相应函数:
static int proc_hdel(Server *serv, Link *link, const Request &req, Response *resp){
int ret = serv->ssdb->hdel(req[1], req[2]);
}
thread.h 提供线程池(WorkerPool), ssdb里面一个writer 线程池, 一个reader 线程池:
WorkerPool<ProcWorker, ProcJob> *writer;
WorkerPool<ProcWorker, ProcJob> *reader;
writer = new WorkerPool<ProcWorker, ProcJob>("writer");
writer->start(WRITER_THREADS); //1
reader = new WorkerPool<ProcWorker, ProcJob>("reader");
reader->start(READER_THREADS); //10
每个WorkerPool有两个 Queue:
Queue<JOB> jobs;
SelectableQueue<JOB> results;
template<class W, class JOB>
int WorkerPool<W, JOB>::push(JOB job){
return this->jobs.push(job);
}
template<class W, class JOB>
int WorkerPool<W, JOB>::pop(JOB *job){
return this->results.pop(job);
}
template<class W, class JOB>
void* WorkerPool<W, JOB>::_run_worker(void *arg){
while(1){
JOB job;
tp->jobs.pop(&job);
worker->proc(&job);
tp->results.push(job);
}
}
SelectableQueue的fd, 在前面epoll_初始化的时候, 已经把这个fd加入监听, 每次一个job处理完成, 就会向SelectableQueue的fd上写一个字节, 这样主线程就能知道这个Job处理完了
worker->porc(&job) 的逻辑:
int Server::ProcWorker::proc(ProcJob *job){
const Request *req = job->link->last_recv();
Response resp;
double stime = millitime();
proc_t p = job->cmd->proc;
job->result = (*p)(job->serv, job->link, *req, &resp);
double etime = millitime();
job->time_wait = 1000 * (stime - job->stime);
job->time_proc = 1000 *(etime - stime);
if(job->link->send(resp) == -1){
job->result = PROC_ERROR;
}else{
log_debug("w:%.3f,p:%.3f, req: %s, resp: %s",
job->time_wait, job->time_proc,
serialize_req(*req).c_str(),
serialize_req(resp).c_str());
}
return 0;
}
| redis | ssdb |
|---|---|
| string | string |
| hash | hash |
| list | queue(不等于list) |
| set | 无 |
| zset | zset |
hash的hset, hget 可以O(n)实现, list 的rpush/lpop也可以O(n) 实现, 但是list的 LINSERT, LINDEX是O(n), 所以ssdb没有实现, 只是实现了可以保持O(n)操作的queue
如何在kv上实现hash/zset/queue
kv:
DEF_PROC(get); DEF_PROC(set);
hash:
DEF_PROC(hget); DEF_PROC(hset);
zset:
DEF_PROC(zrank); DEF_PROC(zrrank); DEF_PROC(zrange);
queue:
DEF_PROC(qsize); DEF_PROC(qfront); DEF_PROC(qback); DEF_PROC(qpush);
get:
static int proc_get(Server *serv, Link *link, const Request &req, Response *resp){
if(req.size() < 2){
resp->push_back("client_error");
}else{
std::string val;
int ret = serv->ssdb->get(req[1], &val);
if(ret == 1){
resp->push_back("ok"); //找到 这里是ok这个字符串, 后面会转为redis协议.
resp->push_back(val);
}else if(ret == 0){
resp->push_back("not_found");
}else{
log_error("fail");
resp->push_back("fail");
}
}
return 0;
}
typedef std::vector<std::string> Response;
实现了:
DEF_PROC(hsize); // O(1) DEF_PROC(hget); // O(1) DEF_PROC(hset); // O(1) DEF_PROC(hdel); // O(1) DEF_PROC(hincr); // O(1) DEF_PROC(hdecr); // O(1) DEF_PROC(hexists); //O(1) DEF_PROC(hclear); // O(n) DEF_PROC(hscan); // O(n) DEF_PROC(hrscan); // O(n) DEF_PROC(hkeys); // O(n) DEF_PROC(hvals); // O(n)
hkey: {a: b} 存储结构:
_hkey : 1 #size = 1 _hkey_a : b # hkey.a = b
hset_one时, 组一个新的key:
// returns the number of newly added items
static int hset_one(const SSDB *ssdb, const Bytes &name, const Bytes &key, const Bytes &val, char log_type){
...
int ret = 0;
std::string dbval;
if(ssdb->hget(name, key, &dbval) == 0){ // not found
std::string hkey = encode_hash_key(name, key); ///////////////////////////////组新key.
ssdb->binlogs->Put(hkey, val.Slice());
ssdb->binlogs->add(log_type, BinlogCommand::HSET, hkey);
ret = 1;
}
}
inline static
std::string encode_hash_key(const Bytes &name, const Bytes &key){
std::string buf;
buf.append(1, DataType::HASH);
buf.append(1, (uint8_t)name.size());
buf.append(name.data(), name.size());
buf.append(1, '=');
buf.append(key.data(), key.size());
return buf;
}
专门存size的key, 参考:
static int incr_hsize(SSDB *ssdb, const Bytes &name, int64_t incr){
需要客户端传来, 从那个key scan到哪个key:
static int proc_hscan(Server *serv, Link *link, const Request &req, Response *resp){
uint64_t limit = req[4].Uint64();
HIterator *it = serv->ssdb->hscan(req[1], req[2], req[3], limit);
resp->push_back("ok");
while(it->next()){
resp->push_back(it->key);
resp->push_back(it->val);
}
}
用法应该是:
hscan hkey a '' 100
只有ssdb协议支持hscan, 不支持redis的hscan.
实现了这些命令:
{STRATEGY_AUTO, "lpush", "qpush_front", REPLY_STATUS},
{STRATEGY_AUTO, "rpush", "qpush_back", REPLY_STATUS},
{STRATEGY_AUTO, "lpop", "qpop_front", REPLY_BULK},
{STRATEGY_AUTO, "rpop", "qpop_back", REPLY_BULK},
{STRATEGY_AUTO, "llen", "qsize", REPLY_INT},
只能在端上操作, 不能在list中间插入/删除等.
static uint64_t QFRONT_SEQ = 2; static uint64_t QBACK_SEQ = 3; static uint64_t QITEM_MIN_SEQ = 10000; static uint64_t QITEM_MAX_SEQ = 9223372036854775807ULL; static uint64_t QITEM_SEQ_INIT = QITEM_MAX_SEQ/2; //4611686018427387903
rpush qkey msg 后, 存储结构如下:
qkey_2: 4611686018427387903 //front下标 qkey_3: 4611686018427387904 //end下标 qkey_4611686018427387904 //msg
这里 qkey_4611686018427387904 后面这个数字在leveldb key里面是直接存binary格式, 8个字节.
每个zset中的元素对应2个key: zset_key, zscore_key:
k2 = encode_zscore_key(name, key, new_score); k0 = encode_zset_key(name, key);
如下操作:
127.0.0.1:8888> Zadd zkey 3 a (integer) 1 127.0.0.1:8888> ZSCORE zkey a "3"
存储结构:
zkey: 1 #size zkey_a_3: '' #zscore_key zkey_a: 3 #zset_key
每次zset, 先用zset_key 取的score, 构造zscore_key, 删除老记录.
再写新的zset_key和zscore_key.
利用zscore_key, 遍历:
int64_t SSDB::zrank(const Bytes &name, const Bytes &key) const{
ZIterator *it = ziterator(this, name, "", "", "", INT_MAX, Iterator::FORWARD);
uint64_t ret = 0;
while(true){
if(it->next() == false){
ret = -1;
break;
}
if(key == it->key){
break;
}
ret ++;
}
delete it;
return ret;
}
zrange类似
LevelDB里面, 每种key都有一个前缀:
class DataType{
public:
static const char SYNCLOG = 1;
static const char KV = 'k';
static const char HASH = 'h'; // hashmap(sorted by key)
static const char HSIZE = 'H';
static const char ZSET = 's'; // key => score
static const char ZSCORE = 'z'; // key|score => ""
static const char ZSIZE = 'Z';
static const char QUEUE = 'q';
static const char QSIZE = 'Q';
static const char MIN_PREFIX = HASH;
static const char MAX_PREFIX = ZSET;
};
class Binlog class BinlogQueue class Transaction
class Transaction{
private:
BinlogQueue *logs;
public:
Transaction(BinlogQueue *logs){
this->logs = logs;
logs->mutex.lock();
logs->begin();
}
~Transaction(){
// it is safe to call rollback after commit
logs->rollback();
logs->mutex.unlock();
}
};
整个SSDB只有一个BinlogQueue, 而且和数据存放在同一个leveldb里面:
ssdb->binlogs = new BinlogQueue(ssdb->db);
启动SSDB时, 申请一个BinlogQueue对象, seek 到最后一条binlog, (最后一条是用 encode_seq_key(UINT64_MAX) ) 然后启动一个线程, 来删Binlog:
int err = pthread_create(&tid, NULL, &BinlogQueue::log_clean_thread_func, this);
具体写操作的时候:
int SSDB::set(const Bytes &key, const Bytes &val, char log_type){
Transaction trans(binlogs); //这里开始加锁
std::string buf = encode_kv_key(key);
binlogs->Put(buf, val.Slice()); //这里是真正的写操作.
binlogs->add(log_type, BinlogCommand::KSET, buf); //这里是记录一条日志, 说我对这个key, 做了一次set操作 (没记录value, 难道同步的时候再去取value?)
leveldb::Status s = binlogs->commit(); //两个操作一起写
}
其实这里 ssdb->binlogs 相当于存储层, 所有 set/del leveldb 读写操作都是通过 ssdb->binlog 进行的, 但是Get操作却不是通过ssdb->binlog() 操作的:
int64_t SSDB::qsize(const Bytes &name){
std::string key = encode_qsize_key(name);
std::string val;
leveldb::Status s;
s = db->Get(leveldb::ReadOptions(), key, &val);
}
这就不太统一, 不方便换下面的存储引擎.
有多种类型:
class BinlogCommand{
public:
static const char NONE = 0;
static const char KSET = 1;
static const char KDEL = 2;
static const char HSET = 3;
static const char HDEL = 4;
static const char ZSET = 5;
static const char ZDEL = 6;
static const char BEGIN = 7;
static const char END = 8;
};
都是只记录key:
ssdb->binlogs->add(log_type, BinlogCommand::HSET, hkey);
由Master 主动向从数据, 一个Server 有一个BackendSync实例:
Server::Server(SSDB *ssdb){
this->ssdb = ssdb;
backend_sync = new BackendSync(ssdb);
...
}
void Slave::start(){
load_status();
log_debug("last_seq: %" PRIu64 ", last_key: %s",
last_seq, hexmem(last_key.data(), last_key.size()).c_str());
thread_quit = false;
int err = pthread_create(&run_thread_tid, NULL, &Slave::_run_thread, this);
if(err != 0){
log_error("can't create thread: %s", strerror(err));
}
}
启动时load last_seq , 然后连上master, 发一个 sync140 告诉服务器从哪里开始发binlog:
sprintf(seq_buf, "%" PRIu64 "", this->last_seq);
const char *type = is_mirror? "mirror" : "sync";
link->send("sync140", seq_buf, this->last_key, type);
当Slave通过sync命令连上来, Master就会从这个socket把最新的更新发给Slave:
static int proc_sync140(Server *serv, Link *link, const Request &req, Response *resp){
serv->backend_sync->proc(link);
return PROC_BACKEND;
}
int BackendSync::Client::sync(BinlogQueue *logs){
Binlog log;
ret = logs->find_next(expect_seq, &log);
switch(log.cmd()){
case BinlogCommand::KSET:
case BinlogCommand::HSET:
case BinlogCommand::ZSET:
ret = backend->ssdb->raw_get(log.key(), &val);
if(ret == -1){
log_error("fd: %d, raw_get error!", link->fd());
}else if(ret == 0){
//log_debug("%s", hexmem(log.key().data(), log.key().size()).c_str());
log_trace("fd: %d, skip not found: %s", link->fd(), log.dumps().c_str());
}else{
log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
link->send(log.repr(), val);
}
因为binlog只记录了key, 所以这里会再查一次, 把value查出来一起发过去(需要一次读)
问题: 基准数据怎么过去呢? => 可以拷贝.
backend_dump.cpp backend_dump.h
有个dump命令:
PROC(dump, "b"),
相当于redis 的keys,
收到这个命令, 服务器新开一个线程, 把所有数据通过一个socket发过来, 问题:
ning@ning-laptop ~/idning-github/ndb$ printf '*1\r\n$4\r\ndump\r\n' | socat - TCP:localhost:8888,shut-close | xxd 0000000: 350a 6265 6769 6e0a 0a33 0a73 6574 0a39 5.begin..3.set.9 0000010: 0a01 0000 0000 001e 953a 0a32 370a 3a95 .........:.27.:. 0000020: 1e00 0000 0000 0101 6b6b 6579 3a30 3030 ........kkey:000 0000030: 3030 3234 3833 3631 320a 0a33 0a73 6574 002483612..3.set 0000040: 0a39 0a01 0000 0000 001e 953b 0a32 370a .9.........;.27. 0000050: 3b95 1e00 0000 0000 0101 6b6b 6579 3a30 ;.........kkey:0 0000060: 3030 3030 3738 3833 3133 310a 0a33 0a73 00007883131..3.s 0000070: 6574 0a39 0a01 0000 0000 001e 953c 0a32 et.9.........<.2 0000080: 370a 3c95 1e00 0000 0000 0101 6b6b 6579 7.<.........kkey 0000090: 3a30 3030 3030 3236 3435 3335 320a 0a33 :000002645352..3 00000a0: 0a73 6574 0a39 0a01 0000 0000 001e 953d .set.9.........= 00000b0: 0a32 370a 3d95 1e00 0000 0000 0101 6b6b .27.=.........kk 00000c0: 6579 3a30 3030 3030 3935 3033 3539 380a ey:000009503598. 00000d0: 0a33 0a73 6574 0a39 0a01 0000 0000 001e .3.set.9........ 00000e0: 953e 0a32 370a 3e95 1e00 0000 0000 0101 .>.27.>......... 00000f0: 6b6b 6579 3a30 3030 3030 3938 3038 3139 kkey:00000980819 0000100: 360a 0a33 0a73 6574 0a39 0a01 0000 0000 6..3.set.9...... 0000110: 001e 953f 0a32 370a 3f95 1e00 0000 0000 ...?.27.?....... 0000120: 0101 6b6b 6579 3a30 3030 3030 3238 3232 ..kkey:000002822 0000130: 3337 360a 0a33 0a73 6574 0a39 0a01 0000 376..3.set.9.... 0000140: 0000 001e 9540 0a32 370a 4095 1e00 0000 .....@.27.@..... 0000150: 0000 0101 6b6b 6579 3a30 3030 3030 3134 ....kkey:0000014 0000160: 3936 3935 320a 0a33 0a73 6574 0a39 0a01 96952..3.set.9.. 0000170: 0000 0000 001e 9541 0a32 370a 4195 1e00 .......A.27.A... 0000180: 0000 0000 0101 6b6b 6579 3a30 3030 3030 ......kkey:00000 0000190: 3432 3033 3036 370a 0a33 0a73 6574 0a39 4203067..3.set.9 00001a0: 0a01 0000 0000 001e 9542 0a32 370a 4295 .........B.27.B. 00001b0: 1e00 0000 0000 0101 6b6b 6579 3a30 3030 ........kkey:000 00001c0: 3030 3139 3133 3036 320a 0a33 0a73 6574 001913062..3.set 00001d0: 0a39 0a01 0000 0000 001e 9543 0a32 370a .9.........C.27. 00001e0: 4395 1e00 0000 0000 0101 6b6b 6579 3a30 C.........kkey:0 00001f0: 3030 3030 3637 3432 3136 310a 0a33 0a65 00006742161..3.e 0000200: 6e64 0a32 0a31 300a 0a nd.2.10..
通过Transaction上的锁实现.
ssdb为每个带有过期设置的key, 保存了2个结构:
ssdb 里面保留这个key:
#define EXPIRATION_LIST_KEY "\xff\xff\xff\xff\xff|EXPIRE_LIST|KV"
用这个key作为一个大zset(基于ssdb在leveldb上提供的zset), 当需要设置一个key的ttl时, 就向这个zset里面设置某个key的超时时间:
int ExpirationHandler::set_ttl(const Bytes &key, int ttl){
int64_t expired = time_ms() + ttl * 1000;
char data[30];
int size = snprintf(data, sizeof(data), "%" PRId64, expired);
if(size <= 0){
log_error("snprintf return error!");
return -1;
}
Locking l(&mutex);
int ret = ssdb->zset(this->list_name, key, Bytes(data, size));
}
同时还会放到一个内存的sorted_set(expiration_keys)里面去:
expiration_keys.add(key.String(), expired);
ssdb启动后, 会有一个线程把所有 EXPIRATION_LIST_KEY 这个 zset 里面的所有key扫描出来, 放到内存的 expiration_keys 里.
回收时, 由一个线程从 expiration_keys 里面取出每个key, 把key和它对应的expire记录删掉:
if(handler->expiration_keys.front(&key, &score)){
int64_t now = time_ms();
if(score <= now){
log_debug("expired %s", key->c_str());
ssdb->del(*key);
ssdb->zdel(handler->list_name, *key);
handler->expiration_keys.pop_front();
continue;
}
}
问题是: 所有的key都要装到内存, 内存会比较大, 而读的时候又没有利用上这个内存.
benchmark结果: 100G数据 时, 读性能稳定在大约5000qps
貌似是一个命令. 需要人工调用.
如果有10亿条, 每条100字节, 就需要100G+内存.
因为leveldb 的接口, 删除实际上是一个写操作(写为空串). 所以删除接口不能返回这个key在是真的删了, 还是本来就不存在.
所以在删除一个不存在的key时 for redis:
ning@ning-laptop ~/idning-github/redis/src$ redis-cli -p 2000 del xxx (integer) 0
for ssdb:
ning@ning-laptop ~/idning-github/redis/src$ redis-cli -p 8888 del xxx (integer) 1
ssdb:
127.0.0.1:8888> ttl k (error) ERR
redis:
127.0.0.1:5527> ttl k (integer) -2
改为通过ttl命令设置expire.
一次在一个连接上吐回所有key, 而且要全放到内存resp里面再一次吐出:
static int proc_scan(Server *serv, Link *link, const Request &req, Response *resp){
if(req.size() < 4){
resp->push_back("client_error");
}else{
uint64_t limit = req[3].Uint64();
KIterator *it = serv->ssdb->scan(req[1], req[2], limit);
resp->push_back("ok");
while(it->next()){
resp->push_back(it->key);
resp->push_back(it->val);
}
delete it;
}
return 0;
}