TFdream / blog

个人技术博客,博文写在 Issues 里。
Apache License 2.0
129 stars 18 forks source link

Redisson ReadWriteLock实现原理剖析 #363

Open TFdream opened 3 years ago

TFdream commented 3 years ago

什么是读写锁?

维基百科上 读写锁 定义:

读写锁是计算机程序的并发控制的一种同步机制,也称“共享-互斥锁”、多读者-单写者锁。多读者锁,“push lock”) 用于解决读写问题。读操作可并发重入,写操作是互斥的。

读写锁通常用互斥锁、条件变量、信号量实现。

某些读写锁允许在读模式与写模式之间升降级。

实现方案

1、使用两把互斥锁

Michel Raynal使用两把互斥锁与一个整数计数器实现。计数器b跟踪被阻塞的读线程。互斥锁r保护b,供读者使用。互斥锁g (指"global")确保写操作互斥。

注:实现是读操作优先。

读者加锁伪代码: Begin Read

Lock r.
Increment b.
If b = 1, lock g.
Unlock r.

End Read

Lock r.
Decrement b.
If b = 0, unlock g.
Unlock r.

写者加锁伪代码: Begin Write

Lock g.

End Write

Unlock g.

2、使用条件变量与互斥锁

可使用条件变量c与普通的互斥锁m、整型计数器r(表示正在读的个数)与布尔标志g(表示正在写)来实现读写锁。 lock-for-read操作:

Lock m (blocking).
While w:
   wait c, m
Increment r.
Unlock m.

lock-for-write操作:

Lock m (blocking).
While (w or r > 0):
   wait c, m
Set w to true.
Unlock m.

lock-for-read与lock-for-write各自有自己的逆操作。

应用场景

读写锁的含义是准确的:是一种 读共享,写独占的锁。

读写锁的特性:

互斥原则:

因而适用于读多写少的场景。

Redisson中读写锁实现

关于读写锁,大家应该都了解JDK中的ReadWriteLock, 当然Redisson也有读写锁的实现。

所谓读写锁,就是多个客户端同时加读锁,是不会互斥的,多个客户端可以同时加这个读锁,读锁和读锁是不互斥的

Redisson中使用RedissonReadWriteLock来实现读写锁,它是RReadWriteLock的子类,具体实现读写锁的类分别是:RedissonReadLock和RedissonWriteLock

官方文档中给出的Redisson读写锁使用示例

RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");

RLock lock = rwlock.readLock();
// or
RLock lock = rwlock.writeLock();

// traditional lock method
lock.lock();

// or acquire lock and automatically unlock it after 10 seconds
lock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

源码分析

首先我们看一下 Redisson#getReadWriteLock方法:

    @Override
    public RReadWriteLock getReadWriteLock(String name) {
        return new RedissonReadWriteLock(commandExecutor, name);
    }

RedissonReadWriteLock 是 RReadWriteLock 接口的唯一实现类,如下:

package org.redisson;

public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {

    public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    @Override
    public RLock readLock() {
        return new RedissonReadLock(commandExecutor, getRawName());
    }

    @Override
    public RLock writeLock() {
        return new RedissonWriteLock(commandExecutor, getRawName());
    }

}

一、Redisson 读锁逻辑

Redisson 读锁相关的逻辑封装在 RedissonReadLock 类中,如下:

public class RedissonReadLock extends RedissonLock implements RLock {

    public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    @Override
    String getChannelName() {
        return prefixName("redisson_rwlock", getRawName());
    }

    String getWriteLockName(long threadId) {
        return super.getLockName(threadId) + ":write";
    }

    String getReadWriteTimeoutNamePrefix(long threadId) {
        return suffixName(getRawName(), getLockName(threadId)) + ":rwlock_timeout";
    }
    //=======
    public final String getRawName() {
        return name;
    }

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }

    public static String prefixName(String prefix, String name) {
        if (name.contains("{")) {
            return prefix + ":" + name;
        }
        return prefix + ":{" + name + "}";
    }

    public static String suffixName(String name, String suffix) {
        if (name.contains("{")) {
            return name + ":" + suffix;
        }
        return "{" + name + "}:" + suffix;
    }

}

1、加锁逻辑

首先,我们看看加锁逻辑:


    @Override
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                                "if (mode == false) then " +
                                  "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  "redis.call('set', KEYS[2] .. ':1', 1); " +
                                  "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                                "end; " +
                                "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                                  "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                  "local key = KEYS[2] .. ':' .. ind;" +
                                  "redis.call('set', key, 1); " +
                                  "redis.call('pexpire', key, ARGV[1]); " +
                                  "local remainTime = redis.call('pttl', KEYS[1]); " +
                                  "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                                  "return nil; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),
                        unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));
    }

