Permalink: 2014-07-24 15:12:18 by ning in redis tags: all redis

1   问题

  • 请求处理模型, 每个线程一个req?
  • 如何在kv上实现hash/zset/queue
  • 如何处理expire/ttl
  • 如何支持事务, binlog?
  • 如何主从同步.
  • LevelDB读有没有缓存, 如果没有缓存, 那么读性能 < iops

2   代码

代码量大约9000行:

ning@ning-laptop:~/test/ssdb/src$ cat *.cpp *.h | wc -l
7223

ning@ning-laptop:~/test/ssdb/src$ cat util/*.cpp util/*.h | wc -l
2404

协议解析:

link.cpp
link.h
link_redis.cpp
link_redis.h

数据类型:

t_hash.cpp
t_hash.h
t_kv.cpp
t_kv.h
t_queue.cpp
t_queue.h
t_zset.cpp
t_zset.h
proc_hash.cpp
proc_kv.cpp
proc_queue.cpp
proc_zset.cpp

#ttl 也相当于是一种数据类型.
ttl.cpp
ttl.h

主从同步:

binlog.cpp
binlog.h
slave.cpp
slave.h
#backend_dump.cpp
#backend_dump.h
backend_sync.cpp
backend_sync.h

服务框架:

serv.cpp
serv.h
ssdb.cpp
ssdb.h
ssdb-server.cpp

其它:

include.h
version.h
iterator.cpp
iterator.h

2.1   util

和icomet一样:

config.cpp
config.h
daemon.h
file.h
ip_filter.h
log.cpp
log.h
strings.h

epoll:

fde.cpp
fde_epoll.cpp
fde.h
fde_select.cpp

bytes.cpp bytes.h Bytes, Buffer 用于实现string的一些操作.

sorted_set.cpp sorted_set.h

SelectableQueue: 提供一个基于管道实现的Queue, 从而使得这个Queue 可以做Select.

2.2   请求处理模型

2.2.3   命令表

命令表:

#define PROC(c, f) {#c, f, 0, proc_##c, 0, 0, 0}
static Command commands[] = {
    PROC(get, "r"),
    PROC(set, "wt"),        //t表示在线程池里面run
    ...
    PROC(dump, "b"),        //b表示在后台run
}

全局变量:

static proc_map_t proc_map;

//启动时初始化 这个map
for(Command *cmd=commands; cmd->name; cmd++){
    proc_map[cmd->name] = cmd;
}

收到请求后, Server::proc函数会在proc_map里面查找

2.2.4   处理模型

ssdb-server.cpp

全局:

Config *conf = NULL;
SSDB *ssdb = NULL;
Link *serv_link = NULL;
IpFilter *ip_filter = NULL;

typedef std::vector<Link *> ready_list_t;
volatile bool quit = false;
volatile uint32_t g_ticks = 0;

main:

main(){
    init(); //load cfg, daemon, listen, init ipfilter, init SSDB, signal
    run(argc, argv);
}

run():

run(){
    Fdevents fdes;
    fdes.set(serv_link->fd(), FDEVENT_IN, 0, serv_link);            //监听socket
    fdes.set(serv.reader->fd(), FDEVENT_IN, 0, serv.reader);        //reader.SelectableQueue.fd
    fdes.set(serv.writer->fd(), FDEVENT_IN, 0, serv.writer);

    while(!quit){
        ready_list.clear();
        ready_list_2.clear();

        events = fdes.wait(50);
        for(int i=0; i<(int)events->size(); i++){
            if(fde->data.ptr == serv_link){
                //do accept
            }else if(fde->data.ptr == serv.reader || fde->data.ptr == serv.writer){
                //从子进程那里收结果, 这里比较复杂
                proc_result(job, fdes, ready_list_2);
            }else{
                if(fde->events & FDEVENT_IN){
                    int len = link->read();                 //用read
                    ready_list.push_back(link);                         //放到ready_list
                }else if(fde->events & FDEVENT_OUT){
                    int len = link->write();                //用write
                    ready_list.push_back(link);                         //放到ready_list
                }
            }
        }
        for(it = ready_list.begin(); it != ready_list.end(); it ++){
            Link *link = *it;
            const Request *req = link->recv();

            ProcJob job;
            job.link = link;
            serv.proc(&job);                        //这里面可能把Job发到一个线程去.
            if(job.result == PROC_THREAD){
                fdes.del(link->fd());
                continue;
            }
            if(job.result == PROC_BACKEND){
                fdes.del(link->fd());
                link_count --;
                continue;
            }
            //这里是直接处理的情况.
            if(proc_result(job, fdes, ready_list_2) == PROC_ERROR){
                link_count --;
            }
        }
        ready_list.swap(ready_list_2);
    }
}

所以请求可以在主线程里面处理, 也可能在线程池里面处理, 如果在线程里面处理, 就会返回PROC_THREAD或者PROC_BACKEND, 此时fd被摘掉, 等请求处理完, 发送response后, 再把fd加入epoll.

TODO: 为什么最后把ready_list_2 里面的元素放到ready_list里面去了, 但是到下一个循环一开始就 ready_list.clear();, 这个clear() 是不是不应该有?

2.2.5   serv.proc

这个函数会根据command_table里的标记, 决定是在主线程处理, 还是在线程池处理, 或者新开一个线程处理:

void Server::proc(ProcJob *job){
    proc_map_t::iterator it = proc_map.find(req->at(0));
    if(it == proc_map.end()){
        resp.push_back("client_error");
    }else{
        Command *cmd = it->second;
        job->cmd = cmd;
        if(cmd->flags & Command::FLAG_THREAD){              //标记为thread的cmd, 会被分到2个线程池里面去跑
            if(cmd->flags & Command::FLAG_WRITE){
                job->result = PROC_THREAD;
                writer->push(*job);
                return; /////
            }else if(cmd->flags & Command::FLAG_READ){
                job->result = PROC_THREAD;
                reader->push(*job);
                return; /////
            }else{
                log_error("bad command config: %s", cmd->name);
            }
        }

        proc_t p = cmd->proc;
        job->time_wait = 1000 *(millitime() - job->stime);
        job->result = (*p)(this, job->link, *req, &resp);   //直接在主线程处理.
        job->time_proc = 1000 *(millitime() - job->stime);
    }
}

这里调用cmd->proc, 就是prox_xxx 函数, 它们都是然后调用 SSDB 类的相应函数:

static int proc_hdel(Server *serv, Link *link, const Request &req, Response *resp){
    int ret = serv->ssdb->hdel(req[1], req[2]);
}

2.2.6   WorkerPool

thread.h 提供线程池(WorkerPool), ssdb里面一个writer 线程池, 一个reader 线程池:

WorkerPool<ProcWorker, ProcJob> *writer;
WorkerPool<ProcWorker, ProcJob> *reader;

writer = new WorkerPool<ProcWorker, ProcJob>("writer");
writer->start(WRITER_THREADS);                                  //1
reader = new WorkerPool<ProcWorker, ProcJob>("reader");
reader->start(READER_THREADS);                                  //10

每个WorkerPool有两个 Queue:

Queue<JOB> jobs;
SelectableQueue<JOB> results;

template<class W, class JOB>
int WorkerPool<W, JOB>::push(JOB job){
    return this->jobs.push(job);
}

template<class W, class JOB>
int WorkerPool<W, JOB>::pop(JOB *job){
    return this->results.pop(job);
}

template<class W, class JOB>
void* WorkerPool<W, JOB>::_run_worker(void *arg){
    while(1){
        JOB job;
        tp->jobs.pop(&job);
        worker->proc(&job);
        tp->results.push(job);
    }
}

SelectableQueue的fd, 在前面epoll_初始化的时候, 已经把这个fd加入监听, 每次一个job处理完成, 就会向SelectableQueue的fd上写一个字节, 这样主线程就能知道这个Job处理完了

worker->porc(&job) 的逻辑:

int Server::ProcWorker::proc(ProcJob *job){
    const Request *req = job->link->last_recv();
    Response resp;

    double stime = millitime();
    proc_t p = job->cmd->proc;
    job->result = (*p)(job->serv, job->link, *req, &resp);
    double etime = millitime();
    job->time_wait = 1000 * (stime - job->stime);
    job->time_proc = 1000 *(etime - stime);

    if(job->link->send(resp) == -1){
        job->result = PROC_ERROR;
    }else{
        log_debug("w:%.3f,p:%.3f, req: %s, resp: %s",
            job->time_wait, job->time_proc,
            serialize_req(*req).c_str(),
            serialize_req(resp).c_str());
    }
    return 0;
}

2.2.7   小结

这种请求处理模型没见过, 觉得很巧妙.

HERE

2.3   功能

redis ssdb
string string
hash hash
list queue(不等于list)
set
zset zset

hash的hset, hget 可以O(n)实现, list 的rpush/lpop也可以O(n) 实现, 但是list的 LINSERT, LINDEX是O(n), 所以ssdb没有实现, 只是实现了可以保持O(n)操作的queue

如何在kv上实现hash/zset/queue

kv:

DEF_PROC(get);
DEF_PROC(set);

hash:

DEF_PROC(hget);
DEF_PROC(hset);

zset:

DEF_PROC(zrank);
DEF_PROC(zrrank);
DEF_PROC(zrange);

queue:

DEF_PROC(qsize);
DEF_PROC(qfront);
DEF_PROC(qback);
DEF_PROC(qpush);

2.3.1   kv

get:

static int proc_get(Server *serv, Link *link, const Request &req, Response *resp){
    if(req.size() < 2){
        resp->push_back("client_error");
    }else{
        std::string val;
        int ret = serv->ssdb->get(req[1], &val);
        if(ret == 1){
            resp->push_back("ok");      //找到  这里是ok这个字符串, 后面会转为redis协议.
            resp->push_back(val);
        }else if(ret == 0){
            resp->push_back("not_found");
        }else{
            log_error("fail");
            resp->push_back("fail");
        }
    }
    return 0;
}

typedef std::vector<std::string> Response;

2.3.2   hash

实现了:

DEF_PROC(hsize);        // O(1)
DEF_PROC(hget);         // O(1)
DEF_PROC(hset);         // O(1)
DEF_PROC(hdel);         // O(1)
DEF_PROC(hincr);        // O(1)
DEF_PROC(hdecr);        // O(1)
DEF_PROC(hexists);      //O(1)

DEF_PROC(hclear);       // O(n)
DEF_PROC(hscan);        // O(n)
DEF_PROC(hrscan);       // O(n)
DEF_PROC(hkeys);        // O(n)
DEF_PROC(hvals);        // O(n)

hkey: {a: b} 存储结构:

_hkey : 1           #size = 1
_hkey_a : b         # hkey.a = b

hset_one时, 组一个新的key:

// returns the number of newly added items
static int hset_one(const SSDB *ssdb, const Bytes &name, const Bytes &key, const Bytes &val, char log_type){
    ...
    int ret = 0;
    std::string dbval;
    if(ssdb->hget(name, key, &dbval) == 0){ // not found
        std::string hkey = encode_hash_key(name, key); ///////////////////////////////组新key.
        ssdb->binlogs->Put(hkey, val.Slice());
        ssdb->binlogs->add(log_type, BinlogCommand::HSET, hkey);
        ret = 1;
    }
}

inline static
std::string encode_hash_key(const Bytes &name, const Bytes &key){
    std::string buf;
    buf.append(1, DataType::HASH);
    buf.append(1, (uint8_t)name.size());
    buf.append(name.data(), name.size());
    buf.append(1, '=');
    buf.append(key.data(), key.size());
    return buf;
}

专门存size的key, 参考:

static int incr_hsize(SSDB *ssdb, const Bytes &name, int64_t incr){
2.3.2.1   hscan/hkeys的实现

需要客户端传来, 从那个key scan到哪个key:

static int proc_hscan(Server *serv, Link *link, const Request &req, Response *resp){
    uint64_t limit = req[4].Uint64();
    HIterator *it = serv->ssdb->hscan(req[1], req[2], req[3], limit);
    resp->push_back("ok");
    while(it->next()){
        resp->push_back(it->key);
        resp->push_back(it->val);
    }
}

用法应该是:

hscan hkey a '' 100

只有ssdb协议支持hscan, 不支持redis的hscan.

2.3.3   queue

实现了这些命令:

{STRATEGY_AUTO,         "lpush",                "qpush_front",          REPLY_STATUS},
{STRATEGY_AUTO,         "rpush",                "qpush_back",           REPLY_STATUS},
{STRATEGY_AUTO,         "lpop",                 "qpop_front",           REPLY_BULK},
{STRATEGY_AUTO,         "rpop",                 "qpop_back",            REPLY_BULK},
{STRATEGY_AUTO,         "llen",                 "qsize",                        REPLY_INT},

只能在端上操作, 不能在list中间插入/删除等.

static uint64_t QFRONT_SEQ = 2;
static uint64_t QBACK_SEQ  = 3;
static uint64_t QITEM_MIN_SEQ = 10000;
static uint64_t QITEM_MAX_SEQ = 9223372036854775807ULL;
static uint64_t QITEM_SEQ_INIT = QITEM_MAX_SEQ/2;           //4611686018427387903

rpush qkey msg 后, 存储结构如下:

qkey_2: 4611686018427387903             //front下标
qkey_3:  4611686018427387904            //end下标
qkey_4611686018427387904                //msg

这里 qkey_4611686018427387904 后面这个数字在leveldb key里面是直接存binary格式, 8个字节.

2.3.4   zset

每个zset中的元素对应2个key: zset_key, zscore_key:

k2 = encode_zscore_key(name, key, new_score);
k0 = encode_zset_key(name, key);

如下操作:

127.0.0.1:8888> Zadd zkey 3 a
(integer) 1
127.0.0.1:8888> ZSCORE zkey a
"3"

存储结构:

zkey: 1                 #size
zkey_a_3: ''            #zscore_key
zkey_a: 3               #zset_key

每次zset, 先用zset_key 取的score, 构造zscore_key, 删除老记录.

再写新的zset_key和zscore_key.

2.3.4.1   zrank

利用zscore_key, 遍历:

int64_t SSDB::zrank(const Bytes &name, const Bytes &key) const{
    ZIterator *it = ziterator(this, name, "", "", "", INT_MAX, Iterator::FORWARD);
    uint64_t ret = 0;
    while(true){
        if(it->next() == false){
            ret = -1;
            break;
        }
        if(key == it->key){
            break;
        }
        ret ++;
    }
    delete it;
    return ret;
}

zrange类似

2.3.5   名字空间划分

LevelDB里面, 每种key都有一个前缀:

class DataType{
public:
    static const char SYNCLOG       = 1;
    static const char KV            = 'k';
    static const char HASH          = 'h'; // hashmap(sorted by key)
    static const char HSIZE         = 'H';
    static const char ZSET          = 's'; // key => score
    static const char ZSCORE        = 'z'; // key|score => ""
    static const char ZSIZE         = 'Z';
    static const char QUEUE         = 'q';
    static const char QSIZE         = 'Q';
    static const char MIN_PREFIX = HASH;
    static const char MAX_PREFIX = ZSET;
};

2.4   主从相关

2.4.1   binlog

class Binlog class BinlogQueue class Transaction

class Transaction{
private:
    BinlogQueue *logs;
public:
    Transaction(BinlogQueue *logs){
        this->logs = logs;
        logs->mutex.lock();
        logs->begin();
    }

    ~Transaction(){
        // it is safe to call rollback after commit
        logs->rollback();
        logs->mutex.unlock();
    }
};

2.4.2   BinlogQueue

整个SSDB只有一个BinlogQueue, 而且和数据存放在同一个leveldb里面:

ssdb->binlogs = new BinlogQueue(ssdb->db);

启动SSDB时, 申请一个BinlogQueue对象, seek 到最后一条binlog, (最后一条是用 encode_seq_key(UINT64_MAX) ) 然后启动一个线程, 来删Binlog:

int err = pthread_create(&tid, NULL, &BinlogQueue::log_clean_thread_func, this);

具体写操作的时候:

int SSDB::set(const Bytes &key, const Bytes &val, char log_type){
    Transaction trans(binlogs);                                     //这里开始加锁

    std::string buf = encode_kv_key(key);
    binlogs->Put(buf, val.Slice());                                 //这里是真正的写操作.
    binlogs->add(log_type, BinlogCommand::KSET, buf);               //这里是记录一条日志, 说我对这个key, 做了一次set操作 (没记录value, 难道同步的时候再去取value?)
    leveldb::Status s = binlogs->commit();                          //两个操作一起写
}

其实这里 ssdb->binlogs 相当于存储层, 所有 set/del leveldb 读写操作都是通过 ssdb->binlog 进行的, 但是Get操作却不是通过ssdb->binlog() 操作的:

int64_t SSDB::qsize(const Bytes &name){
    std::string key = encode_qsize_key(name);
    std::string val;

    leveldb::Status s;
    s = db->Get(leveldb::ReadOptions(), key, &val);
}

这就不太统一, 不方便换下面的存储引擎.

2.4.3   Binlog

有多种类型:

class BinlogCommand{
public:
    static const char NONE  = 0;
    static const char KSET  = 1;
    static const char KDEL  = 2;
    static const char HSET  = 3;
    static const char HDEL  = 4;
    static const char ZSET  = 5;
    static const char ZDEL  = 6;

    static const char BEGIN  = 7;
    static const char END    = 8;
};

都是只记录key:

ssdb->binlogs->add(log_type, BinlogCommand::HSET, hkey);

2.4.4   主从同步

由Master 主动向从数据, 一个Server 有一个BackendSync实例:

Server::Server(SSDB *ssdb){
    this->ssdb = ssdb;
    backend_sync = new BackendSync(ssdb);
    ...
}

2.4.5   slave

void Slave::start(){
    load_status();
    log_debug("last_seq: %" PRIu64 ", last_key: %s",
        last_seq, hexmem(last_key.data(), last_key.size()).c_str());

    thread_quit = false;
    int err = pthread_create(&run_thread_tid, NULL, &Slave::_run_thread, this);
    if(err != 0){
        log_error("can't create thread: %s", strerror(err));
    }
}

启动时load last_seq , 然后连上master, 发一个 sync140 告诉服务器从哪里开始发binlog:

sprintf(seq_buf, "%" PRIu64 "", this->last_seq);
const char *type = is_mirror? "mirror" : "sync";
link->send("sync140", seq_buf, this->last_key, type);

当Slave通过sync命令连上来, Master就会从这个socket把最新的更新发给Slave:

static int proc_sync140(Server *serv, Link *link, const Request &req, Response *resp){
    serv->backend_sync->proc(link);
    return PROC_BACKEND;
}

int BackendSync::Client::sync(BinlogQueue *logs){
    Binlog log;
    ret = logs->find_next(expect_seq, &log);

    switch(log.cmd()){
        case BinlogCommand::KSET:
        case BinlogCommand::HSET:
        case BinlogCommand::ZSET:
            ret = backend->ssdb->raw_get(log.key(), &val);
            if(ret == -1){
                log_error("fd: %d, raw_get error!", link->fd());
            }else if(ret == 0){
                //log_debug("%s", hexmem(log.key().data(), log.key().size()).c_str());
                log_trace("fd: %d, skip not found: %s", link->fd(), log.dumps().c_str());
            }else{
                log_trace("fd: %d, %s", link->fd(), log.dumps().c_str());
                link->send(log.repr(), val);
            }

因为binlog只记录了key, 所以这里会再查一次, 把value查出来一起发过去(需要一次读)

问题: 基准数据怎么过去呢? => 可以拷贝.

2.4.6   dump

backend_dump.cpp
backend_dump.h

有个dump命令:

PROC(dump, "b"),

相当于redis 的keys,

收到这个命令, 服务器新开一个线程, 把所有数据通过一个socket发过来, 问题:

  1. 非redis 协议
  2. 容易断
  3. 有scan, 应该就不需要这个了 (不过这个简单, 简单一个命令就可以做backup了)
ning@ning-laptop ~/idning-github/ndb$ printf '*1\r\n$4\r\ndump\r\n' | socat - TCP:localhost:8888,shut-close | xxd
0000000: 350a 6265 6769 6e0a 0a33 0a73 6574 0a39  5.begin..3.set.9
0000010: 0a01 0000 0000 001e 953a 0a32 370a 3a95  .........:.27.:.
0000020: 1e00 0000 0000 0101 6b6b 6579 3a30 3030  ........kkey:000
0000030: 3030 3234 3833 3631 320a 0a33 0a73 6574  002483612..3.set
0000040: 0a39 0a01 0000 0000 001e 953b 0a32 370a  .9.........;.27.
0000050: 3b95 1e00 0000 0000 0101 6b6b 6579 3a30  ;.........kkey:0
0000060: 3030 3030 3738 3833 3133 310a 0a33 0a73  00007883131..3.s
0000070: 6574 0a39 0a01 0000 0000 001e 953c 0a32  et.9.........<.2
0000080: 370a 3c95 1e00 0000 0000 0101 6b6b 6579  7.<.........kkey
0000090: 3a30 3030 3030 3236 3435 3335 320a 0a33  :000002645352..3
00000a0: 0a73 6574 0a39 0a01 0000 0000 001e 953d  .set.9.........=
00000b0: 0a32 370a 3d95 1e00 0000 0000 0101 6b6b  .27.=.........kk
00000c0: 6579 3a30 3030 3030 3935 3033 3539 380a  ey:000009503598.
00000d0: 0a33 0a73 6574 0a39 0a01 0000 0000 001e  .3.set.9........
00000e0: 953e 0a32 370a 3e95 1e00 0000 0000 0101  .>.27.>.........
00000f0: 6b6b 6579 3a30 3030 3030 3938 3038 3139  kkey:00000980819
0000100: 360a 0a33 0a73 6574 0a39 0a01 0000 0000  6..3.set.9......
0000110: 001e 953f 0a32 370a 3f95 1e00 0000 0000  ...?.27.?.......
0000120: 0101 6b6b 6579 3a30 3030 3030 3238 3232  ..kkey:000002822
0000130: 3337 360a 0a33 0a73 6574 0a39 0a01 0000  376..3.set.9....
0000140: 0000 001e 9540 0a32 370a 4095 1e00 0000  .....@.27.@.....
0000150: 0000 0101 6b6b 6579 3a30 3030 3030 3134  ....kkey:0000014
0000160: 3936 3935 320a 0a33 0a73 6574 0a39 0a01  96952..3.set.9..
0000170: 0000 0000 001e 9541 0a32 370a 4195 1e00  .......A.27.A...
0000180: 0000 0000 0101 6b6b 6579 3a30 3030 3030  ......kkey:00000
0000190: 3432 3033 3036 370a 0a33 0a73 6574 0a39  4203067..3.set.9
00001a0: 0a01 0000 0000 001e 9542 0a32 370a 4295  .........B.27.B.
00001b0: 1e00 0000 0000 0101 6b6b 6579 3a30 3030  ........kkey:000
00001c0: 3030 3139 3133 3036 320a 0a33 0a73 6574  001913062..3.set
00001d0: 0a39 0a01 0000 0000 001e 9543 0a32 370a  .9.........C.27.
00001e0: 4395 1e00 0000 0000 0101 6b6b 6579 3a30  C.........kkey:0
00001f0: 3030 3030 3637 3432 3136 310a 0a33 0a65  00006742161..3.e
0000200: 6e64 0a32 0a31 300a 0a                   nd.2.10..

2.5   其它

2.5.1   incr 如何保证原子性

通过Transaction上的锁实现.

2.5.2   如何实现expire

ssdb为每个带有过期设置的key, 保存了2个结构:

  1. 内存里面的一个sorted_set (全量)
  2. leveldb里面以 EXPIRATION_LIST_KEY 开头的一系列key.

ssdb 里面保留这个key:

#define EXPIRATION_LIST_KEY "\xff\xff\xff\xff\xff|EXPIRE_LIST|KV"

用这个key作为一个大zset(基于ssdb在leveldb上提供的zset), 当需要设置一个key的ttl时, 就向这个zset里面设置某个key的超时时间:

int ExpirationHandler::set_ttl(const Bytes &key, int ttl){
    int64_t expired = time_ms() + ttl * 1000;
    char data[30];
    int size = snprintf(data, sizeof(data), "%" PRId64, expired);
    if(size <= 0){
        log_error("snprintf return error!");
        return -1;
    }

    Locking l(&mutex);
    int ret = ssdb->zset(this->list_name, key, Bytes(data, size));

}

同时还会放到一个内存的sorted_set(expiration_keys)里面去:

expiration_keys.add(key.String(), expired);

ssdb启动后, 会有一个线程把所有 EXPIRATION_LIST_KEY 这个 zset 里面的所有key扫描出来, 放到内存的 expiration_keys 里.

回收时, 由一个线程从 expiration_keys 里面取出每个key, 把key和它对应的expire记录删掉:

if(handler->expiration_keys.front(&key, &score)){
    int64_t now = time_ms();
    if(score <= now){
        log_debug("expired %s", key->c_str());
        ssdb->del(*key);
        ssdb->zdel(handler->list_name, *key);
        handler->expiration_keys.pop_front();
        continue;
    }
}

问题是: 所有的key都要装到内存, 内存会比较大, 而读的时候又没有利用上这个内存.

4   问题

4.1   读性能的问题

benchmark结果: 100G数据 时, 读性能稳定在大约5000qps

4.2   如何compact

貌似是一个命令. 需要人工调用.

4.3   所有expire的key记录在内存

如果有10亿条, 每条100字节, 就需要100G+内存.

4.4   兼容问题

4.4.1   del 兼容

因为leveldb 的接口, 删除实际上是一个写操作(写为空串). 所以删除接口不能返回这个key在是真的删了, 还是本来就不存在.

所以在删除一个不存在的key时 for redis:

ning@ning-laptop ~/idning-github/redis/src$ redis-cli -p 2000 del xxx
(integer) 0

for ssdb:

ning@ning-laptop ~/idning-github/redis/src$ redis-cli -p 8888 del xxx
(integer) 1

4.4.2   ttl一个不存在的key

ssdb:

127.0.0.1:8888> ttl k
(error) ERR

redis:

127.0.0.1:5527> ttl k
(integer) -2

4.4.3   没有expire命令

改为通过ttl命令设置expire.

4.4.5   scan类

4.4.5.1   scan

一次在一个连接上吐回所有key, 而且要全放到内存resp里面再一次吐出:

static int proc_scan(Server *serv, Link *link, const Request &req, Response *resp){
    if(req.size() < 4){
        resp->push_back("client_error");
    }else{
        uint64_t limit = req[3].Uint64();
        KIterator *it = serv->ssdb->scan(req[1], req[2], limit);
        resp->push_back("ok");
        while(it->next()){
            resp->push_back(it->key);
            resp->push_back(it->val);
        }
        delete it;
    }
    return 0;
}
4.4.5.2   没有redis 的hscan

只有ssdb协议支持hscan, 用法是给出范围.

支持redis的hgetall, hkeys, hvals

5   小结

  • ssdb 设计上, expire用区别于主key的另一个key存储
  • oplog只记录key

Comments