前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >redis事务源码分析

redis事务源码分析

原创
作者头像
冰寒火
修改2022-11-07 00:56:02
5310
修改2022-11-07 00:56:02
举报
文章被收录于专栏:软件设计

一、事务基础

1 redis事务介绍

事务能够将多个操作作为一个整体来执行,具备ACID四大特性。

  1. 原子性:redis主线程对字典空间进行操作,天生是原子的,不需要同步机制。
  2. 持久性:redis作为缓存是允许丢失数据的,我觉得不应该对持久性有过多的要求。另外redis也有rdb、aof来持久化数据。
  3. 一致性:redis并没有undo log,理论上事务执行一半就下线后是无法回滚的,需要通过redis-check-aof工具来检测,移除掉失败的事务命令。
  4. 隔离性:redis单线程处理client命令,算是可串行化读这个级别,事务不并发就不会破坏隔离性。

2 innodb事务介绍

事务并发破坏了事务的隔离性,根据破坏的程度分为四大隔离级别,每种级别对于写都是需要加写锁并在事务提交后释放,区别在于读的可见性不一样。

  1. 读未提交:对读没有约束,可以读到未提交的数据。
  2. 读已提交:每次读生成一个最新的read view,能够读到本事务执行期间提交的事务,与最开始读到的不一样,会有不可重复读现象。
  3. 可重复读:第一次读生成一个read view,后面的读以这个为主,并通过间隙锁来锁区间,阻止插入新的记录,解决幻读现象。 所有的写都是当前读,而读的可见性未必是当前最新的,也有可能是旧版本,需要考虑更新丢失/覆盖的现象。

二、监视器

redis事务是根据multi、exec、discard三个命令实现的,另外还需要关注watch、unwatch这个监视器,基于乐观锁的思想。redis事务期间是不能使用监视器的,待会儿源码中就能够看到。

1 redisDb

代码语言:c
复制
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
    clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;


struct dict {
    dictType *type; //操作集

    dictEntry **ht_table[2]; //数组+链表+渐进式哈希
    unsigned long ht_used[2];

    long rehashidx; /* rehashing not in progress if rehashidx == -1 */

    /* Keep small vars at end for optimal (minimal) struct padding */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
    signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};

redisDb里面有多个字典空间,这里我们主要讲解watched key的dict。字典空间是一个哈希表,数组+链表的形式,扩容时采用渐进式哈希将代价分摊到每个请求上,对用户请求延迟没有太大影响,但却能够分割停顿时间,是一个比较好的思想,Golang的map也是基于这种思想实现的。

watched_keys dict保存的是key-clients的映射关系,clients就是监视这个key的所有客户端。

2 watch

代码语言:c
复制
/* In the client->watched_keys list we need to use watchedKey structures
 * as in order to identify a key in Redis we need both the key name and the
 * DB */
typedef struct watchedKey {    robj *key;
    redisDb *db;
} watchedKey;
/**
 * watch命令
 */