加锁lua脚本如下:

local mode = redis.call('hget', KEYS[1], 'mode');
if (mode == false) then
    redis.call('hset', KEYS[1], 'mode', 'read');
    redis.call('hset', KEYS[1], ARGV[2], 1);
    redis.call('set', KEYS[2] .. ':1', 1);
    redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]);
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end
if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then
    local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1);
    local key = KEYS[2] .. ':' .. ind;
    redis.call('set', key, 1);
    redis.call('pexpire', key, ARGV[1]);
    local remainTime = redis.call('pttl', KEYS[1]);
    redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1]));
    return nil;
    end;
return redis.call('pttl', KEYS[1]);

2、解锁逻辑

    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
        String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
                "if (lockExists == 0) then " +
                    "return nil;" +
                "end; " +

                "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " + 
                "if (counter == 0) then " +
                    "redis.call('hdel', KEYS[1], ARGV[2]); " + 
                "end;" +
                "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

                "if (redis.call('hlen', KEYS[1]) > 1) then " +
                    "local maxRemainTime = -3; " + 
                    "local keys = redis.call('hkeys', KEYS[1]); " + 
                    "for n, key in ipairs(keys) do " + 
                        "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                        "if type(counter) == 'number' then " + 
                            "for i=counter, 1, -1 do " + 
                                "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                                "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                            "end; " + 
                        "end; " + 
                    "end; " +

                    "if maxRemainTime > 0 then " +
                        "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                        "return 0; " +
                    "end;" + 

                    "if mode == 'write' then " + 
                        "return 0;" + 
                    "end; " +
                "end; " +

                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; ",
                Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix),
                LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
    }

对应的lua脚本如下:

local mode = redis.call('hget', KEYS[1], 'mode');
if (mode == false) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
local lockExists = redis.call('hexists', KEYS[1], ARGV[2]);
if (lockExists == 0) then
    return nil;
end;

local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1);
if (counter == 0) then
    redis.call('hdel', KEYS[1], ARGV[2]);
end;
redis.call('del', KEYS[3] .. ':' .. (counter+1));

if (redis.call('hlen', KEYS[1]) > 1) then
    local maxRemainTime = -3;
    local keys = redis.call('hkeys', KEYS[1]);
    for n, key in ipairs(keys) do
        counter = tonumber(redis.call('hget', KEYS[1], key));
        if type(counter) == 'number' then
            for i=counter, 1, -1 do
                local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i);
                maxRemainTime = math.max(remainTime, maxRemainTime);
            end;
        end;
    end;

    if maxRemainTime > 0 then
        redis.call('pexpire', KEYS[1], maxRemainTime);
        return 0;
    end;

    if mode == 'write'
        return 0;
    end;
end;

redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;

Redisson 写锁逻辑

Redisson 写锁相关的逻辑封装在 RedissonWriteLock类中,如下:

public class RedissonWriteLock extends RedissonLock implements RLock {

    protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
    }

    @Override
    String getChannelName() {
        return prefixName("redisson_rwlock", getRawName());
    }

    @Override
    protected String getLockName(long threadId) {
        return super.getLockName(threadId) + ":write";
    }

}

加锁逻辑:

    @Override
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                            "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                            "if (mode == false) then " +
                                  "redis.call('hset', KEYS[1], 'mode', 'write'); " +
                                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                  "return nil; " +
                              "end; " +
                              "if (mode == 'write') then " +
                                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + 
                                      "local currentExpire = redis.call('pttl', KEYS[1]); " +
                                      "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
                                      "return nil; " +
                                  "end; " +
                                "end;" +
                                "return redis.call('pttl', KEYS[1]);",
                        Arrays.<Object>asList(getRawName()),
                        unit.toMillis(leaseTime), getLockName(threadId));
    }

解锁逻辑:


    @Override
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "local mode = redis.call('hget', KEYS[1], 'mode'); " +
                "if (mode == false) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (mode == 'write') then " +
                    "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
                    "if (lockExists == 0) then " +
                        "return nil;" +
                    "else " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('hdel', KEYS[1], ARGV[3]); " +
                            "if (redis.call('hlen', KEYS[1]) == 1) then " +
                                "redis.call('del', KEYS[1]); " +
                                "redis.call('publish', KEYS[2], ARGV[1]); " + 
                            "else " +
                                // has unlocked read-locks
                                "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                            "end; " +
                            "return 1; "+
                        "end; " +
                    "end; " +
                "end; "
                + "return nil;",
        Arrays.<Object>asList(getRawName(), getChannelName()),
        LockPubSub.READ_UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }