IntSet是Redis中set集合的一种实现方式,基于整数数组来实现,并且具备长度可变、有序等特征。结构如下:
typedof struct intset{
uint32_t encoding;/*编码方式,支持放16位、32位、64位整数**/
uint32_t length;/*元素个数*/
uint8_t contents[];/*整数数组,保存整数数据*/
}intset;其中的encoding包含三种模式,表示存储的整数大小不同:
* INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64 *
# define INTSET_ENC_INT16(sizeof(int16_t)) /*2字节整数,范围类似java的short*/
# define INTSET_ENC_INT32(sizeof(int32_t)) /*4字节整数,范围类似java的int*/
# define INTSET_ENC_INT64(sizeof(int64_t)) /*8字节整数,范围类似java的long*/为了方便查找,Redis会将intset中所有的整数按照升序依次保存在contents数组中,结构如图:

现在,数组中每个数字都在int16_t的范围内,因此采用的编码方式是INTSET_ENC_INT16,每部分占用的字节大小为
我们向该其中添加一个数字:50000,这个数字超出了int16_t的范围,intset会自动升级编码方式到合适的大小。以当前案例来说流程如下
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
intset *intsetAdd(intset *is, int64_t value, uint8_t *success);
intset *intsetAdd(intset *is, int64_t value, uint8_t *success)
{
// 获取当前元素的编码,如果是16位的就用16位的编码,超过就用32位,依次这样下去
// 应该要知道传入进来的value值要用什么编码
uint8_t valenc = _intsetValueEncoding(value);
// 要插入位置
uint32_t pos;
if (success)
*success = 1;
/* Upgrade encoding if necessary. If we need to upgrade, we know that
* this value should be either appended (if > 0) or prepended (if < 0),
* because it lies outside the range of existing values. */
// 判断valenc编码是不是超过了当前intSet的编码
if (valenc > intrev32ifbe(is->encoding))
{
/* This always succeeds, so we don't need to curry *success. */
// 超出编码,需要升级
return intsetUpgradeAndAdd(is, value);
}
else
{
/* Abort if the value is already present in the set.
* This call will populate "pos" with the right position to insert
* the value when it cannot be found. */
// 在当前intSet中查找值与value一样的元素角标pos
if (intsetSearch(is, value, &pos))
{
// 如果找到了就直接返回,因为intSet是不能重复元素存在的
if (success)
*success = 0;
return is;
}
// 数组扩容
is = intsetResize(is, intrev32ifbe(is->length) + 1);
if (pos < intrev32ifbe(is->length))
intsetMoveTail(is, pos, pos + 1);
}
_intsetSet(is, pos, value);
is->length = intrev32ifbe(intrev32ifbe(is->length) + 1);
return is;
}
/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value)
{
// 获取当前intSet编码
uint8_t curenc = intrev32ifbe(is->encoding);
// 获取最新的编码
uint8_t newenc = _intsetValueEncoding(value);
// 获取元素个数
int length = intrev32ifbe(is->length);
// 判断新元素是大于0还是小于0,小于0插入队首,大于0插入队尾
int prepend = value < 0 ? 1 : 0;
/* First set new encoding and resize */
// 重新设置编码格式
is->encoding = intrev32ifbe(newenc);
// 重置数组的大小
is = intsetResize(is, intrev32ifbe(is->length) + 1);
/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
// 倒序遍历,逐个搬运元素到新的位置
while (length--)
_intsetSet(is, length + prepend, _intsetGetEncoded(is, length, curenc));
/* Set the value at the beginning or the end. */
// 插入新元素,prepend决定了是队首还是队尾
if (prepend)
_intsetSet(is, 0, value);
else
_intsetSet(is, intrev32ifbe(is->length), value);
// 修改数组长度
is->length = intrev32ifbe(intrev32ifbe(is->length) + 1);
return is;
}
/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value)
{
// 获取当前intSet编码
uint8_t curenc = intrev32ifbe(is->encoding);
// 获取最新的编码
uint8_t newenc = _intsetValueEncoding(value);
// 获取元素个数
int length = intrev32ifbe(is->length);
// 判断新元素是大于0还是小于0,小于0插入队首,大于0插入队尾
int prepend = value < 0 ? 1 : 0;
/* First set new encoding and resize */
// 重新设置编码格式
is->encoding = intrev32ifbe(newenc);
// 重置数组的大小
is = intsetResize(is, intrev32ifbe(is->length) + 1);
/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
// 倒序遍历,逐个搬运元素到新的位置
while (length--)
_intsetSet(is, length + prepend, _intsetGetEncoded(is, length, curenc));
/* Set the value at the beginning or the end. */
// 插入新元素,prepend决定了是队首还是队尾
if (prepend)
_intsetSet(is, 0, value);
else
_intsetSet(is, intrev32ifbe(is->length), value);
// 修改数组长度
is->length = intrev32ifbe(intrev32ifbe(is->length) + 1);
return is;
}
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos)
{
// 初始化二分查找,min max mid
int min = 0, max = intrev32ifbe(is->length) - 1, mid = -1;
int64_t cur = -1;
/* The value can never be found when the set is empty */
// 数组不为空,判断value是否大于最大值,小于最小值
if (intrev32ifbe(is->length) == 0)
{
// 数组位空,插入第一个元素
if (pos)
*pos = 0;
return 0;
}
else
{
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
// 数组不位空,值是否大于最大值,大于最大值,插入队尾
if (value > _intsetGet(is, max))
{
if (pos)
*pos = intrev32ifbe(is->length);
return 0;
}
// 小于最小值,不用找了,插入队首
else if (value < _intsetGet(is, 0))
{
if (pos)
*pos = 0;
return 0;
}
}
// 二分查找
while (max >= min)
{
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is, mid);
if (value > cur)
{
min = mid + 1;
}
else if (value < cur)
{
max = mid - 1;
}
else
{
break;
}
}
if (value == cur)
{
if (pos)
*pos = mid;
return 1;
}
else
{
if (pos)
*pos = min;
return 0;
}
}
Intset可以看做是特殊的整数数组,具备一些特点:
typedof struct dict{
dictType *type;// dict类型,内置不同的hash函数
void *privdata; // 私有数据,在做特殊hash运算时用
dictht ht[2]; // 一个Dict包含两个哈希表,其中一个是当前数据,另一个一般是空,rehash是使用
long rehashidx; // rehash的进度,-1表示未进行
int16_t pauserehash; // rehash是否暂停,1则暂停,0则继续
}dict;存在2个是因为在扩容时需要保持旧和新的数组
typedef struct dictht{
//entry数组
//数组中保存的是指向entry的指针
dictEntry **table;
// 哈希表大小
unsigned long size;
// 哈希表大小的掩码,总等于size-1
unsigned long sizemask;
// entry个数
unsigned long used;
}dictht;typedef struct dictEntry {
void *key; // key 存储redis中的K,该类型即为SDS
union {
void *val; // 引用类型
uint64_t u64;// 无符号
int64_t s64; // 有符号
double d; // double类型
} v;// val 存储redis中的V,该类型为redisObject
struct dictEntry *next; // 下一个Entry的指针
} dictEntry;
比如:当我们向Dict添加键值对时,Redis首先根据key计算出hash值(h),然后利用h&sizemask来计算元素应该存储到数组中的哪个索引位置。我们存储k1=v1,假设k1的哈希值h =1,则1&3 =1,因此k1=v1要存储到数组角标1位置
typedef struct dict{
dictType *type; // dict类型,内置不同的hash函数
void *privdata; // 私有数据,在做特殊hash运算时用
dictht ht[2]; // 一个Dict包含两个哈希表,其中一个是当前数据,另一个一般是空,rehash时使用
long rehashidx; // rehash的进度,-1表示未进行
int16_t pauserehash; // rehash是否暂停,1则暂停,0则继续
}dict;typedef struct dictht{
//entry数组
//数组中保存的是指向entry的指针
dictEntry **table;
// 哈希表大小
unsigned long size;
// 哈希表大小的掩码,总等于size - 1
unsigned long sizemask;
// entry 个数
unsigned long used;
}dictht;typedef struct dictEntry{
void *key; //键
union{
void *val;
uint64_t u64;
int64_t s64;
double d;
}v;//值
//下一个Entry的指针
struct dictEntry *next;
}dictEntry;


