

  redis数据结构set, zset, hash, string类型的会用到dict。zset, hash类型在数据量少的时候会在ziplist, 数据量大时,zset使用dict + skiplist的数据结构,dict来存储value和score的映射关系。set在集合元素全部为int这种特殊情况下会使用intset, 通过二叉查找来查找数据。
  其他场景例如集群模式中用来存储ip:port与clusterNode的映射关系。 二、dict的数据结构
  2.1 dictEntry typedef struct dictEntry {     void *key;     union {         void *val;         uint64_t u64;         int64_t s64;         double d;     } v;     struct dictEntry *next; } dictEntry;
  2.2 dictht typedef struct dictht {     dictEntry **table;     // table size     unsigned long size;       //用于将哈希值映射到table的位置索引 = size - 1, hash(key) % sizemask 来计算落到table的那个bucket上         unsigned long sizemask;       //dict中数据个数,used / size的比值就是装载因子   } dictht;
  2.3 dict typedef struct dict {     dictType *type;     void *privdata;     dictht ht[2];     /* 表示rehash的步骤,rehashidx == -1 当前没有进行rehash */     long rehashidx;      /* number of iterators currently running */     unsigned long iterators;  } dict;
  2.4 dictType typedef struct dictType {     uint64_t (*hashFunction)(const void *key);     void *(*keyDup)(void *privdata, const void *key);     void *(*valDup)(void *privdata, const void *obj);     int (*keyCompare)(void *privdata, const void *key1, const void *key2);     void (*keyDestructor)(void *privdata, void *key);     void (*valDestructor)(void *privdata, void *obj); } dictType;
  redis dict核心数据结构模型三、dict的核心API
  判断是否进行rehashstatic int _dictExpandIfNeeded(dict *d) {     /* 判定是否当前处于rehash的状态. */     if (dictIsRehashing(d)) return DICT_OK;       /* 初始设置,table size = 4 */     if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);     /* If we reached the 1:1 ratio, and we are allowed to resize the hash      * table (global setting) or we should avoid it but the ratio between      * elements/buckets is over the "safe" threshold, we resize doubling      * the number of buckets. */     if (d->ht[0].used >= d->ht[0].size &&         (dict_can_resize ||          //数据量超过table size的5倍          d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))     {         //根据当前dict元素数量的2倍进行扩展        return dictExpand(d, d->ht[0].used*2);     }     return DICT_OK; }
  dict扩展逻辑 int dictExpand(dict *d, unsigned long size) {     /* the size is invalid if it is smaller than the number of      * elements already inside the hash table */     if (dictIsRehashing(d) || d->ht[0].used > size)         return DICT_ERR;     dictht n; /* the new hash table */     unsigned long realsize = _dictNextPower(size);     /* Rehashing to the same table size is not useful. */     if (realsize == d->ht[0].size) return DICT_ERR;     /* 根据新的size重新分配一个hashtable */     n.size = realsize;     n.sizemask = realsize-1;     n.table = zcalloc(realsize*sizeof(dictEntry*));     n.used = 0;       /* Is this the first initialization? If so it"s not really a rehashing      * we just set the first hash table so that it can accept keys. */     if (d->ht[0].table == NULL) {         d->ht[0] = n;         return DICT_OK;     }       /* 重新分配的hashtable置于ht[1] */     d->ht[1] = n;     //开始渐进rehash的标记     d->rehashidx = 0;     return DICT_OK; }   //如果2^n >= used * 2 返回 2^n, hashtable的size一定是2^n static unsigned long _dictNextPower(unsigned long size) {     unsigned long i = DICT_HT_INITIAL_SIZE;     if (size >= LONG_MAX) return LONG_MAX + 1LU;     while(1) {         if (i >= size)             return i;         i *= 2;     } }
  什么时候开启渐进式rehash, 在每次从hashtable中定位key的时候,代码如下  static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) {     unsigned long idx, table;     dictEntry *he;     if (existing) *existing = NULL;       /* 判断是否需要扩展 */     if (_dictExpandIfNeeded(d) == DICT_ERR)         return -1;              /* 拉链法遍历hashtable,遍历两个hashtable */     /* ht[0], ht[1], 如果当前没有在rehash,则只遍历ht[0] */     for (table = 0; table <= 1; table++) {         //计算数据所在bucket位置,找到链表第一个entry         idx = hash & d->ht[table].sizemask;           /* Search if this slot does not already contain the given key */         he = d->ht[table].table[idx];         //遍历链表, 比较key值         while(he) {             if (key==he->key || dictCompareKeys(d, key, he->key)) {                 if (existing) *existing = he;                 return -1;             }             he = he->next;         }         if (!dictIsRehashing(d)) break;     }     return idx; }
  什么时候进行分步渐进rehash, 在每一个add, get操作中:  dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) {     long index;     dictEntry *entry;     dictht *ht;     //进行分步rehash     if (dictIsRehashing(d))        _dictRehashStep(d);             /* Get the index of the new element, or -1 if      * the element already exists. */     if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)         return NULL;     /* Allocate the memory and store the new entry.      * Insert the element in top, with the assumption that in a database      * system it is more likely that recently added entries are accessed      * more frequently. */     /** rehash时, 数据存入ht[1] */     ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];     entry = zmalloc(sizeof(*entry));     entry->next = ht->table[index];     ht->table[index] = entry;     ht->used++;     /* 设置key, 返回entry*/     dictSetKey(d, entry, key);     return entry; }
  分步rehash过程 static void _dictRehashStep(dict *d) {     if (d->iterators == 0) dictRehash(d,1); }   int dictRehash(dict *d, int n) {     int empty_visits = n*10; /* Max number of empty buckets to visit. */     if (!dictIsRehashing(d)) return 0;       while(n-- && d->ht[0].used != 0) {         dictEntry *de, *nextde;           /* Note that rehashidx can"t overflow as we are sure there are more          * elements because ht[0].used != 0 */         assert(d->ht[0].size > (unsigned long)d->rehashidx);           /* 如果table中存在bucket为空时,检查10个bucket后都为空则返回,等下一次再处理 */         while(d->ht[0].table[d->rehashidx] == NULL) {             d->rehashidx++;             if (--empty_visits == 0) return 1;         }                  de = d->ht[0].table[d->rehashidx];                  /* 一次处理table中的一个bucket */         /* Move all the keys in this bucket from the old to the new hash HT */         while(de) {             uint64_t h;             nextde = de->next;             /* Get the index in the new hash table */             h = dictHashKey(d, de->key) & d->ht[1].sizemask;             de->next = d->ht[1].table[h];             d->ht[1].table[h] = de;             d->ht[0].used--;             d->ht[1].used++;             de = nextde;         }         d->ht[0].table[d->rehashidx] = NULL;         d->rehashidx++;     }     /* Check if we already rehashed the whole table... */     if (d->ht[0].used == 0) {         zfree(d->ht[0].table);         d->ht[0] = d->ht[1];         _dictReset(&d->ht[1]);         d->rehashidx = -1;         return 0;     }     /* More to rehash... */     return 1; }
  redis dict的缩小的触发存在两种情况  hash,set,zset这三类数据使用的dict, 在删除元素的时候会尝试rehash  另一种是定时尝试进行rehash
  尝试触发的代码如下  //当used / size < 10%时进行开启缩容rehash。 int htNeedsResize(dict *dict) {     long long size, used;     size = dictSlots(dict);     used = dictSize(dict);     return (size > DICT_HT_INITIAL_SIZE &&         (used*100/size < HASHTABLE_MIN_FILL)); }   /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL  * we resize the hash table to save memory */ void tryResizeHashTables(int dbid) {     if (htNeedsResize(server.db[dbid].dict))         dictResize(server.db[dbid].dict);              if (htNeedsResize(server.db[dbid].expires))         dictResize(server.db[dbid].expires); }
  由于rehash时需要分配内存,此部分内存是从redis maxmemory中的一部分,因此当bucket比较大时,rehash生成ht[1]时会使用大量内存。如果ht[1]占用的内存 + 原来的内存超过maxmeory,则会发生key逐出。由于sizeof(dictEntry*) = 8byte,因此当ht[0] size为4时,ht[1]扩展2倍,ht[1]占用内存为8 * 4 * 2 = 64byte,当size = 2^n 时,ht[1]占用的内存为8 * 2 * 2^n。
  三、dict scan
  1 正常状态
  2 正在rehash状态
  3 rehash扩容完成状态
  4 rehash缩容完成状态
  由于redis中dict是有状态的,只有在正常状态下可以完整的扫描字典中的所有的key。当dict在进行扩容和缩容时可能会存在扫描key的遗漏或者重复扫描。由于redis的dict中存在两个hashtable,分别为ht[0], ht[1]。rehash的过程是将ht[0]的key重新hash到ht[1]。这时redis的所有key其实存在于两个hashtable中。redis针对四种情况的方案如下: Dict tablesize保持不变,稳定的状态下,直接顺序遍历即可 Dict Resize,dict扩大了,如果还是按照顺序遍历,就会导致扫描大量重复Key。比如tablesize从8变成了16,假设之前访问的是3号桶,那么表扩展后则是继续访问415号桶;但是,原先的03号桶中的数据在Dict长度变大后被迁移到811号桶中,因此,遍历811号桶的时候会有大量的重复Key被返回。 Dict Resize,dict缩小了,如果还是按照顺序遍历,就会导致大量的Key被遗漏。比如tablesize从8变成了4,假设当前访问的是3号桶,那么下一次则会直接返回遍历结束了;但是之前47号桶中的数据在缩容后迁移带 可03号桶中,因此这部分Key就无法扫描到。 字典正在rehash,这种情况如(2)和(3)情况一下,要么大量重复扫描、要么遗漏很多Key。
  高位序访问即按照dict sizemask(掩码),在有效位(上图中dict sizemask为00000111)上从高位开始加一枚举(右边图的);低位则按照有效位的低位逐步加一枚举(左边的图)。
  高位序遍历路径 : 0->4->2->6->1->5->3->7
  redis采用掩码高位序访问的原因是在rehash时尽量少的重复扫描key。为什么从高位掩码访问会减少重复扫描。由于rehash最终计算bucket位置时是取模操作(v & size_mask),举个例子:
  针对 v = 00101011,原来size_mask为00000111,取模       00000111        00101011 =      00000011
  当size_mask为00001111,取模         00001111        00101011        00001011
  可以看出取模后的两个值存在相同的后缀011。size_mask变大后,生成的结果只是高位发生变化。  原来落到bucket[00000011]的key,table size从8->16时,size_mask从00000111->00001111时 会落到        [00000011] 和           [00001011]中,高位变化,低位不变,此时再看一遍从高位序访问的图
  redis从左边hashtable bucket0开始遍历,到遍历bucket6时发生了rehash。bucket长度为8扩展到16,redis的scan路径就会按照 6→14→1→9→5→13→3→11→7→15来完成,这里存在一个隐藏逻辑,一定是先遍历小hashtable,再变了大的hashtable。
  hashtable bucket
  从图上可以看出高位序scan在dict rehash时即可以避免重复遍历,又能完整返回原始的所有Key。同理,字典缩容时也一样,字典缩容可以看出是反向扩容。
  来看一下redis scan源码  unsigned long dictScan(dict *d,                        unsigned long v,                        dictScanFunction *fn,                        dictScanBucketFunction* bucketfn,                        void *privdata) {     dictht *t0, *t1;     const dictEntry *de, *next;     unsigned long m0, m1;       if (dictSize(d) == 0) return 0;     //没有进行rehash的情况     if (!dictIsRehashing(d)) {         t0 = &(d->ht[0]);         m0= t0->sizemask;         /* Emit entries at cursor */         if (bucketfn)            bucketfn(privdata, &t0->table[v & m0]);                  de = t0->table[v & m0];                  while (de) {             next = de->next;             fn(privdata, de);             de = next;         }         /* 高位序遍历计算出下一个cursor */         v |= ~m0;         /* Increment the reverse cursor */         v = rev(v);         v++;         v = rev(v);     } else {         //进行rehash的情况         t0 = &d->ht[0];         t1 = &d->ht[1];         /* 确保t0为小hashtable,t1为大的hashtable */         if (t0->size > t1->size) {             t0 = &d->ht[1];             t1 = &d->ht[0];         }         m0 = t0->sizemask;         m1 = t1->sizemask;         /* 先根据cursor遍历小的hashtable */         if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);         de = t0->table[v & m0];         while (de) {             next = de->next;             fn(privdata, de);             de = next;         }         /* 再变了大的hashtable,大的hashtable会遍历多个bucket */         do {             /* Emit entries at cursor */             if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);             de = t1->table[v & m1];             while (de) {                 next = de->next;                 fn(privdata, de);                 de = next;             }             /* 反向二进制迭代算法来计算出下一个cursor */             v |= ~m1;             v = rev(v);             v++;             v = rev(v);             /* Continue while bits covered by mask difference is non-zero */         } while (v & (m0 ^ m1));     }     return v; }   // 反向二进制迭代算法 1. 对游标进行二进制翻转(原来的高位变成低位) 2. 低位+1 (对原来的高位进行+1,进位等操作) 3. 再进行一次二进制翻转 (恢复原来的高位,此时完成高位+1迭代)