void watchCommand(redisClient *c) {
    int j;
	//multi和watch只能选择一个
    if(c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for(j =1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
 }

/**
 * 监控指定的键
 */
void watchForKey(redisClient *c, robj *key) {    
    list *clients =NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    /* Check if we are already watching for this key */
    // 检查指定的键是否已经被监控
    listRewind(c->watched_keys,&li);
    while((ln =listNext(&li))) {
        wk =listNodeValue(ln);
        if(wk->db == c->db &&equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    // 指定的键没有被监控
    clients =dictFetchValue(c->db->watched_keys,key);
    if(!clients) { 
        clients =listCreate();
        // 将key加入指定dict
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk =zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    // 将key加入指定的list
    listAddNodeTail(c->watched_keys,wk);
}

当客户端监视key时就会将这个客户端加入到watched_keys中key的value链表中。

3 修改watched_key

代码语言:c
复制
void signalModifiedKey(redisDb *db, robj *key){
	touchWatchedKey(db,key);
}
/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {    
    list *clients;
    listIter li;
    listNode *ln;

    // 判断该DB是否有监视的Key
    if(dictSize(db->watched_keys) ==0) return;
    clients =dictFetchValue(db->watched_keys, key);
    if(!clients) return;

    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);

    // 迭代所有的有该key的client,并为它们的flags标志位增加REDIS_DIRTY_CAS
    while((ln =listNext(&li))) {
        redisClient *c =listNodeValue(ln);

        c->flags |= REDIS_DIRTY_CAS;
    }
}

修改watched_key后会从watched_keys这个字典空间找到所有监视这个key的client,修改client的flag标志为脏位,client真正执行时都会判断flag是否为脏,如果是就直接return了。

4 unwatch

代码语言:c
复制
/**
 * unwatch命令
 */
void unwatchCommand(redisClient *c) {    
    unwatchAllKeys(c);
    //复位,重置脏位
    c->flags &=(~REDIS_DIRTY_CAS);
    addReply(c,shared.ok);
}
/* Unwatch all the keys watched by this client. To clean the EXEC dirty
 * flag is up to the caller. */
void unwatchAllKeys(redisClient *c) {    
    listIter li;
    listNode *ln;

    if(listLength(c->watched_keys) ==0) return;
    listRewind(c->watched_keys,&li);
    while((ln =listNext(&li))) {
        list *clients;
        watchedKey *wk;

        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk =listNodeValue(ln);
        clients =dictFetchValue(wk->db->watched_keys, wk->key);
        redisAssertWithInfo(c,NULL,clients !=NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        if(listLength(clients) ==0)
            // 函数dict中的key
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        // 删除list中的key
        listDelNode(c->watched_keys,ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}

touchWatchedKey修改key后会对watched_key的所有client设置脏位。所有set、delete等函数都会调用signalModifiedKey。

三、事务实现

主要讲解multi、exec、discard三个命令的实现。

image.png
image.png

1 multi

代码语言:c
复制
/**
 * multi命令对应的源码
 */void multiCommand(redisClient *c) {    // 判断是否嵌套执行multi
    if(c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }

    // 开启multi标志位
    c->flags |= REDIS_MULTI;
    addReply(c,shared.ok);
 }

2 入队

redis事务的命令是先入队,等exec到了后一起执行的。

代码语言:c
复制
//将命令入队。
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c, uint64_t cmd_flags) {
    multiCmd *mc;

    /* No sense to waste memory if the transaction is already aborted.
     * this is useful in case client sends these in a pipeline, or doesn't
     * bother to read previous responses and didn't notice the multi was already
     * aborted. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
        return;
    if (c->mstate.count == 0) {
        /* If a client is using multi/exec, assuming it is used to execute at least
         * two commands. Hence, creating by default size of 2. */
        c->mstate.commands = zmalloc(sizeof(multiCmd)*2);
        c->mstate.alloc_count = 2;
    }
    if (c->mstate.count == c->mstate.alloc_count) {
        c->mstate.alloc_count = c->mstate.alloc_count < INT_MAX/2 ? c->mstate.alloc_count*2 : INT_MAX;
        c->mstate.commands = zrealloc(c->mstate.commands, sizeof(multiCmd)*(c->mstate.alloc_count));
    }
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = c->argv;
    mc->argv_len = c->argv_len;

    c->mstate.count++;
    c->mstate.cmd_flags |= cmd_flags;
    c->mstate.cmd_inv_flags |= ~cmd_flags;
    c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;

    /* Reset the client's args since we copied them into the mstate and shouldn't
     * reference them from c anymore. */
    c->argv = NULL;
    c->argc = 0;
    c->argv_len_sum = 0;
    c->argv_len = 0;
}

3 exec

代码语言:c
复制
/**
 * exec命令对应的源码
 */ 
void execCommand(redisClient *c) {
    ……
    
    // 判断是否开启事务
    if(!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
    
    ……
    
    // 判断是否有对应的watch,且watch的keys是否被修改
    if(c->flags &(REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                  shared.nullmultibulk);
        
        // 取消事务
        discardTransaction(c);
        goto handle_monitor;
    }

    // 执行事务前,将所有的wach的keys都unwatch掉
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for(j =0; j < c->mstate.count; j++) {

        ……
        
        // 执行事务
        call(c,REDIS_CALL_FULL);

        ……
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    
    ……
}

事务期间是不能够再watch的,一般是multi之前watch key来监视key的变化,然后exec时再判断这些watched keys来决定是否执行。在事务exec/discard后,这个client所有的watched key都会失效。

4 discard

代码语言:c
复制
//重置multi队列,释放所有监视的键
void discardTransaction(client *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}

四、小结

本节介绍redis事务,并简单讨论了下innodb的事务,对于事务的实现都是采用乐观锁/或者悲观锁来实现,乐观锁实现居多,大多是基于mvcc实现。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、事务基础
    • 1 redis事务介绍
      • 2 innodb事务介绍
      • 二、监视器
        • 1 redisDb
          • 2 watch
            • 3 修改watched_key
              • 4 unwatch
              • 三、事务实现
                • 1 multi
                  • 2 入队
                    • 3 exec
                      • 4 discard
                      • 四、小结
                      相关产品与服务
                      云数据库 Redis
                      腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档