lee3164 / myblog

blog
1 stars 0 forks source link

redis数据结构剖析 #1

Open lee3164 opened 5 years ago

lee3164 commented 5 years ago

1.列表(list)

list是开发中常见的一种数据结构,我们先来看看redis中如何定义list。

// 节点类型
typedef struct listNode {
    struct listNode *prev;
    struct listNode *next;
    void *value;
} listNode;

// 迭代器类型
#define AL_START_HEAD 0
#define AL_START_TAIL 1

typedef struct listIter {
    listNode *next;
    int direction; // 0正向迭代,1反向
} listIter;

typedef struct list {
    listNode *head;
    listNode *tail;
    void *(*dup)(void *ptr);  // value的复制函数
    void (*free)(void *ptr); // value的free函数
    int (*match)(void *ptr, void *key); // value的匹配函数
    unsigned long len;
} list;

/* Prototypes */
list *listCreate(void);  
void listRelease(list *list);
list *listAddNodeHead(list *list, void *value);
list *listAddNodeTail(list *list, void *value);
list *listInsertNode(list *list, listNode *old_node, void *value, int after);
void listDelNode(list *list, listNode *node);
listIter *listGetIterator(list *list, int direction);
listNode *listNext(listIter *iter);
void listReleaseIterator(listIter *iter);
list *listDup(list *orig);  // 复制链表,利用dup函数
listNode *listSearchKey(list *list, void *key);  // 搜索链表中的值,利用match函数
listNode *listIndex(list *list, long index);  // 根据索引得到值,支持正反,index<0即是反向
void listRewind(list *list, listIter *li);
void listRewindTail(list *list, listIter *li);
void listRotate(list *list); // 将链表尾部节点加到头部

从代码中可以看出,list是一个双向链表,提供了一个简单的迭代器。没有太多可说的

lee3164 commented 5 years ago

2.hash表(dict)

hash表也是常见的数据结构,先来看看redis中如何进行定义。

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

typedef struct dictType {
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
// dict hash table
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* 记录是否正在rehash中,如果不等于-1则是在rehash,否则记录的是当前rehash进度*/ 
    int iterators; /* number of iterators currently running */
} dict;

从以上可以看出,dict使用的是开链法,当hash冲突的时候,注意每个dictEntry有个next指针。同时dict还维护了一些基本信息,为了模拟面向对象还定义了一个dictType来维护各种方法,如hash方法。dict中有个dictht[2]的成员,这个主要是为了rehash时候使用,平时使用的是0,rehash时将数据转移到1中,转移完成后将交换0和1. 下面简述下dict中的各种操作

/*
缩小操作,缩小到当前的使用量大小。
*/
int dictResize(dict *d)
{
    int minimal;

    if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
    minimal = d->ht[0].used;
    if (minimal < DICT_HT_INITIAL_SIZE)
        minimal = DICT_HT_INITIAL_SIZE;
    return dictExpand(d, minimal);
}

// 扩容缩小都要执行的函数
int dictExpand(dict *d, unsigned long size)
{
    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size); // 计算根据当前size计算整个dict的容量,见下面

/*
通过该函数计算出来的dict容量始终是2的幂,通过这个函数找到一个比size大又是2的幂的数字
static unsigned long _dictNextPower(unsigned long size)
{
    unsigned long i = DICT_HT_INITIAL_SIZE;

    if (size >= LONG_MAX) return LONG_MAX;
    while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }
}
*/

    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;

    /* Allocate the new hash table and initialize all pointers to NULL */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;

    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    /* Prepare a second hash table for incremental rehashing */
    d->ht[1] = n;
    d->rehashidx = 0;
    return DICT_OK;
}

// 这个是一个内部函数,private,调用真正rehash函数。
// 什么时候调用这个函数呢,每次有更新,查询的时候都调,
// 因此rehash不是一次性全部完成的,而是分步完成
static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}

/*
rehash的步骤并不是一步完成的,因为有时rehash很耗时,如果此时有命令到达,会阻塞。
此处的n的意思是该次rehash的步数,例如1就是rehash一个槽,但是dict中有很多bucket是空的,也要控制。空槽如果太多的话也会影响其他命令处理,因此每次最多处理 n*10个空槽,剩下的才是真正rehash的过程,通过rehasidx保存当前rehash的进度,一个个依次rehash
*/
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        // 开放链表法,依次转移链表元素
        while(de) {
            unsigned int h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL; // 转移之后将原来的指针赋值NULL
        d->rehashidx++; // rehash进度+1
    }

    /* Check if we already rehashed the whole table... */
    // 如果全部rehash,返回0,此处根据used的参数判断
    // 没转已完成返回1
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

// 判断dict是否需要扩张的函数
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    // 如果当前正在rehash,返回,
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    // 首次的时候初始化到初始大小,是4
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    // 有两种判断策略,如果dict_can_resize打开,则当used 超过dict槽数量的时候就扩张
    // 否则是 大小超过 指定倍数的时候,默认是5
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        // 扩招为当前的2倍
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

// 根据key找槽位置的函数,还要负责dict的扩张
// 如果key已经插入,返回-1,否则返回指定位置
static int _dictKeyIndex(dict *d, const void *key)
{
    unsigned int h, idx, table;
    dictEntry *he;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    /* Compute the key hash value */
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key))
                return -1;
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}

// 根据key找bucket的函数
dictEntry *dictAddRaw(dict *d, void *key)
{
    int index;
    dictEntry *entry;
    dictht *ht;

    // 如果当前需要rehash,进行rehash
    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    // 如果key已经插入了,返回null
    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */

    // 否则将创建一个entry,插到指定位置的链表头部
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

// 插入k,v到dict
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d,key);
    // 找到一个合适的bucket,见下
    // 找到了插值,找不到返回错误

    if (!entry) return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}

// 更新k,v,有则更新,无则插入
int dictReplace(dict *d, void *key, void *val)
{
    dictEntry *entry, auxentry;

    // 首先尝试插入,成功则返回
    if (dictAdd(d, key, val) == DICT_OK)
        return 1;

    // 此处说明key已经存在,找到对应entry
    entry = dictFind(d, key);
    /* Set the new value and free the old one. Note that it is important
     * to do that in this order, as the value may just be exactly the same
     * as the previous one. In this context, think to reference counting,
     * you want to increment (set), and then decrement (free), and not the
     * reverse. */
    auxentry = *entry;
    dictSetVal(d, entry, val);
    dictFreeVal(d, &auxentry);
    return 0;
}