扩容源码分析:
static int _dictExpandIfNeeded(dict *d){
//如果正在rehash,则返回ok
if(dictIsRehashing(d))return DICK_OK;
//如果哈希表为空,则初始化哈希表为默认大小:4
if(d>ht[0].size == 0)return dictExpand(d,DICT_HT_INITIAL_SIZE);
//当负载因子(used/size)达到1以上,并且当前没有进行bgrewrite等子进程操作
//或者负载因子超过5,则进行dictExpand,也就是扩容
if(d->ht[0].used >= d->ht[0].size &&
(dict_can_resize || d->ht[0].used/d ->ht[0].size > dict_force_resize_ratio)){
//扩容大小为used +1,底层会对扩容大小做判断,实际上找的是第一个大于等于 used+1 的 2^n
return dictExpand(d,d->ht[0].used +1);
}
return DICT_OK;
}Dict除了扩容以外,每次删除元素时,也会对负载因子做检查,当LoadFactor < 0.1 时,会做哈希表收缩
// t_hash.c #hashTypeDeleted()
if(dictDelete((dict*)o->prt,field)==C_OK){
delete = 1;
// 删除成功后,检查是否需要重置Dict大小,如果需要则调用dictResize重置
if(htNeedsResize(o->ptr))dictResize(o->ptr);
}// server.c文件
int htNeedsResize(dict *dict){
long long size,used;
// 哈希表大小
size = dictSlots(dict);
// entry数量
used = dictSize(dict);
// siez > 4(哈希表初始大小) 并且负载因子低于0.1
return (size>DICT_HT_INITIAL_SIZE && (used*100/size < HASHTABLE_MIN_FILL))
}int dictResize(dict *d){
unsigned long minimal;
// 如果正在做bgsave或bgrewrireof或rehash,则返回错误
if(!dict_can_resize || dictIsRehashing(d))
return DICT_ERR;
// 获取used,也就是entry个数
minimal = d ->ht[0].used;
// 如果used小于4,则重置为4
if(minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITLAL_SIZE;
// 重置大小为minimal,其实是第一个大于等于minimal的2^n
return dictExpand(d,minimal );
}Dict的rehash并不是一次性完成的。试想一下,如果Dict中包含数百万的entry,要在一次rehash完成,极有可能导致主线程阻塞。因此Dict的rehash是分多次、渐进式的完成,因此称为渐进式rehash。 流程如下:
1、计算新hash表的size,值取决于当前要做的是扩容还是收缩:
2、设置dict.rehashidx = 0,标示开始rehash
3、每次执行新增、查询、修改、删除操作时,都检查一下dict.rehashidx是否大于-1,如果是则将dict.ht[0].table[rehashidx]的entry链表rehash到dict.ht[1],并且将rehashidx++。直至dict.ht[0]的所有数据都rehash到dict.ht[1]
4、 将dict.ht[1]赋值给dict.ht[0],给dict.ht[1]初始化为空哈希表,释放原来的dict.ht[0]的内存
5、将rehashidx赋值为-1,代表rehash结束
6、在rehash过程中,新增操作,则直接写入ht[1],查询、修改和删除则会在dict.ht[0]和dict.ht[1]依次查找并执行。这样可以确保ht[0]的数据只减不增,随着rehash最终为空




原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。