锁的作用,我想大家都理解,就是让不同的线程或者进程可以安全地操作共享资源,而不会产生冲突。
比较熟悉的就是 Synchronized
和 ReentrantLock
等,这些可以保证同一个 jvm
程序中,不同线程安全操作共享资源。
但是在分布式系统中,这种方式就失效了;由于分布式系统多线程、多进程并且分布在不同机器上,这将使单机并发控制锁策略失效,为了解决这个问题就需要一种跨 JVM
的互斥机制来控制共享资源的访问。
比较常用的分布式锁有三种实现方式:
- 基于数据库的分布式锁
- 基于
ZooKeeper
的分布式锁 - 基于
Redis
的分布式锁
本篇文章主要讲解基于 Redis
分布式锁的实现。
一. 简单的实现
分布式锁最主要的作用就是保证任意一个时刻,只有一个客户端能访问共享资源。
1.1 SET NX 命令
我们知道 redis
有 SET key value NX
命令,仅在不存在 key
的时候才能被执行成功,保证多个客户端只有一个能执行成功,相当于获取锁。
释放锁的时候,只需要删除 del key
这个 key
就行了。
上面的实现看似已经满足要求了,但是忘了考虑在分布式环境下,有以下问题:
- 获取锁的客户端意外崩溃,没有释放锁;导致其他客户端再也不能获取锁。
- 网络分裂,导致释放锁的命令没有发给
redis
, 也导致锁无法释放。
最大的问题就是因为客户端或者网络问题,导致 redis
中的 key
没有删除,锁无法释放,因此其他客户端无法获取到锁。
1.2 SET NX PX 命令
针对上面的情况,使用了下面命令:
SET key my_random_value NX PX 30000
使用 PX
的命令,给 key
添加一个自动过期时间(30秒),保证即使因为意外情况,没有调用释放锁的方法,锁也会自动释放,其他客户端仍然可以获取到锁。
注意给这个 key
设置的值 my_random_value
是一个随机值,而且必须保证这个值在客户端必须是唯一的。这个值的作用是为了更加安全地释放锁。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
这是为了避免删除其他客户端成功获取的锁。考虑下面情况:
当客户端A获取到锁,但是客户端A执行时间超过了锁
key
过期时间,这个锁的key
自动删除了,被客户端B 重新设置了锁key
, 获取了锁 。这个时候客户端A执行完成之后,直接把客户端B的锁给释放了。
注意:这里key
的名字是一样的,就是锁名。
因此这里使用一个 my_random_value
随机值,保证客户端只会释放自己获取的锁,即只删除自己设置的 key
。
这种实现方式,存在下面问题:
- 锁的过期时间,即只要超过了设置的过期时间,其他客户端就可以获取到锁,同一时间有两个客户端访问共享资源。
- 这个锁不是一个可重入锁,同一线程没有办法多次获取锁。
- 单点失败问题。即使是主从模式的
redis
,也会存在主从复制不同步,导致可能存在的多个客户端获取到锁。
二. Redisson 的实现
上面章节介绍了,简单实现存在的问题,下面来介绍一下 Redisson
实现又是怎么解决的这些问题的。
注:下面源码是基于
redisson-3.11.3
2.1 tryLock() 方法获取锁
// 在 RedissonLock 的源码中
// 同步方法获取锁, true 表示获取到锁
@Override
public boolean tryLock() {
return get(tryLockAsync());
}
// 异步方法获取锁, 得到的 RFuture get方法返回 true 表示获取到锁
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, null, threadId);
}
// 返回是否获取到锁的 RFuture
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
// 锁持有时间 leaseTime 有值
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// 持有锁的时间是不限制的
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 获取锁过程出错,就直接返回
if (e != null) {
return;
}
// ttlRemaining 为true,表示获取到锁,那么就需要开启定时,给锁的过期时间不断延期,保证锁不过期。
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
主要关注 tryAcquireOnceAsync
方法,有三个参数:
- leaseTime 和 unit:表示锁持有时间。如果
leaseTime = -1
表示锁持有时间是不限制的,即只要不释放锁,锁永不过期。 - threadId: 表示当前线程的
id
,用来实现可重入锁的。
如何实现锁永不过期呢?
Redisson
实现了一个定时操作,在锁过期之前,不断地给锁延期过期时间。默认的时间就是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()
看门狗的时间。
2.2 tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法获取锁
// 获取锁。 最多等待 waitTime 时间
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
// 等待获取锁的时间
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
// 线程ID
long threadId = Thread.currentThread().getId();
// 获取锁 key 过期时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 如果过期时间是 null,表示获取到锁了,直接返回
if (ttl == null) {
return true;
}
// 计算还剩下的等待获取锁的时间
time -= System.currentTimeMillis() - current;
// 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅一个 redis 通道,当锁被释放的时候,提醒等待线程去获取锁。
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 等待订阅这个操作是否成功, 订阅不成功,取消订阅,并返回获取锁失败
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
// 取消订阅
if (!subscribeFuture.cancel(false)) {
// 取消订阅成功,就需要调用 unsubscribe(subscribeFuture, threadId) 方法,真实地取消订阅
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
// 计算还剩下的等待获取锁的时间
time -= System.currentTimeMillis() - current;
// 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// 死循环等待获取锁
while (true) {
long currentTime = System.currentTimeMillis();
// 尝试获取锁,并得到锁剩余过期时间
ttl = tryAcquire(leaseTime, unit, threadId);
// 如果 ttl 等于 null,表示成功获取到锁
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
// 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 比较剩余等待获取锁的时间,和锁剩余过期时间大小,取较小的值作为等待时间。
// 当然这个 tryAcquire 可以被提前唤醒
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
// 如果等待获取锁的时间小于或者等于0,表示获取锁失败,直接返回 false
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
// 取消订阅
unsubscribe(subscribeFuture, threadId);
}
}
方法主要流程:
- 通过
tryAcquire(leaseTime, unit, threadId)
方法,尝试获取锁,如果获取到锁,直接返回。 - 通过
subscribe(threadId)
方法,进行订阅。当获取锁的线程释放锁的时候,发送通知,通知等待锁的线程去获取锁。 - 死循环,在等待的时间里,尝试获取锁。
重点注意步骤2,通过订阅的方式,当锁被释放的时候,可以及时有效地通知等待获取锁的线程去争抢锁。
2.3 lock 方法
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lock(-1, null, true);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
lock(leaseTime, unit, true);
}
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 获取锁 key 过期时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 如果过期时间是 null,表示获取到锁了,直接返回
if (ttl == null) {
return;
}
// 订阅一个 redis 通道,当锁被释放的时候,提醒等待线程去获取锁。
RFuture<RedissonLockEntry> future = subscribe(threadId);
// 等待订阅成功
commandExecutor.syncSubscription(future);
try {
// 死循环等待获取锁
while (true) {
// 获取锁 key 过期时间
ttl = tryAcquire(leaseTime, unit, threadId);
// 如果过期时间是 null,表示获取到锁了,直接返回
if (ttl == null) {
break;
}
// 锁 key 过期时间
if (ttl >= 0) {
// 等待 ttl 时间获取锁
try {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
// 一直等待获取锁
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 取消订阅
unsubscribe(future, threadId);
}
}
这个方法的流程与 tryLock(long waitTime, long leaseTime, TimeUnit unit)
方法基本相同。
2.4 tryAcquireAsync 方法
// 获取锁过期时间
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
// 返回获取锁过期时间的 RFuture
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// 锁持有时间 leaseTime 有值
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 持有锁的时间是不限制的
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 获取锁过程出错,就直接返回
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
这个方法与 tryAcquireOnceAsync
方法的区别,就是一个获取锁过期时间,一个是能否获取锁。即 获取锁过期时间 为 null
表示获取到锁,其他表示没有获取到锁。
2.5 tryLockInnerAsync 方法
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
获取锁最终都会调用这个方法,通过 lua
脚本与 redis
进行交互,来实现分布式锁。
首先分析,传给 lua
脚本的参数:
- Collections.<Object>singletonList(getName()) : 表示这个
lua
脚本的key
只有一个,KEYS[1]
就是getName()
返回值,即锁名。 -
lua
脚本参数值有两个,ARGV[1]
就是锁过期时间internalLockLeaseTime
的毫秒值,ARGV[2]
就是getLockName(threadId)
。
protected String getLockName(long threadId) { return id + ":" + threadId; }
其中id
是唯一的UUID
值,threadId
线程唯一id
值。
lua
脚本的流程:
- 第一个
if
判断,是判断锁是否存在,如果不存在,那么就创建锁,并设置过期时间,返回 null。
这里使用的
redis
数据结构是hash
,而不是string
, 是为了实现可重入锁。当获取锁的线程再次获取锁的时候,这里就会将对应的值加1
如果锁存在,那么就判断是不是当前线程获取的锁,通过判断
hash
储存的是不是当前线程key
的值。如果是,那么就对应的值加1
,表示再一次获取锁,重新设置过期时间并返回 null。当释放锁的时候,就需要这个值是0
,才表示完全释放锁了。如果上面两种情况都不是,表示获取锁失败,返回锁还剩余的过期时间。
2.6 定时延时过程
为了实现无限制持有锁,那么就需要定时刷新锁的过期时间。
2.6.1 ExpirationEntry 类
public static class ExpirationEntry {
// 获取锁的线程集合。一般情况下,只会有一个线程获取锁,集合中只有一个值,键所对应的值是这个线程获取锁的次数,即重入锁多少次
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
// 超时动作。它是当锁被释放的时候,用来取消定时的
private volatile Timeout timeout;
public ExpirationEntry() {
super();
}
// 添加线程id, counter 表示线程重入锁次数
public void addThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadIds.put(threadId, counter);
}
public boolean hasNoThreads() {
return threadIds.isEmpty();
}
// 获取集合中第一个线程id
public Long getFirstThreadId() {
if (threadIds.isEmpty()) {
return null;
}
return threadIds.keySet().iterator().next();
}
// 从集合中移除线程
public void removeThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
return;
}
counter--;
if (counter == 0) {
threadIds.remove(threadId);
} else {
threadIds.put(threadId, counter);
}
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}
public Timeout getTimeout() {
return timeout;
}
}
这个类最重要的是两个成员属性:
- Map<Long, Integer> threadIds : 用来记录获取锁的线程集合,
key
表示线程id
,value
表示这个线程重入锁的次数。
这里的确很奇怪,一般只会有一个线程能获取到锁,这里用集合记录有点让人疑惑。
- Timeout timeout : 这个类其实表示一个超时动作。如果没有在超时时间前,调用
timeout
的cancel
取消方法,那么这个超时动作就会执行。这里的作用,主要是当锁被释放的时候,用来取消超时动作。
2.6.2 scheduleExpirationRenewal 方法
// 记录锁对应 ExpirationEntry 的集合, `key` 是 `entryName` 值,即 `id + ":" + name`
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
// 定时刷新过期时间
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// 使用的是并发集合 ConcurrentHashMap, 如果oldEntry有值,表示有其他线程已经创建 entry,并存入集合中了。
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
// 这里不需要调用 renewExpiration, 因为在其他线程中已经调用了
} else {
entry.addThreadId(threadId);
// 调用刷新过期时间的方法
renewExpiration();
}
}
使用一个静态并发集合 EXPIRATION_RENEWAL_MAP
来存储所有锁对应的 ExpirationEntry
,当有新的 ExpirationEntry
并存入到 EXPIRATION_RENEWAL_MAP
集合中时,需要调用 renewExpiration
方法,来刷新过期时间。
2.6.3 renewExpiration 方法
// 刷新过期时间
private void renewExpiration() {
// 通过锁的 entryName 来获取 ExpirationEntry
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
// 如果 ExpirationEntry 为空,表示这个锁已经被释放了,不需要刷新过期时间
if (ee == null) {
return;
}
// 开始超时动作,超时时间是 internalLockLeaseTime / 3,即看门狗时间的 1/3
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 通过锁的 entryName 来获取 ExpirationEntry
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
// 如果 ExpirationEntry 为空,表示这个锁已经被释放了,不需要刷新过期时间
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
// 如果线程 id 为空,那么也表示这个锁已经被释放了
if (threadId == null) {
return;
}
// 刷新过期时间
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// 调用renewExpiration, 再次重新刷新过期时间,达到定时的效果
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 设置超时动作
ee.setTimeout(task);
}
创建一个超时任务 Timeout task
,超时时间是 internalLockLeaseTime / 3
, 过了这个时间,即调用 renewExpirationAsync(threadId)
方法,来刷新锁的过期时间。
2.6.4 renewExpirationAsync 方法
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
判断如果是当前线程持有的锁,那么就重新设置过期时间,并返回 1
即 true
。否则返回 0
即 false
。
2.7 unlock 释放锁
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
@Override
public RFuture<Void> unlockAsync() {
long threadId = Thread.currentThread().getId();
return unlockAsync(threadId);
}
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
// 调用 redis 方法,释放锁
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
// 释放锁失败
if (e != null) {
// 取消定时延时
cancelExpirationRenewal(threadId);
// 设置结果值失败
result.tryFailure(e);
return;
}
// 锁不是当前线程持有的,无法释放
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
// 取消定时延时
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
通过调用 unlockInnerAsync(threadId)
来删除 redis
中的 key
来释放锁。特别注意一点,当不是持有锁的线程释放锁时引起的失败,不需要调用 cancelExpirationRenewal
方法,取消定时,因为锁还是被其他线程持有。
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
传给这个 lua
脚本的值:
-
KEYS
的值有两个:getName()
锁名和getChannelName()
订阅渠道名 -
ARGV
的值有三个:LockPubSub.UNLOCK_MESSAGE
表示锁释放的信息,internalLockLeaseTime
锁过期时间,getLockName(threadId)
当前线程对应的锁名。
这个 lua
脚本的流程:
- 先判断当前线程是否持有锁,如果不持有锁,那么直接返回
null
,表示不是当前线程持有锁,无法释放。 - 将当前线程对应的可重入次数减一。
- 判断可重入次数
counter
, 如果大于0
表示锁还有在使用,重置过期时间,并返回0
即false
,释放锁失败。如果等于0
表示锁已经没人使用了,删除对应的key
,并发送释放锁的通知,让等待锁的线程去重新获取锁,最后返回1
即true
,释放锁成功。
void cancelExpirationRenewal(Long threadId) {
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
// 锁已经被释放了
if (task == null) {
return;
}
if (threadId != null) {
task.removeThreadId(threadId);
}
// threadId == null 表示想要强制删除延时刷新动作
// task.hasNoThreads() 表示锁已经被释放了
if (threadId == null || task.hasNoThreads()) {
task.getTimeout().cancel();
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
2.7 订阅操作
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
调用了 LockPubSub
的 subscribe
进行订阅。
2.7.1 subscribe(String entryName, String channelName) 方法
public RFuture<E> subscribe(String entryName, String channelName) {
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
// 获取异步操作信号量 AsyncSemaphore, 根据订阅名字从数组中获取,同一个订阅名,获取的是同一个
AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
// 表示订阅的未来结果值
RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果取消订阅操作,那么这里就要删除异步操作信号量中对应的回调 list
return semaphore.remove(listenerHolder.get());
}
};
Runnable listener = new Runnable() {
@Override
public void run() {
// 通过 entryName 获取对应 RedissonLockEntry。 entryName 是唯一的
E entry = entries.get(entryName);
// 如果不为空,表示已经进行过订阅操作了
if (entry != null) {
// 表示等待获取锁的线程数量加一
entry.aquire();
// 调用 semaphore.release(),让 semaphore 中一个 listener 回调执行
semaphore.release();
// 将 newPromise 添加到 Promise 完成回调链条中,所以当 entry.getPromise() 设置结果值后,
// 整个调用链条都会调用,类似责任链模式
entry.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
// 创建 RedissonLockEntry
E value = createEntry(newPromise);
// 待获取锁的线程数量加一
value.aquire();
// 存入集合中
E oldValue = entries.putIfAbsent(entryName, value);
// 如果 oldValue 不为空,表示有其他线程已经提前将 oldValue 存入集合中了。
// 那么就与 entry != null 时的操作一样了
if (oldValue != null) {
oldValue.aquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
return;
}
// 创建订阅回调
RedisPubSubListener<Object> listener = createListener(channelName, value);
// 发送 `redis` 的命令,进行订阅
service.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
// 添加回调操作
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}
这个方法的作用就是向 redis
发起订阅,但是对于同一个锁的同一个客户端(即 一个 jvm
系统) 只会发起一次订阅,同一个客户端的其他等待同一个锁的线程会记录在 RedissonLockEntry
中。
方法流程:
- AsyncSemaphore semaphore : 异步操作信号量, 它的作用是控制并发量,这里是让订阅操作串行执行,等待前一个执行完成,才调用
listener
回调。 - Runnable listener :回调,被
semaphore
控制 - semaphore.acquire(listener): 添加到
semaphore
控制中。 - 通过
entries.get(entryName
获取RedissonLockEntry
。如果不为空,表示已经有线程发起订阅了,那么只需要通过entry.aquire()
方法,将等待获取锁的线程数量加一。 - 创建
RedissonLockEntry
实例并存放到entries
中,如果存放失败,也表示 有线程发起订阅了。 - 通过
service.subscribe
方法,发起订阅。
2.7.2 AsyncSemaphore 中重要方法
public void acquire(Runnable listener) {
acquire(listener, 1);
}
public void acquire(Runnable listener, int permits) {
boolean run = false;
synchronized (this) {
// 当 counter 小于 permits 时,那么 listener 回调就会等待
if (counter < permits) {
listeners.add(new Entry(listener, permits));
return;
} else {
counter -= permits;
run = true;
}
}
if (run) {
// 回调运行
listener.run();
}
}
只有当 counter >= permits
的时候,回调 listener
才会运行,起到控制 listener
运行的效果。
public void release() {
Entry entryToAcquire = null;
synchronized (this) {
// 添加 counter 数量
counter++;
Iterator<Entry> iter = listeners.iterator();
if (iter.hasNext()) {
Entry entry = iter.next();
// 表示 entry 可以运行了
if (entry.getPermits() <= counter) {
iter.remove();
entryToAcquire = entry;
}
}
}
if (entryToAcquire != null) {
// 重新尝试运行
acquire(entryToAcquire.getRunnable(), entryToAcquire.getPermits());
}
}
释放一个控制量,让其中一个回调 listener
能够运行。
2.7.3 RedissonLockEntry 类
public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
// 表示等待锁线程的数量
private int counter;
// 用来控制等待锁的信号量
private final Semaphore latch;
// 表示订阅成功的回调
private final RPromise<RedissonLockEntry> promise;
// 表示接受到释放锁通知后,可以运行的 Runnable 集合
private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();
public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
super();
this.latch = new Semaphore(0);
this.promise = promise;
}
public void aquire() {
counter++;
}
public int release() {
return --counter;
}
public RPromise<RedissonLockEntry> getPromise() {
return promise;
}
public void addListener(Runnable listener) {
listeners.add(listener);
}
public boolean removeListener(Runnable listener) {
return listeners.remove(listener);
}
public ConcurrentLinkedQueue<Runnable> getListeners() {
return listeners;
}
public Semaphore getLatch() {
return latch;
}
}
主要属性:
- counter:表示等待锁线程的数量
- latch: 用来控制等待锁的信号量
我们在
RedissonLock
的lock
和tryLock
方法中,都调用了RedissonLockEntry
的latch
变量的tryAcquire
或者acquire
方法,进入阻塞等待状态。
只有等到LockPubSub
实例的onMessage
方法,收到redis
订阅的消息,会调用这个latch
变量的release()
方法,唤醒等待线程。
- promise:表示订阅成功的回调
- listeners:表示接受到释放锁通知后,可以运行的 Runnable 集合
2.7.4 LockPubSub 类
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
// 释放锁
public static final Long UNLOCK_MESSAGE = 0L;
// 释放只读锁,这个只有在读写锁中有用
public static final Long READ_UNLOCK_MESSAGE = 1L;
public LockPubSub(PublishSubscribeService service) {
super(service);
}
@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
// 收到释放锁的通知
if (message.equals(UNLOCK_MESSAGE)) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 唤醒等待线程
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
// 唤醒等待线程
value.getLatch().release(value.getLatch().getQueueLength());
}
}
}
2.8 示例
2.8.1 简单获取锁
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient redissonClient = Redisson.create(config);
RLock rLock = redissonClient.getLock("lock");
System.out.println("获取锁前 thread:"+Thread.currentThread().getName());
boolean isLock = rLock.tryLock();
if (isLock) {
System.out.println("已经获取锁 thread:"+Thread.currentThread().getName());
Thread.sleep(1000 * 15);
rLock.unlock();
}
System.out.println("释放锁 thread:"+Thread.currentThread().getName());
}
这个过程对应的 redis
中监控的命令日志:
// 获取锁
1616134801.467569 [0 127.0.0.1:63105] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134801.467687 [0 lua] "exists" "lock"
1616134801.467697 [0 lua] "hset" "lock" "35fa1302-d412-422b-9507-3224f8118996:1" "1"
1616134801.467712 [0 lua] "pexpire" "lock" "30000"
// 重新刷新锁的过期时间
1616134811.508013 [0 127.0.0.1:63087] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;" "1" "lock" "30000" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134811.508106 [0 lua] "hexists" "lock" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134811.508129 [0 lua] "pexpire" "lock" "30000"
// 释放锁
1616134816.500123 [0 127.0.0.1:63083] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock" "redisson_lock__channel:{lock}" "0" "30000" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134816.500278 [0 lua] "hexists" "lock" "35fa1302-d412-422b-9507-3224f8118996:1"
1616134816.500305 [0 lua] "hincrby" "lock" "35fa1302-d412-422b-9507-3224f8118996:1" "-1"
1616134816.500329 [0 lua] "del" "lock"
1616134816.500367 [0 lua] "publish" "redisson_lock__channel:{lock}" "0"
因为看门狗的默认时间是 30
秒,而定时刷新程序的时间是看门狗时间的 1/3
即 10
秒钟,示例程序休眠了 15
秒,导致触发了刷新锁的过期时间操作。
2.8.2 多个线程竞争锁
public static void main(String[] args) throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
RedissonClient redissonClient = Redisson.create(config);
for (int index = 0; index < 2; index++) {
new Thread(new Runnable() {
@Override
public void run() {
RLock rLock = redissonClient.getLock("lock");
System.out.println("获取锁前 thread:"+Thread.currentThread().getId());
boolean isLock = false;
try {
isLock = rLock.tryLock(10, TimeUnit.SECONDS);
if (isLock) {
System.out.println("已经获取锁 thread:"+Thread.currentThread().getId());
Thread.sleep(15);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (isLock) {
rLock.unlock();
}
}
System.out.println("释放锁 thread:"+Thread.currentThread().getId());
}
}, "t_"+index).start();
}
}
注意 rLock.tryLock(10, TimeUnit.SECONDS);
时间要设置大一点,如果等待时间太短,小于获取锁 redis
命令的时间,那么就直接返回获取锁失败了。
// 线程 id 为 45 的线程获取到锁了
1616138978.155253 [0 127.0.0.1:51236] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45"
1616138978.155351 [0 lua] "exists" "lock"
1616138978.155361 [0 lua] "hset" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45" "1"
1616138978.155377 [0 lua] "pexpire" "lock" "30000"
// 线程 44 没有获取到锁
1616138978.156080 [0 127.0.0.1:51232] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.156171 [0 lua] "exists" "lock"
1616138978.156178 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.156191 [0 lua] "pttl" "lock"
// 线程 44 开启的 redis 的订阅
1616138978.171950 [0 127.0.0.1:51222] "SUBSCRIBE" "redisson_lock__channel:{lock}"
// 线程 44 再一次准备获取锁失败
1616138978.177573 [0 127.0.0.1:51217] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.177631 [0 lua] "exists" "lock"
1616138978.177637 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.177645 [0 lua] "pttl" "lock"
// 线程 45 释放锁
1616138978.182198 [0 127.0.0.1:51235] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock" "redisson_lock__channel:{lock}" "0" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45"
1616138978.182312 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45"
1616138978.182331 [0 lua] "hincrby" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:45" "-1"
1616138978.182348 [0 lua] "del" "lock"
1616138978.182356 [0 lua] "publish" "redisson_lock__channel:{lock}" "0"
// 线程 44 获取到锁
1616138978.186110 [0 127.0.0.1:51234] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "lock" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.186221 [0 lua] "exists" "lock"
1616138978.186233 [0 lua] "hset" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44" "1"
1616138978.186251 [0 lua] "pexpire" "lock" "30000"
// 取消订阅,因为没有等到获取锁的线程了
1616138978.188411 [0 127.0.0.1:51222] "UNSUBSCRIBE" "redisson_lock__channel:{lock}"
// 线程 44 释放锁
1616138978.221926 [0 127.0.0.1:51218] "EVAL" "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "lock" "redisson_lock__channel:{lock}" "0" "30000" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.222354 [0 lua] "hexists" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44"
1616138978.222372 [0 lua] "hincrby" "lock" "8f2fb0bb-0edc-4f60-bf99-6e615d8358c3:44" "-1"
1616138978.222385 [0 lua] "del" "lock"
1616138978.222391 [0 lua] "publish" "redisson_lock__channel:{lock}" "0"
2.9 小结
分析源码我们了解 Redisson
模式的分布式,解决了锁过期时间和可重入的问题。但是针对 redis
本身可能存在的单点失败问题,其实是没有解决的。
基于这个问题,redis
作者提出了一种叫做 Redlock
算法, 但是这种算法本身也是有点问题的,想了解更多,请看 基于Redis的分布式锁到底安全吗