当前位置: 首页>后端>正文

Redis 分布式锁详细分析

锁的作用,我想大家都理解,就是让不同的线程或者进程可以安全地操作共享资源,而不会产生冲突。
比较熟悉的就是 SynchronizedReentrantLock 等,这些可以保证同一个 jvm 程序中,不同线程安全操作共享资源。
但是在分布式系统中,这种方式就失效了;由于分布式系统多线程、多进程并且分布在不同机器上,这将使单机并发控制锁策略失效,为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问。
比较常用的分布式锁有三种实现方式:

  1. 基于数据库的分布式锁
  2. 基于 ZooKeeper 的分布式锁
  3. 基于 Redis 的分布式锁

本篇文章主要讲解基于 Redis 分布式锁的实现。

一. 简单的实现

分布式锁最主要的作用就是保证任意一个时刻,只有一个客户端能访问共享资源。

1.1 SET NX 命令

我们知道 redisSET key value NX 命令,仅在不存在 key 的时候才能被执行成功,保证多个客户端只有一个能执行成功,相当于获取锁。
释放锁的时候,只需要删除 del key 这个 key 就行了。

上面的实现看似已经满足要求了,但是忘了考虑在分布式环境下,有以下问题:

  1. 获取锁的客户端意外崩溃,没有释放锁;导致其他客户端再也不能获取锁。
  2. 网络分裂,导致释放锁的命令没有发给 redis , 也导致锁无法释放。

最大的问题就是因为客户端或者网络问题,导致 redis 中的 key 没有删除,锁无法释放,因此其他客户端无法获取到锁。

1.2 SET NX PX 命令

针对上面的情况,使用了下面命令:

SET key my_random_value NX PX 30000

使用 PX 的命令,给 key 添加一个自动过期时间(30秒),保证即使因为意外情况,没有调用释放锁的方法,锁也会自动释放,其他客户端仍然可以获取到锁。

注意给这个 key 设置的值 my_random_value 是一个随机值,而且必须保证这个值在客户端必须是唯一的。这个值的作用是为了更加安全地释放锁。

if redis.call("get",KEYS[1]) == ARGV[1]  then
    return redis.call("del",KEYS[1])
else
    return 0
end

这是为了避免删除其他客户端成功获取的锁。考虑下面情况:

当客户端A获取到锁,但是客户端A执行时间超过了锁 key 过期时间,这个锁的 key 自动删除了,被客户端B 重新设置了锁 key, 获取了锁 。这个时候客户端A执行完成之后,直接把客户端B的锁给释放了。
注意:这里 key 的名字是一样的,就是锁名。

因此这里使用一个 my_random_value 随机值,保证客户端只会释放自己获取的锁,即只删除自己设置的 key

这种实现方式,存在下面问题:

  1. 锁的过期时间,即只要超过了设置的过期时间,其他客户端就可以获取到锁,同一时间有两个客户端访问共享资源。
  2. 这个锁不是一个可重入锁,同一线程没有办法多次获取锁。
  3. 单点失败问题。即使是主从模式的 redis ,也会存在主从复制不同步,导致可能存在的多个客户端获取到锁。

二. Redisson 的实现

上面章节介绍了,简单实现存在的问题,下面来介绍一下 Redisson 实现又是怎么解决的这些问题的。

注:下面源码是基于 redisson-3.11.3

2.1 tryLock() 方法获取锁

   // 在 RedissonLock 的源码中

   // 同步方法获取锁, true 表示获取到锁
    @Override
    public boolean tryLock() {
        return get(tryLockAsync());
    }

   // 异步方法获取锁, 得到的 RFuture get方法返回 true 表示获取到锁
    @Override
    public RFuture<Boolean> tryLockAsync() {
        return tryLockAsync(Thread.currentThread().getId());
    }

    @Override
    public RFuture<Boolean> tryLockAsync(long threadId) {
        return tryAcquireOnceAsync(-1, null, threadId);
    }

    // 返回是否获取到锁的 RFuture
    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
        // 锁持有时间 leaseTime 有值
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
       // 持有锁的时间是不限制的
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
    
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            // 获取锁过程出错,就直接返回
            if (e != null) {
                return;
            }

            // ttlRemaining 为true,表示获取到锁,那么就需要开启定时,给锁的过期时间不断延期,保证锁不过期。
            if (ttlRemaining) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

