分1024个slot
zk保存拓扑.
go实现proxy, 无状态.
预分配: 1024个slot (单实例5G, 大约可以支撑5-10T规模)
平滑扩容.
notes:
必须保证不能同时有多个 slots 处于迁移状态 => 决定了很慢, 不能扩展到很大的集群(不过够用了)
redis-benchmark -p 10000 -r 10000 -t set -n 10000000
单核4w 多核 + pipeline(-P 10): 4-5w
多核6w 多核 + pipeline(-P 10): 20w cpu占用700%, 每个核剩下10%左右.
很不错!
redis中每个key会多存一份(slots hash表), 如果key比较大, 很浪费redis内存( 大约1.5倍 ).
主从切换不是自动, 需要手动操作(建议自动操作, 操作后人工后验检查)
不能并发迁移,扩容会很慢, 影响集群规模.
直接把redis代码包进来了, 对redis的后续升级, bugfix等难以merge.
使用crc32做sharding, 不是很均匀, 不过分了slot, 所以没关系了.
需要一个部署工具.
DATE=`date +'%Y%m%d%H%M'` DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # clean & start zk cd ~/xredis/zookeeper-3.4.6 && ./bin/zkServer.sh stop rm /tmp/zookeeper -rf pkill -f codis-config cd ~/xredis/zookeeper-3.4.6 && ./bin/zkServer.sh start cd $DIR nohup ./bin/codis-config -c sample/config.ini dashboard & sleep 3 echo 'dash running' ./bin/codis-config -c sample/config.ini server add 0 localhost:2000 master ./bin/codis-config -c sample/config.ini server add 0 localhost:3000 slave ./bin/codis-config -c sample/config.ini server add 1 localhost:2001 master ./bin/codis-config -c sample/config.ini server add 1 localhost:3001 slave ./bin/codis-config -c sample/config.ini server add 2 localhost:2002 master ./bin/codis-config -c sample/config.ini server add 2 localhost:3002 slave ./bin/codis-config -c sample/config.ini server add 3 localhost:2003 master ./bin/codis-config -c sample/config.ini server add 3 localhost:3003 slave ./bin/codis-config -c sample/config.ini slot init ./bin/codis-config -c sample/config.ini slot range-set 0 511 0 online ./bin/codis-config -c sample/config.ini slot range-set 512 1023 1 online ./bin/codis-proxy -c sample/config.ini -L ./log/proxy.log --cpu=8 --addr=0.0.0.0:19000 --http-addr=0.0.0.0:11000
zk 设计:
/codis/db_{xx} {xx} means 产品名, 如: /codis/db_sync , /codis/db_applist /codis/db_{xx}/servers/group_{N}/{ server addr (e.g. 127.0.0.1:6379) } 存储真实的 redis 组 (主master、从slave), N为一个自定义的整数编号, in JSON, 内容包括服务器地址, 角色(master or slave)等信息 /codis/db_{xx}/slots/slot_{N}
举例, zk中存储的数据如下:
$ zk-shell 127.1:2181 Welcome to zk-shell (1.0.05) (CONNECTING) /> (CONNECTED) /> tree ├── zk │ ├── codis │ │ ├── db_test │ │ │ ├── migrate_manager │ │ │ ├── fence │ │ │ ├── servers │ │ │ │ ├── group_0 │ │ │ │ │ ├── localhost:2000 │ │ │ │ │ ├── localhost:3000 │ │ │ │ ├── group_1 │ │ │ │ │ ├── localhost:2001 │ │ │ │ │ ├── localhost:3001 │ │ │ ├── slots │ │ │ │ ├── slot_0 │ │ │ │ ├── slot_1 │ │ │ │ ├── slot_2 │ │ │ │ ├── slot_3 │ │ │ │ ├── ... │ │ │ │ ├── ... │ │ │ │ ├── slot_1023 │ │ │ ├── proxy | │ │ │ ├── proxy_1 │ │ │ ├── migrate_tasks │ │ │ ├── LOCK │ │ │ ├── actions │ │ │ │ ├── 0000000004 │ │ │ │ ├── 0000000010 │ │ │ │ ├── 0000000006 │ │ │ │ ├── 0000000008 │ │ │ │ ├── 0000000000 │ │ │ │ ├── 0000000002 │ │ │ ├── ActionResponse │ │ │ │ ├── 0000000004 │ │ │ │ ├── 0000000010 │ │ │ │ ├── 0000000006 │ │ │ │ ├── 0000000008 │ │ │ │ ├── 0000000000 │ │ │ │ ├── 0000000002
其中几种节点数据:
(CONNECTED) /> get zk/codis/db_test/servers/group_0/localhost:2000 {"type":"master","group_id":0,"addr":"localhost:2000"} (CONNECTED) /> get zk/codis/db_test/slots/slot_0 {"product_name":"test","id":0,"group_id":1,"state":{"status":"online","migrate_status":{"from":-1,"to":-1},"last_op_ts":"0"}} (CONNECTED) /> get zk/codis/db_test/proxy/proxy_1 {"id":"proxy_1","addr":"127.1:19000","last_event":"","last_event_ts":0,"state":"offline","description":"","debug_var_addr":"127.1:11000","pid":12438,"start_at":"2015-04-28 15:20:23.739459751 +0800 CST"}
ext/redis-2.8.13/
dictadd/dictDel/dictResize的时候都要在每个slot里面操作
增加一系列命令(slotsxxx) slots.c
typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ ... dict *hash_slots[HASH_SLOTS_SIZE]; } redisDb; initServer() { for (i = 0; i < HASH_SLOTS_SIZE; i ++) { server.db[j].hash_slots[i] = dictCreate(&hashSlotType, NULL); }
void dbAdd(redisDb *db, robj *key, robj *val) { sds copy = sdsdup(key->ptr); int retval = dictAdd(db->dict, copy, val); do { uint32_t crc; int slot = slots_num(key->ptr, &crc); dictAdd(db->hash_slots[slot], sdsdup(key->ptr), (void *)(long)crc); } while (0); ... }
增加了一些命令:
{"slotsinfo",slotsinfoCommand,-1,"rF",0,NULL,0,0,0,0,0}, {"slotsdel",slotsdelCommand,-2,"w",0,NULL,1,-1,1,0,0}, {"slotsmgrtslot",slotsmgrtslotCommand,5,"aw",0,NULL,0,0,0,0,0}, {"slotsmgrtone",slotsmgrtoneCommand,5,"aw",0,NULL,0,0,0,0,0}, {"slotsmgrttagslot",slotsmgrttagslotCommand,5,"aw",0,NULL,0,0,0,0,0}, {"slotsmgrttagone",slotsmgrttagoneCommand,5,"aw",0,NULL,0,0,0,0,0}, {"slotshashkey",slotshashkeyCommand,-1,"rF",0,NULL,0,0,0,0,0}, {"slotscheck",slotscheckCommand,0,"r",0,NULL,0,0,0,0,0}, {"slotsrestore",slotsrestoreCommand,-4,"awm",0,NULL,1,1,1,0,0},
static int slotsmgrt(redisClient *c, sds host, sds port, int fd, int dbid, int timeout, robj *keys[], robj *vals[], int n) { ... syncWrite(fd, buf + pos, towrite, timeout); syncReadLine(fd, buf1, sizeof(buf1), timeout) }
阻塞迁移造成的短时拒绝服务(详见后面测试).
代码量大约6000:
------------------------------------------------------------------------------- Language files blank comment code ------------------------------------------------------------------------------- Go 42 975 171 5113 Javascript 7 84 99 596 HTML 4 48 24 588 JSON 3 0 0 52 CSS 3 2 4 11 Bourne Shell 2 3 0 6 ------------------------------------------------------------------------------- SUM: 61 1112 298 6366 -------------------------------------------------------------------------------
代码结构:
▾ pkg/ ▾ models/ # 对应zk中的几个结构. server_group.go slot.go action.go proxy.go ▾ proxy/ ▾ parser/ parser.go # 解析请求. ▾ redispool/ conn.go # 连接 redispool.go # 连接池 ▾ cachepool/ cachepool.go # 由后端名字到连接池的map. [127.0.0.1:2000] => redispool {conn1, conn2, conn3} ▾ group/ group.go # 简单包装. ▾ router/ ▸ topology/ # InitZkConn, watch helper.go # redis命令黑名单, isMulOp, PING, SELECT 几个命令的处理. mapper.go # mapKey2Slot (crc32 % 1024) session.go # 记录当前实例的ops, starttime. router.go # 主要proxy逻辑. multioperator.go # mget/mset/del 的实现.
看一个获得所有ServerGroups的代码:
func ServerGroups(zkConn zkhelper.Conn, productName string) ([]ServerGroup, error) { var ret []ServerGroup root := fmt.Sprintf("/zk/codis/db_%s/servers", productName) groups, _, err := zkConn.Children(root) for _, group := range groups { groupId, err := strconv.Atoi(strings.Split(group, "_")[1]) g, err := GetGroup(zkConn, productName, groupId) ret = append(ret, *g) } return ret, nil }
加载拓扑信息(一个slot指向那个group):
//use it in lock func (s *Server) fillSlot(i int, force bool) { slotInfo, groupInfo, err := s.top.GetSlotByIndex(i) # 从zk中获取slot信息 slot := &Slot{ slotInfo: slotInfo, dst: group.NewGroup(*groupInfo), groupInfo: groupInfo, } s.pools.AddPool(slot.dst.Master()) # 准备连接池 if slot.slotInfo.State.Status == models.SLOT_STATUS_MIGRATE { # 处理MIGRATE状态. //get migrate src group and fill it from, err := s.top.GetGroup(slot.slotInfo.State.MigrateStatus.From) slot.migrateFrom = group.NewGroup(*from) s.pools.AddPool(slot.migrateFrom.Master()) } s.slots[i] = slot } #如果状态是migrate 的slot, 发migrate命令 func (s *Server) handleMigrateState(slotIndex int, key []byte) error { ... err = WriteMigrateKeyCmd(redisConn.(*redispool.PooledConn), shd.dst.Master(), 30*1000, key) ... }
转发逻辑, 读一个请求, 向后端转发一个:
func (s *Server) redisTunnel(c *session) error { resp, err := parser.Parse(c.r) // read client request op, k, err := getOpKey(resp) i := mapKey2Slot(k) check_state: # 这里是一个循环来检查, 等待SLOT_STATUS_PRE_MIGRATE结束 s.mu.RLock() if s.slots[i] == nil { s.mu.Unlock() return errors.Errorf("should never happend, slot %d is empty", i) } //wait for state change, should be soon if s.slots[i].slotInfo.State.Status == models.SLOT_STATUS_PRE_MIGRATE { s.mu.RUnlock() time.Sleep(10 * time.Millisecond) goto check_state } s.handleMigrateState(i, k); //get redis connection redisConn, err := s.pools.GetConn(s.slots[i].dst.Master()) redisErr, clientErr := forward(c, redisConn.(*redispool.PooledConn), resp) } func (s *Server) handleConn(c net.Conn) { for { err = s.redisTunnel(client) client.Ops++ } } func (s *Server) Run() { log.Info("listening on", s.addr) listener, err := net.Listen("tcp", s.addr) for { conn, err := listener.Accept() go s.handleConn(conn) #起一个 } }
proxy 会watch action 树下的变更, 有变化时 重新加载路由:
func (s *Server) OnGroupChange(groupId int) { log.Warning("group changed", groupId) for i, slot := range s.slots { if slot.slotInfo.GroupId == groupId { s.fillSlot(i, true) } } }
上面代码是@ngaut同学pipeline优化前的代码,
一般来说, 实现pipeline可能存在下面两个问题, 不过测试发现: codis都没有问题 .
返回乱序:
get k1 k2 k3 k4 return v2 v1 v3 vj
原因是k1,k2发到不同后端, 如果其中一个后端很慢, 而先返回的后端就先写客户端, 就是这个错误.
测试发现codis没有这个问题, 但是看代码没有看懂为什么. 涉及到多个channel中传递消息(TODO).
乱序执行:
lpush lst 1 lpush lst 2 lpush lst 3 lpush lst 4 lpop return 2 1 3 4
第二种情况是发到同一个后端, 但是如果向同一个后端有多个连接, 就可能出这个问题. 某个连接上的请求先执行.
codis一个后端只有一个TaskRunner(一个连接), 所以应该不会出这个问题.
逐一访问:
func (oper *MultiOperator) mgetResults(mop *MulOp) ([]byte, error) { results := make([]interface{}, len(mop.keys)) conn := oper.pool.Get() defer conn.Close() for i, key := range mop.keys { replys, err := redis.Values(conn.Do("mget", key)) for _, reply := range replys { if reply != nil { results[i] = reply } else { results[i] = nil } } } b, err := respcoding.Marshal(results) return b, errors.Trace(err) }
这是为了保证迁移过程中的一致性, 必须一个一个处理.
性能较差.
对于简单value(value大小1字节) slots内存占用:
keys 0 1000 10000 100000 1000000 --------------------------------------------------------------- codis-server 2519112 2678920 4073224 17449032 176095304 redis-server 908688 1019856 2078736 12356240 120496272
为了保证迁移的一致性, codis选择牺牲可用性, 迁移单个key是通过阻塞当前实例来实现的.
一个100w 字段的hset(内存中大约占70M), 迁移耗时:
本机: 1.85 s 同机房不同机器: 2.06s
1-2s的不响应对大多数业务来说, 还是可以接受的, 所以这个问题不是很严重.
但是一定要注意:
测试代码: https://gist.github.com/idning/03f43b6789f14e1fe878
在proxy代码中, 给迁移一个key设置的超时是30s:
func (s *Server) handleMigrateState(slotIndex int, key []byte) error { ... err = WriteMigrateKeyCmd(redisConn.(*redispool.PooledConn), shd.dst.Master(), 30*1000, key) ... }
codis proxy对每个请求都是解析 => 找个连接发到后端 => 等待响应 => 发给客户端
这样就相当于不能使用pipeline, 而pipeline对要求高性能的case是非常重要的,
开pipeline的情况下, 单redis, 单线程client做简单set可以达到 100w qps.
spinlock同学的测试:
https://github.com/wandoulabs/codis/issues/63
CONC PIPELINE CODIS-LATENCY REDIS-LATENCY 50 10 3.17 0.60 50 20 5.88 0.89 50 75 21.78 2.40p
@ngaut 同学在15年2月实现了pipeline的支持: https://github.com/wandoulabs/codis/pull/110
pipeline 遇到slot迁移状态性能又是一个严重的问题.
命令行支持:
只支持 ./bin/codis-config -c sample/config.ini dashboard 不支持 ./bin/codis-config dashboard -c sample/config.ini, 这个命令报一个非常奇怪的错误.
努棒性:
配置写错, 比如:
dashboard_addr=:8087
zk节点也能建成功, 而且不删, 需要等一段时间.
这是整个系统一个极大的亮点, 用于数据迁移, 比我们的redis-replay-aof好的是, 不需要到目标机器读aof文件.
实现原理: 作为一个假的 slave,挂在一个redis后面,然后将master的数据同步回来,sync 到 codis 集群上,
设计文章: http://0xffff.me/blog/2014/11/11/codis-de-she-ji-yu-shi-xian-part-2/
codis design pdf
非常赞的人&非常赞的项目
redis-port很赞.
每种方案都是适用的地方.