分1024个slot
zk保存拓扑.
go实现proxy, 无状态.
预分配: 1024个slot (单实例5G, 大约可以支撑5-10T规模)
平滑扩容.
notes:
必须保证不能同时有多个 slots 处于迁移状态 => 决定了很慢, 不能扩展到很大的集群(不过够用了)
redis-benchmark -p 10000 -r 10000 -t set -n 10000000
单核4w 多核 + pipeline(-P 10): 4-5w
多核6w 多核 + pipeline(-P 10): 20w cpu占用700%, 每个核剩下10%左右.
很不错!
redis中每个key会多存一份(slots hash表), 如果key比较大, 很浪费redis内存( 大约1.5倍 ).
主从切换不是自动, 需要手动操作(建议自动操作, 操作后人工后验检查)
不能并发迁移,扩容会很慢, 影响集群规模.
直接把redis代码包进来了, 对redis的后续升级, bugfix等难以merge.
使用crc32做sharding, 不是很均匀, 不过分了slot, 所以没关系了.
需要一个部署工具.
DATE=`date +'%Y%m%d%H%M'`
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
# clean & start zk
cd ~/xredis/zookeeper-3.4.6 && ./bin/zkServer.sh stop
rm /tmp/zookeeper -rf
pkill -f codis-config
cd ~/xredis/zookeeper-3.4.6 && ./bin/zkServer.sh start
cd $DIR
nohup ./bin/codis-config -c sample/config.ini dashboard &
sleep 3
echo 'dash running'
./bin/codis-config -c sample/config.ini server add 0 localhost:2000 master
./bin/codis-config -c sample/config.ini server add 0 localhost:3000 slave
./bin/codis-config -c sample/config.ini server add 1 localhost:2001 master
./bin/codis-config -c sample/config.ini server add 1 localhost:3001 slave
./bin/codis-config -c sample/config.ini server add 2 localhost:2002 master
./bin/codis-config -c sample/config.ini server add 2 localhost:3002 slave
./bin/codis-config -c sample/config.ini server add 3 localhost:2003 master
./bin/codis-config -c sample/config.ini server add 3 localhost:3003 slave
./bin/codis-config -c sample/config.ini slot init
./bin/codis-config -c sample/config.ini slot range-set 0 511 0 online
./bin/codis-config -c sample/config.ini slot range-set 512 1023 1 online
./bin/codis-proxy -c sample/config.ini -L ./log/proxy.log --cpu=8 --addr=0.0.0.0:19000 --http-addr=0.0.0.0:11000
zk 设计:
/codis/db_{xx}
{xx} means 产品名, 如: /codis/db_sync , /codis/db_applist
/codis/db_{xx}/servers/group_{N}/{ server addr (e.g. 127.0.0.1:6379) }
存储真实的 redis 组 (主master、从slave), N为一个自定义的整数编号, in JSON, 内容包括服务器地址,
角色(master or slave)等信息
/codis/db_{xx}/slots/slot_{N}
举例, zk中存储的数据如下:
$ zk-shell 127.1:2181 Welcome to zk-shell (1.0.05) (CONNECTING) /> (CONNECTED) /> tree ├── zk │ ├── codis │ │ ├── db_test │ │ │ ├── migrate_manager │ │ │ ├── fence │ │ │ ├── servers │ │ │ │ ├── group_0 │ │ │ │ │ ├── localhost:2000 │ │ │ │ │ ├── localhost:3000 │ │ │ │ ├── group_1 │ │ │ │ │ ├── localhost:2001 │ │ │ │ │ ├── localhost:3001 │ │ │ ├── slots │ │ │ │ ├── slot_0 │ │ │ │ ├── slot_1 │ │ │ │ ├── slot_2 │ │ │ │ ├── slot_3 │ │ │ │ ├── ... │ │ │ │ ├── ... │ │ │ │ ├── slot_1023 │ │ │ ├── proxy | │ │ │ ├── proxy_1 │ │ │ ├── migrate_tasks │ │ │ ├── LOCK │ │ │ ├── actions │ │ │ │ ├── 0000000004 │ │ │ │ ├── 0000000010 │ │ │ │ ├── 0000000006 │ │ │ │ ├── 0000000008 │ │ │ │ ├── 0000000000 │ │ │ │ ├── 0000000002 │ │ │ ├── ActionResponse │ │ │ │ ├── 0000000004 │ │ │ │ ├── 0000000010 │ │ │ │ ├── 0000000006 │ │ │ │ ├── 0000000008 │ │ │ │ ├── 0000000000 │ │ │ │ ├── 0000000002
其中几种节点数据:
(CONNECTED) /> get zk/codis/db_test/servers/group_0/localhost:2000
{"type":"master","group_id":0,"addr":"localhost:2000"}
(CONNECTED) /> get zk/codis/db_test/slots/slot_0
{"product_name":"test","id":0,"group_id":1,"state":{"status":"online","migrate_status":{"from":-1,"to":-1},"last_op_ts":"0"}}
(CONNECTED) /> get zk/codis/db_test/proxy/proxy_1
{"id":"proxy_1","addr":"127.1:19000","last_event":"","last_event_ts":0,"state":"offline","description":"","debug_var_addr":"127.1:11000","pid":12438,"start_at":"2015-04-28 15:20:23.739459751 +0800 CST"}
ext/redis-2.8.13/
dictadd/dictDel/dictResize的时候都要在每个slot里面操作
增加一系列命令(slotsxxx) slots.c
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
...
dict *hash_slots[HASH_SLOTS_SIZE];
} redisDb;
initServer() {
for (i = 0; i < HASH_SLOTS_SIZE; i ++) {
server.db[j].hash_slots[i] = dictCreate(&hashSlotType, NULL);
}
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
do {
uint32_t crc;
int slot = slots_num(key->ptr, &crc);
dictAdd(db->hash_slots[slot], sdsdup(key->ptr), (void *)(long)crc);
} while (0);
...
}
增加了一些命令:
{"slotsinfo",slotsinfoCommand,-1,"rF",0,NULL,0,0,0,0,0},
{"slotsdel",slotsdelCommand,-2,"w",0,NULL,1,-1,1,0,0},
{"slotsmgrtslot",slotsmgrtslotCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrtone",slotsmgrtoneCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrttagslot",slotsmgrttagslotCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotsmgrttagone",slotsmgrttagoneCommand,5,"aw",0,NULL,0,0,0,0,0},
{"slotshashkey",slotshashkeyCommand,-1,"rF",0,NULL,0,0,0,0,0},
{"slotscheck",slotscheckCommand,0,"r",0,NULL,0,0,0,0,0},
{"slotsrestore",slotsrestoreCommand,-4,"awm",0,NULL,1,1,1,0,0},
static int
slotsmgrt(redisClient *c, sds host, sds port, int fd, int dbid, int timeout, robj *keys[], robj *vals[], int n) {
...
syncWrite(fd, buf + pos, towrite, timeout);
syncReadLine(fd, buf1, sizeof(buf1), timeout)
}
阻塞迁移造成的短时拒绝服务(详见后面测试).
代码量大约6000:
------------------------------------------------------------------------------- Language files blank comment code ------------------------------------------------------------------------------- Go 42 975 171 5113 Javascript 7 84 99 596 HTML 4 48 24 588 JSON 3 0 0 52 CSS 3 2 4 11 Bourne Shell 2 3 0 6 ------------------------------------------------------------------------------- SUM: 61 1112 298 6366 -------------------------------------------------------------------------------
代码结构:
▾ pkg/
▾ models/ # 对应zk中的几个结构.
server_group.go
slot.go
action.go
proxy.go
▾ proxy/
▾ parser/
parser.go # 解析请求.
▾ redispool/
conn.go # 连接
redispool.go # 连接池
▾ cachepool/
cachepool.go # 由后端名字到连接池的map. [127.0.0.1:2000] => redispool {conn1, conn2, conn3}
▾ group/
group.go # 简单包装.
▾ router/
▸ topology/ # InitZkConn, watch
helper.go # redis命令黑名单, isMulOp, PING, SELECT 几个命令的处理.
mapper.go # mapKey2Slot (crc32 % 1024)
session.go # 记录当前实例的ops, starttime.
router.go # 主要proxy逻辑.
multioperator.go # mget/mset/del 的实现.
看一个获得所有ServerGroups的代码:
func ServerGroups(zkConn zkhelper.Conn, productName string) ([]ServerGroup, error) {
var ret []ServerGroup
root := fmt.Sprintf("/zk/codis/db_%s/servers", productName)
groups, _, err := zkConn.Children(root)
for _, group := range groups {
groupId, err := strconv.Atoi(strings.Split(group, "_")[1])
g, err := GetGroup(zkConn, productName, groupId)
ret = append(ret, *g)
}
return ret, nil
}
加载拓扑信息(一个slot指向那个group):
//use it in lock
func (s *Server) fillSlot(i int, force bool) {
slotInfo, groupInfo, err := s.top.GetSlotByIndex(i) # 从zk中获取slot信息
slot := &Slot{
slotInfo: slotInfo,
dst: group.NewGroup(*groupInfo),
groupInfo: groupInfo,
}
s.pools.AddPool(slot.dst.Master()) # 准备连接池
if slot.slotInfo.State.Status == models.SLOT_STATUS_MIGRATE { # 处理MIGRATE状态.
//get migrate src group and fill it
from, err := s.top.GetGroup(slot.slotInfo.State.MigrateStatus.From)
slot.migrateFrom = group.NewGroup(*from)
s.pools.AddPool(slot.migrateFrom.Master())
}
s.slots[i] = slot
}
#如果状态是migrate 的slot, 发migrate命令
func (s *Server) handleMigrateState(slotIndex int, key []byte) error {
...
err = WriteMigrateKeyCmd(redisConn.(*redispool.PooledConn), shd.dst.Master(), 30*1000, key)
...
}
转发逻辑, 读一个请求, 向后端转发一个:
func (s *Server) redisTunnel(c *session) error {
resp, err := parser.Parse(c.r) // read client request
op, k, err := getOpKey(resp)
i := mapKey2Slot(k)
check_state: # 这里是一个循环来检查, 等待SLOT_STATUS_PRE_MIGRATE结束
s.mu.RLock()
if s.slots[i] == nil {
s.mu.Unlock()
return errors.Errorf("should never happend, slot %d is empty", i)
}
//wait for state change, should be soon
if s.slots[i].slotInfo.State.Status == models.SLOT_STATUS_PRE_MIGRATE {
s.mu.RUnlock()
time.Sleep(10 * time.Millisecond)
goto check_state
}
s.handleMigrateState(i, k);
//get redis connection
redisConn, err := s.pools.GetConn(s.slots[i].dst.Master())
redisErr, clientErr := forward(c, redisConn.(*redispool.PooledConn), resp)
}
func (s *Server) handleConn(c net.Conn) {
for {
err = s.redisTunnel(client)
client.Ops++
}
}
func (s *Server) Run() {
log.Info("listening on", s.addr)
listener, err := net.Listen("tcp", s.addr)
for {
conn, err := listener.Accept()
go s.handleConn(conn) #起一个
}
}
proxy 会watch action 树下的变更, 有变化时 重新加载路由:
func (s *Server) OnGroupChange(groupId int) {
log.Warning("group changed", groupId)
for i, slot := range s.slots {
if slot.slotInfo.GroupId == groupId {
s.fillSlot(i, true)
}
}
}
上面代码是@ngaut同学pipeline优化前的代码,
一般来说, 实现pipeline可能存在下面两个问题, 不过测试发现: codis都没有问题 .
返回乱序:
get k1 k2 k3 k4 return v2 v1 v3 vj
原因是k1,k2发到不同后端, 如果其中一个后端很慢, 而先返回的后端就先写客户端, 就是这个错误.
测试发现codis没有这个问题, 但是看代码没有看懂为什么. 涉及到多个channel中传递消息(TODO).
乱序执行:
lpush lst 1 lpush lst 2 lpush lst 3 lpush lst 4 lpop return 2 1 3 4
第二种情况是发到同一个后端, 但是如果向同一个后端有多个连接, 就可能出这个问题. 某个连接上的请求先执行.
codis一个后端只有一个TaskRunner(一个连接), 所以应该不会出这个问题.
逐一访问:
func (oper *MultiOperator) mgetResults(mop *MulOp) ([]byte, error) {
results := make([]interface{}, len(mop.keys))
conn := oper.pool.Get()
defer conn.Close()
for i, key := range mop.keys {
replys, err := redis.Values(conn.Do("mget", key))
for _, reply := range replys {
if reply != nil {
results[i] = reply
} else {
results[i] = nil
}
}
}
b, err := respcoding.Marshal(results)
return b, errors.Trace(err)
}
这是为了保证迁移过程中的一致性, 必须一个一个处理.
性能较差.
对于简单value(value大小1字节) slots内存占用:
keys 0 1000 10000 100000 1000000 --------------------------------------------------------------- codis-server 2519112 2678920 4073224 17449032 176095304 redis-server 908688 1019856 2078736 12356240 120496272
为了保证迁移的一致性, codis选择牺牲可用性, 迁移单个key是通过阻塞当前实例来实现的.
一个100w 字段的hset(内存中大约占70M), 迁移耗时:
本机: 1.85 s 同机房不同机器: 2.06s
1-2s的不响应对大多数业务来说, 还是可以接受的, 所以这个问题不是很严重.
但是一定要注意:
测试代码: https://gist.github.com/idning/03f43b6789f14e1fe878
在proxy代码中, 给迁移一个key设置的超时是30s:
func (s *Server) handleMigrateState(slotIndex int, key []byte) error {
...
err = WriteMigrateKeyCmd(redisConn.(*redispool.PooledConn), shd.dst.Master(), 30*1000, key)
...
}
codis proxy对每个请求都是解析 => 找个连接发到后端 => 等待响应 => 发给客户端
这样就相当于不能使用pipeline, 而pipeline对要求高性能的case是非常重要的,
开pipeline的情况下, 单redis, 单线程client做简单set可以达到 100w qps.
spinlock同学的测试:
https://github.com/wandoulabs/codis/issues/63
CONC PIPELINE CODIS-LATENCY REDIS-LATENCY 50 10 3.17 0.60 50 20 5.88 0.89 50 75 21.78 2.40p
@ngaut 同学在15年2月实现了pipeline的支持: https://github.com/wandoulabs/codis/pull/110
pipeline 遇到slot迁移状态性能又是一个严重的问题.
命令行支持:
只支持 ./bin/codis-config -c sample/config.ini dashboard 不支持 ./bin/codis-config dashboard -c sample/config.ini, 这个命令报一个非常奇怪的错误.
努棒性:
配置写错, 比如:
dashboard_addr=:8087
zk节点也能建成功, 而且不删, 需要等一段时间.
这是整个系统一个极大的亮点, 用于数据迁移, 比我们的redis-replay-aof好的是, 不需要到目标机器读aof文件.
实现原理: 作为一个假的 slave,挂在一个redis后面,然后将master的数据同步回来,sync 到 codis 集群上,
设计文章: http://0xffff.me/blog/2014/11/11/codis-de-she-ji-yu-shi-xian-part-2/
codis design pdf
非常赞的人&非常赞的项目
redis-port很赞.
每种方案都是适用的地方.