主要关注 tryAcquireOnceAsync 方法,有三个参数:

  1. leaseTime 和 unit:表示锁持有时间。如果 leaseTime = -1 表示锁持有时间是不限制的,即只要不释放锁,锁永不过期。
  2. threadId: 表示当前线程的 id ,用来实现可重入锁的。

如何实现锁永不过期呢?
Redisson 实现了一个定时操作,在锁过期之前,不断地给锁延期过期时间。默认的时间就是 commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout() 看门狗的时间。

2.2 tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法获取锁

    // 获取锁。 最多等待 waitTime 时间
    @Override
    public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
        return tryLock(waitTime, -1, unit);
    }

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        // 等待获取锁的时间
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        // 线程ID
        long threadId = Thread.currentThread().getId();

        // 获取锁 key 过期时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // 如果过期时间是 null,表示获取到锁了,直接返回
        if (ttl == null) {
            return true;
        }

        // 计算还剩下的等待获取锁的时间
        time -= System.currentTimeMillis() - current;
        // 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();
        // 订阅一个 redis 通道,当锁被释放的时候,提醒等待线程去获取锁。
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 等待订阅这个操作是否成功, 订阅不成功,取消订阅,并返回获取锁失败
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            // 取消订阅
            if (!subscribeFuture.cancel(false)) {
                // 取消订阅成功,就需要调用 unsubscribe(subscribeFuture, threadId) 方法,真实地取消订阅
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // 计算还剩下的等待获取锁的时间
            time -= System.currentTimeMillis() - current;
            // 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            // 死循环等待获取锁
            while (true) {
                long currentTime = System.currentTimeMillis();
                // 尝试获取锁,并得到锁剩余过期时间
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 如果 ttl 等于 null,表示成功获取到锁
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                // 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                // 比较剩余等待获取锁的时间,和锁剩余过期时间大小,取较小的值作为等待时间。
                // 当然这个 tryAcquire 可以被提前唤醒
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                // 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 取消订阅
            unsubscribe(subscribeFuture, threadId);
        }
    }

方法主要流程:

  1. 通过 tryAcquire(leaseTime, unit, threadId) 方法,尝试获取锁,如果获取到锁,直接返回。
  2. 通过 subscribe(threadId) 方法,进行订阅。当获取锁的线程释放锁的时候,发送通知,通知等待锁的线程去获取锁。
  3. 死循环,在等待的时间里,尝试获取锁。

重点注意步骤2,通过订阅的方式,当锁被释放的时候,可以及时有效地通知等待获取锁的线程去争抢锁。

2.3 lock 方法

    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }

    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        try {
            lock(leaseTime, unit, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }


    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock(-1, null, true);
    }

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        lock(leaseTime, unit, true);
    }


private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // 获取锁 key 过期时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // 如果过期时间是 null,表示获取到锁了,直接返回
        if (ttl == null) {
            return;
        }

        // 订阅一个 redis 通道,当锁被释放的时候,提醒等待线程去获取锁。
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        // 等待订阅成功
        commandExecutor.syncSubscription(future);

        try {
            // 死循环等待获取锁
            while (true) {
                // 获取锁 key 过期时间
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 如果过期时间是 null,表示获取到锁了,直接返回
                if (ttl == null) {
                    break;
                }

                // 锁 key 过期时间
                if (ttl >= 0) {
                    // 等待 ttl 时间获取锁
                    try {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    // 一直等待获取锁
                    if (interruptibly) {
                        getEntry(threadId).getLatch().acquire();
                    } else {
                        getEntry(threadId).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            // 取消订阅
            unsubscribe(future, threadId);
        }
    }

这个方法的流程与 tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法基本相同。

2.4 tryAcquireAsync 方法

    // 获取锁过期时间
    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }


    // 返回获取锁过期时间的 RFuture
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        // 锁持有时间 leaseTime 有值
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
       // 持有锁的时间是不限制的
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            // 获取锁过程出错,就直接返回
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

这个方法与 tryAcquireOnceAsync 方法的区别,就是一个获取锁过期时间,一个是能否获取锁。即 获取锁过期时间 为 null 表示获取到锁,其他表示没有获取到锁。

2.5 tryLockInnerAsync 方法

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

获取锁最终都会调用这个方法,通过 lua 脚本与 redis 进行交互,来实现分布式锁。
首先分析,传给 lua 脚本的参数:

  1. Collections.<Object>singletonList(getName()) : 表示这个 lua 脚本的 key 只有一个, KEYS[1] 就是 getName() 返回值,即锁名。
  2. lua 脚本参数值有两个,ARGV[1] 就是锁过期时间 internalLockLeaseTime 的毫秒值, ARGV[2] 就是 getLockName(threadId)

protected String getLockName(long threadId) { return id + ":" + threadId; }
其中 id 是唯一的 UUID 值,threadId 线程唯一 id 值。

lua 脚本的流程:

  1. 第一个 if 判断,是判断锁是否存在,如果不存在,那么就创建锁,并设置过期时间,返回 null。

这里使用的 redis 数据结构hash ,而不是 string, 是为了实现可重入锁。当获取锁的线程再次获取锁的时候,这里就会将对应的值加 1

  1. 如果锁存在,那么就判断是不是当前线程获取的锁,通过判断 hash 储存的是不是当前线程 key 的值。如果是,那么就对应的值加 1,表示再一次获取锁,重新设置过期时间并返回 null。当释放锁的时候,就需要这个值是 0 ,才表示完全释放锁了。

  2. 如果上面两种情况都不是,表示获取锁失败,返回锁还剩余的过期时间。

2.6 定时延时过程

为了实现无限制持有锁,那么就需要定时刷新锁的过期时间。

2.6.1 ExpirationEntry 类

    public static class ExpirationEntry {

        // 获取锁的线程集合。一般情况下,只会有一个线程获取锁,集合中只有一个值,键所对应的值是这个线程获取锁的次数,即重入锁多少次
        private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
        // 超时动作。它是当锁被释放的时候,用来取消定时的
        private volatile Timeout timeout;

        public ExpirationEntry() {
            super();
        }

        // 添加线程id, counter 表示线程重入锁次数
        public void addThreadId(long threadId) {
            Integer counter = threadIds.get(threadId);
            if (counter == null) {
                counter = 1;
            } else {
                counter++;
            }
            threadIds.put(threadId, counter);
        }
        
        public boolean hasNoThreads() {
            return threadIds.isEmpty();
        }
        
        // 获取集合中第一个线程id
        public Long getFirstThreadId() {
            if (threadIds.isEmpty()) {
                return null;
            }
            return threadIds.keySet().iterator().next();
        }
        
        // 从集合中移除线程
        public void removeThreadId(long threadId) {
            Integer counter = threadIds.get(threadId);
            if (counter == null) {
                return;
            }
            counter--;
            if (counter == 0) {
                threadIds.remove(threadId);
            } else {
                threadIds.put(threadId, counter);
            }
        }


        public void setTimeout(Timeout timeout) {
            this.timeout = timeout;
        }
        public Timeout getTimeout() {
            return timeout;
        }

    }

这个类最重要的是两个成员属性:

  1. Map<Long, Integer> threadIds : 用来记录获取锁的线程集合,key 表示线程 id , value 表示这个线程重入锁的次数。

这里的确很奇怪,一般只会有一个线程能获取到锁,这里用集合记录有点让人疑惑。

  1. Timeout timeout : 这个类其实表示一个超时动作。如果没有在超时时间前,调用 timeoutcancel 取消方法,那么这个超时动作就会执行。这里的作用,主要是当锁被释放的时候,用来取消超时动作。

2.6.2 scheduleExpirationRenewal 方法

    // 记录锁对应 ExpirationEntry 的集合,  `key` 是 `entryName` 值,即 `id + ":" + name`
    private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

    // 定时刷新过期时间
    private void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        // 使用的是并发集合 ConcurrentHashMap, 如果oldEntry有值,表示有其他线程已经创建 entry,并存入集合中了。
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
            // 这里不需要调用 renewExpiration, 因为在其他线程中已经调用了
        } else {
            entry.addThreadId(threadId);
            // 调用刷新过期时间的方法
            renewExpiration();
        }
    }

使用一个静态并发集合 EXPIRATION_RENEWAL_MAP 来存储所有锁对应的 ExpirationEntry ,当有新的 ExpirationEntry 并存入到 EXPIRATION_RENEWAL_MAP 集合中时,需要调用 renewExpiration 方法,来刷新过期时间。

2.6.3 renewExpiration 方法


    // 刷新过期时间
    private void renewExpiration() {
        // 通过锁的 entryName 来获取 ExpirationEntry
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        // 如果 ExpirationEntry 为空,表示这个锁已经被释放了,不需要刷新过期时间
        if (ee == null) {
            return;
        }

        // 开始超时动作,超时时间是 internalLockLeaseTime / 3,即看门狗时间的 1/3
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // 通过锁的 entryName 来获取 ExpirationEntry
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                // 如果 ExpirationEntry 为空,表示这个锁已经被释放了,不需要刷新过期时间
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                // 如果线程 id 为空,那么也表示这个锁已经被释放了
                if (threadId == null) {
                    return;
                }

                // 刷新过期时间
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }

                    if (res) {
                        // 调用renewExpiration, 再次重新刷新过期时间,达到定时的效果
                        renewExpiration();
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        // 设置超时动作
        ee.setTimeout(task);
    }

创建一个超时任务 Timeout task ,超时时间是 internalLockLeaseTime / 3, 过了这个时间,即调用 renewExpirationAsync(threadId) 方法,来刷新锁的过期时间。

2.6.4 renewExpirationAsync 方法

    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

判断如果是当前线程持有的锁,那么就重新设置过期时间,并返回 1true 。否则返回 0false

2.7 unlock 释放锁

    @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
}

    @Override
    public RFuture<Void> unlockAsync() {
        long threadId = Thread.currentThread().getId();
        return unlockAsync(threadId);
    }

    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise<Void>();
        // 调用 redis 方法,释放锁
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        future.onComplete((opStatus, e) -> {
            // 释放锁失败
            if (e != null) {
                // 取消定时延时
                cancelExpirationRenewal(threadId);
                // 设置结果值失败
                result.tryFailure(e);
                return;
            }
            
            // 锁不是当前线程持有的,无法释放
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                result.tryFailure(cause);
                return;
            }

            // 取消定时延时
            cancelExpirationRenewal(threadId);
            result.trySuccess(null);
        });

        return result;
    }

通过调用 unlockInnerAsync(threadId) 来删除 redis 中的 key 来释放锁。特别注意一点,当不是持有锁的线程释放锁时引起的失败,不需要调用 cancelExpirationRenewal 方法,取消定时,因为锁还是被其他线程持有。

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

传给这个 lua 脚本的值:

  1. KEYS 的值有两个:getName() 锁名和 getChannelName() 订阅渠道名
  2. ARGV 的值有三个: LockPubSub.UNLOCK_MESSAGE 表示锁释放的信息, internalLockLeaseTime 锁过期时间, getLockName(threadId) 当前线程对应的锁名。

这个 lua 脚本的流程:

  1. 先判断当前线程是否持有锁,如果不持有锁,那么直接返回 null ,表示不是当前线程持有锁,无法释放。
  2. 将当前线程对应的可重入次数减一。
  3. 判断可重入次数 counter , 如果大于 0 表示锁还有在使用,重置过期时间,并返回 0false,释放锁失败。如果等于 0 表示锁已经没人使用了,删除对应的 key ,并发送释放锁的通知,让等待锁的线程去重新获取锁,最后返回 1true ,释放锁成功。
    void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        // 锁已经被释放了
        if (task == null) {
            return;
        }

        if (threadId != null) {
            task.removeThreadId(threadId);
        }

        // threadId == null 表示想要强制删除延时刷新动作
        // task.hasNoThreads() 表示锁已经被释放了
        if (threadId == null || task.hasNoThreads()) {
            task.getTimeout().cancel();
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }

2.7 订阅操作

    protected RFuture<RedissonLockEntry> subscribe(long threadId) {
        return pubSub.subscribe(getEntryName(), getChannelName());
    }

调用了 LockPubSubsubscribe 进行订阅。

2.7.1 subscribe(String entryName, String channelName) 方法

public RFuture<E> subscribe(String entryName, String channelName) {
        
        AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
        // 获取异步操作信号量 AsyncSemaphore, 根据订阅名字从数组中获取,同一个订阅名,获取的是同一个
        AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
        // 表示订阅的未来结果值
        RPromise<E> newPromise = new RedissonPromise<E>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                // 如果取消订阅操作,那么这里就要删除异步操作信号量中对应的回调 list
                return semaphore.remove(listenerHolder.get());
            }
        };

        Runnable listener = new Runnable() {

            @Override
            public void run() {
                // 通过 entryName 获取对应 RedissonLockEntry。 entryName 是唯一的
                E entry = entries.get(entryName);
                // 如果不为空,表示已经进行过订阅操作了
                if (entry != null) {
                    // 表示等待获取锁的线程数量加一
                    entry.aquire();
                    // 调用 semaphore.release(),让 semaphore 中一个 listener 回调执行
                    semaphore.release();
                    // 将 newPromise 添加到 Promise 完成回调链条中,所以当 entry.getPromise() 设置结果值后,
                    // 整个调用链条都会调用,类似责任链模式
                    entry.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }

                // 创建 RedissonLockEntry
                E value = createEntry(newPromise);
                // 待获取锁的线程数量加一
                value.aquire();

                // 存入集合中
                E oldValue = entries.putIfAbsent(entryName, value);
                // 如果 oldValue 不为空,表示有其他线程已经提前将 oldValue 存入集合中了。
                // 那么就与 entry != null 时的操作一样了
                if (oldValue != null) {
                    oldValue.aquire();
                    semaphore.release();
                    oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
                    return;
                }

                // 创建订阅回调
                RedisPubSubListener<Object> listener = createListener(channelName, value);
                // 发送 `redis` 的命令,进行订阅
                service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
            }
        };
        // 添加回调操作
        semaphore.acquire(listener);
        listenerHolder.set(listener);

        return newPromise;
    }

这个方法的作用就是向 redis 发起订阅,但是对于同一个锁的同一个客户端(即 一个 jvm 系统) 只会发起一次订阅,同一个客户端的其他等待同一个锁的线程会记录在 RedissonLockEntry 中。

方法流程:

  1. AsyncSemaphore semaphore : 异步操作信号量, 它的作用是控制并发量,这里是让订阅操作串行执行,等待前一个执行完成,才调用 listener 回调。
  2. Runnable listener :回调,被 semaphore 控制
  3. semaphore.acquire(listener): 添加到 semaphore 控制中。
  4. 通过 entries.get(entryName 获取 RedissonLockEntry 。如果不为空,表示已经有线程发起订阅了,那么只需要通过 entry.aquire() 方法,将等待获取锁的线程数量加一。
  5. 创建 RedissonLockEntry 实例并存放到 entries 中,如果存放失败,也表示 有线程发起订阅了。
  6. 通过 service.subscribe 方法,发起订阅。

2.7.2 AsyncSemaphore 中重要方法

    public void acquire(Runnable listener) {
        acquire(listener, 1);
    }
    
    public void acquire(Runnable listener, int permits) {
        boolean run = false;
        
        synchronized (this) {
            // 当 counter 小于 permits 时,那么 listener 回调就会等待
            if (counter < permits) {
                listeners.add(new Entry(listener, permits));
                return;
            } else {  
                counter -= permits;
                run = true;
            }
        }
        
        if (run) {
            // 回调运行
            listener.run();
        }
    }

只有当 counter >= permits 的时候,回调 listener 才会运行,起到控制 listener 运行的效果。

    public void release() {
        Entry entryToAcquire = null;
        
        synchronized (this) { 
            // 添加 counter 数量
            counter++;
            Iterator<Entry> iter = listeners.iterator();
            if (iter.hasNext()) {
                Entry entry = iter.next();
                // 表示 entry 可以运行了
                if (entry.getPermits() <= counter) {
                    iter.remove();
                    entryToAcquire = entry;
                }
            }
        }
        
        if (entryToAcquire != null) {
           // 重新尝试运行
            acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits());
        }
    }

释放一个控制量,让其中一个回调 listener 能够运行。

2.7.3 RedissonLockEntry 类

public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {

    // 表示等待锁线程的数量
    private int counter;

   // 用来控制等待锁的信号量
    private final Semaphore latch;
   // 表示订阅成功的回调
    private final RPromise<RedissonLockEntry> promise;
    // 表示接受到释放锁通知后,可以运行的 Runnable 集合
    private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();

    public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
        super();
        this.latch = new Semaphore(0);
        this.promise = promise;
    }

    public void aquire() {
        counter++;
    }

    public int release() {
        return --counter;
    }

    public RPromise<RedissonLockEntry> getPromise() {
        return promise;
    }

    public void addListener(Runnable listener) {
        listeners.add(listener);
    }

    public boolean removeListener(Runnable listener) {
        return listeners.remove(listener);
    }

    public ConcurrentLinkedQueue<Runnable> getListeners() {
        return listeners;
    }

    public Semaphore getLatch() {
        return latch;
    }
}

主要属性:

  1. counter:表示等待锁线程的数量
  2. latch: 用来控制等待锁的信号量

我们在 RedissonLocklocktryLock 方法中,都调用了 RedissonLockEntrylatch 变量的 tryAcquire 或者 acquire 方法,进入阻塞等待状态。
只有等到 LockPubSub 实例的 onMessage 方法,收到 redis 订阅的消息,会调用这个 latch 变量的 release() 方法,唤醒等待线程。

  1. promise:表示订阅成功的回调
  2. listeners:表示接受到释放锁通知后,可以运行的 Runnable 集合

2.7.4 LockPubSub 类

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    // 释放锁
    public static final Long UNLOCK_MESSAGE = 0L;
   // 释放只读锁,这个只有在读写锁中有用
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }
    
    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
       // 收到释放锁的通知
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }
            // 唤醒等待线程
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }
           // 唤醒等待线程
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}

2.8 示例

2.8.1 简单获取锁

    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        RedissonClient redissonClient = Redisson.create(config);

        RLock rLock = redissonClient.getLock("lock");
        System.out.println("获取锁前   thread:"+Thread.currentThread().getName());
        boolean isLock = rLock.tryLock();
        if (isLock) {
            System.out.println("已经获取锁  thread:"+Thread.currentThread().getName());
            Thread.sleep(1000 * 15);
            rLock.unlock();
        }
        System.out.println("释放锁    thread:"+Thread.currentThread().getName());
    }

这个过程对应的 redis 中监控的命令日志:

// 获取锁
1616134801.467569 [0 127.0.0.1:63105] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134801.467687 [0 lua] "exists" "lock"
1616134801.467697 [0 lua] "hset" "lock" "35fa1302-d412-422b-9507-3224f8118996:1" "1"
1616134801.467712 [0 lua] "pexpire" "lock" "30000"

// 重新刷新锁的过期时间
1616134811.508013 [0 127.0.0.1:63087] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;" "1" "lock" "30000" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134811.508106 [0 lua] "hexists" "lock" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134811.508129 [0 lua] "pexpire" "lock" "30000"

//  释放锁
1616134816.500123 [0 127.0.0.1:63083] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock" "redisson_lock__channel:{lock}" "0" "30000" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134816.500278 [0 lua] "hexists" "lock" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134816.500305 [0 lua] "hincrby" "lock" "35fa1302-d412-422b-9507-3224f8118996:1" "-1"
1616134816.500329 [0 lua] "del" "lock"
1616134816.500367 [0 lua] "publish" "redisson_lock__channel:{lock}" "0"

因为看门狗的默认时间是 30 秒,而定时刷新程序的时间是看门狗时间的 1/310 秒钟,示例程序休眠了 15 秒,导致触发了刷新锁的过期时间操作。

2.8.2 多个线程竞争锁

  public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        RedissonClient redissonClient = Redisson.create(config);

        for (int index = 0; index < 2; index++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    RLock rLock = redissonClient.getLock("lock");
                    System.out.println("获取锁前   thread:"+Thread.currentThread().getId());
                    boolean isLock = false;
                    try {
                        isLock = rLock.tryLock(10, TimeUnit.SECONDS);
                        if (isLock) {
                            System.out.println("已经获取锁  thread:"+Thread.currentThread().getId());
                            Thread.sleep(15);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        if (isLock) {
                            rLock.unlock();
                        }
                    }
                    System.out.println("释放锁    thread:"+Thread.currentThread().getId());
                }
            }, "t_"+index).start();
        }

    }

注意 rLock.tryLock(10, TimeUnit.SECONDS); 时间要设置大一点,如果等待时间太短,小于获取锁 redis 命令的时间,那么就直接返回获取锁失败了。

// 线程 id 为 45 的线程获取到锁了
1616138978.155253 [0 127.0.0.1:51236] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45"
1616138978.155351 [0 lua] "exists" "lock"
1616138978.155361 [0 lua] "hset" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45" "1"
1616138978.155377 [0 lua] "pexpire" "lock" "30000"

// 线程 44 没有获取到锁
1616138978.156080 [0 127.0.0.1:51232] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.156171 [0 lua] "exists" "lock"
1616138978.156178 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.156191 [0 lua] "pttl" "lock"

// 线程 44 开启的 redis 的订阅
1616138978.171950 [0 127.0.0.1:51222] "SUBSCRIBE" "redisson_lock__channel:{lock}"

// 线程 44 再一次准备获取锁失败
1616138978.177573 [0 127.0.0.1:51217] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.177631 [0 lua] "exists" "lock"
1616138978.177637 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.177645 [0 lua] "pttl" "lock"

// 线程 45 释放锁
1616138978.182198 [0 127.0.0.1:51235] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock" "redisson_lock__channel:{lock}" "0" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45"
1616138978.182312 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45"
1616138978.182331 [0 lua] "hincrby" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45" "-1"
1616138978.182348 [0 lua] "del" "lock"
1616138978.182356 [0 lua] "publish" "redisson_lock__channel:{lock}" "0"

// 线程 44 获取到锁
1616138978.186110 [0 127.0.0.1:51234] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.186221 [0 lua] "exists" "lock"
1616138978.186233 [0 lua] "hset" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44" "1"
1616138978.186251 [0 lua] "pexpire" "lock" "30000"

// 取消订阅,因为没有等到获取锁的线程了
1616138978.188411 [0 127.0.0.1:51222] "UNSUBSCRIBE" "redisson_lock__channel:{lock}"

// 线程 44 释放锁
1616138978.221926 [0 127.0.0.1:51218] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock" "redisson_lock__channel:{lock}" "0" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.222354 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.222372 [0 lua] "hincrby" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44" "-1"
1616138978.222385 [0 lua] "del" "lock"
1616138978.222391 [0 lua] "publish" "redisson_lock__channel:{lock}" "0"

2.9 小结

分析源码我们了解 Redisson 模式的分布式,解决了锁过期时间和可重入的问题。但是针对 redis 本身可能存在的单点失败问题,其实是没有解决的。
基于这个问题,redis 作者提出了一种叫做 Redlock 算法, 但是这种算法本身也是有点问题的,想了解更多,请看 基于Redis的分布式锁到底安全吗


https://www.xamrdz.com/backend/3ms1936097.html

相关文章: