当前位置: 首页>数据库>正文

Redisson之--RedissonLock简介

WHY - 分布式锁

单进程系统中,当存在多个线程可以同时对某个变量或某块代码进行操作时,为保证其结果的正确性,需要保证同一时间内只有一个线程在进行操作,这个过程可以通过加锁来实现。由于在单进程中的多线程是可以共享堆内存,因此可以简单的在内存中记录是否加锁的标记。
但是现在部署一般都是多站点,多进程的情况下,就需要把标记位存储在一个各个进程都可以看到的地方,这就出现了分布式锁;
在本次的项目开发中因为需要不停的扫描数据库的变更情况,为避免多台站点同时进行,浪费资源,因此需要分布式锁来锁定其中一个进程来完成此操作,所以需要用到分布式锁。

WHY - Redisson

由于在本次的项目中扫描数据库是一个持续性的动作,每个5s扫描一次,因此需要获取锁之后不断延长其过期时间,也就是当某个线程获取锁后会一直保持并执行扫描的动作,直到该站点挂掉后,才会有其他站点重新获取锁并执行相关操作;Redisson已经封装好了续时功能,使用方便,因此选用Rdisson。

分布式锁的特征:

1-互斥:互斥的是必须的,否则就不叫锁了;
2-死锁:如果在一个线程中获取到锁,然后挂了,并没有释放,这样会导致其他的进程或线程永远无法获取到锁,这就会造成死锁.所以分布式锁必须避免造成死锁;
3-性能:高并发分布式系统中,线程互斥等待会成为性能瓶颈,需要好的中间件和实现来保证性能;
4-锁特性:分布式锁不能只是加锁,需要实现一些其他的功能如:锁判断,超市设置,可重入等;

1.锁的分类以及基本使用方法

1.1可重入锁
基于Redis的Redisson分布式可重入锁RLock
RLock lock = redisson.getLock("myTestLock");
// 最常见的使用方法
lock.lock();

// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}

//Reddsion同时还为分布式锁提供了异步执行的相关方法
RLock lock = redisson.getLock("myTestLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
1.2公平锁

基于Redis的Redisson分布式可重入公平锁,它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

RLock fairLock = redisson.getFairLock("myTestLock");
// 最常见的使用方法
fairLock.lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();

//Redisson同时还为分布式可重入公平锁提供了异步执行的相关方法:
RLock fairLock = redisson.getFairLock("myTestLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
1.3 联锁

基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);

// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
1.4 红锁

基于Redis的Redisson红锁RedissonRedLock对象可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的Redisson实例。
使用方式同上。区别在与连锁是所有节点的锁加锁成功才算成功,但红锁是大部分节点加锁成功即为成功。

红锁主要是用来解决什么问题的呢?
为了redis的高可用,一般都会给redis的节点挂一个slave,然后采用哨兵模式进行主备切换。但由于Redis的主从复制(replication)是异步的,这可能会出现在数据同步过程中,master宕机,slave来不及同步数据就被选为master,从而数据丢失。具体流程如下所示:
1.客户端1从Master获取了锁。
2.Master宕机了,存储锁的key还没有来得及同步到Slave上。
3.Slave升级为Master。
4.客户端2从新的Master获取到了对应同一个资源的锁。
为了应对这个情形, redis的作者提出了RedLock算法,步骤如下(该流程出自官方文档),假设我们有N个master节点(官方文档里将N设置成5,其实大等于3就行)
1.获取当前时间(单位是毫秒)。
2.轮流用相同的key和随机值在N个节点上请求锁,在这一步里,客户端在每个master上请求锁时,会有一个和总的锁释放时间相比小的多的超时时间。比如如果锁自动释放时间是10秒钟,那每个节点锁请求的超时时间可能是5-50毫秒的范围,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,我们应该尽快尝试下一个master节点。
3.客户端计算第二步中获取锁所花的时间,只有当客户端在大多数master节点上成功获取了锁(在这里是3个),而且总共消耗的时间不超过锁释放时间,这个锁就认为是获取成功了。
4.如果锁获取成功了,那现在锁自动释放时间就是最初的锁释放时间减去之前获取锁所消耗的时间。
5.如果锁获取失败了,不管是因为获取成功的锁不超过一半(N/2+1)还是因为总消耗时间超过了锁释放时间,客户端都会到每个master节点上释放锁,即便是那些他认为没有获取成功的锁。

分析:RedLock算法细想一下还存在下面的问题
节点崩溃重启,会出现多个客户端持有锁
假设一共有5个Redis节点:A, B, C, D, E。设想发生了如下的事件序列:
1.客户端1成功锁住了A, B, C,获取锁成功(但D和E没有锁住)。
2.节点C崩溃重启了,但客户端1在C上加的锁没有持久化下来,丢失了。
3.节点C重启后,客户端2锁住了C, D, E,获取锁成功。
这样,客户端1和客户端2同时获得了锁(针对同一资源)。
为了应对节点重启引发的锁失效问题,redis的作者提出了延迟重启的概念,即一个节点崩溃后,先不立即重启它,而是等待一段时间再重启,等待的时间大于锁的有效时间。采用这种方式,这个节点在重启前所参与的锁都会过期,它在重启后就不会对现有的锁造成影响。这其实也是通过人为补偿措施,降低不一致发生的概率。

1.5 读写锁

基于Redis的Redisson分布式可重入读写锁RReadWriteLock允许同时有多个读锁和一个写锁处于加锁状态。

RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
1.6 信号量

在上面的分布式可重入锁中,只有自己持有的锁才可以解锁,也就是说其他线程是没有办法解不属于他们的锁的,但是如果有业务需要的话可以使用基于Redis的Redisson的分布式信号量(Semaphore)来实现。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
//设置共同持有许可证的最大个数
semaphore.trySetPermits(23);
//申请一个许可证(需要添加try catch 所以一般常用tryAcquire,不传参数默认获取一个,如下面传参23,则表示想要申请23个)
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
1.7可过期性信号量

基于Redis的Redisson可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。
在使用过程中一般较常用的是带过期时间的信号量,原理类似与车库停车,车库满了就不能停车,车被开走,腾出的车位可以继续使用;因此在实际项目中经常用来解决分布式限流的问题或限制一项资源最多能够同时被多少客户访问的问题。

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 获取一个信号,有效期只有2秒钟。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
1.8闭锁

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

2.RedissionLock源码解读

2.1RedissionLock继承关系
Redisson之--RedissonLock简介,第1张
继承图.jpeg
2.2加锁解锁源码分析

看加锁方法之前先来看下加锁的流程图:


Redisson之--RedissonLock简介,第2张
流程图.png

Redisson之--RedissonLock简介,第3张
加锁流程图.png
Redisson之--RedissonLock简介,第4张
解锁流程图.png

先来看下RedissionLock中的Lock方法:

@Override
    public void lock(long leaseTime, TimeUnit unit) {
        try {
            lockInterruptibly(leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

@Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }

从上面的源码可以看出主要的实现方法在lockInterruptibly中,再来看看这个方法(此方法中主要的思路就是先获取锁,如果不成功的话则订阅释放锁的消息,获得消息前阻塞。得到释放通知后再去循环获取锁):

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        //获取当前的线程id
        long threadId = Thread.currentThread().getId();
        //尝试获取锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
        //获取成功
            return;
        }
        //异步订阅redis channel
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        //阻塞获取订阅结果
        //这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源
        commandExecutor.syncSubscription(future);

       //当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞
        try {
            while (true) { //循环判断直到获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired 如果剩余锁超时时间==null,则说明获取成功
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    //在这里其实是使用到了Semaphore来阻塞获取直到可以获取当前线程的许可证后才能继续当前的while循环
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            //取消订阅
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

接下来看下尝试获取锁的源码 Long ttl = tryAcquire(leaseTime, unit, threadId)
tips:下面方法中使用到了Netty的Future-listener模型,(多线程异步执行的时候,有时候还需要知道执行结果或者拿到线程执行的返回值,Future就是为了解决这个问题)

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            //如果设置了锁的超时时间,则直接调用tryLockInnerAsync
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //如果没有设置锁的超时时间,则默认超时时间为30s(此时间限制可以在配置文件中修改)
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                //监听Future后如果获取锁成功,则当还剩下1/3的超时时间时刷新其过期时间达到续时的效果
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

接下来终于到了最终获取锁的方法tryLockInnerAsync(),点进去看到的源码如下:

 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  //如果不存在"myTestLock"(getName()获取的值)这个key值
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      //则设置锁,"myTestLock"为key, "uuid:threadId" 为filed,filed值为1,大概的结构为"myTestLock":{"uuid:threadId" : 1}
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      //设置当前锁的过期时间为internalLockLeaseTime对应的值
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                 //如果key存在,filed也存在并且值为1,说明当前锁被当前线程持有着
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      //则把filed对应的值做加1处理
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      //刷新过期时间
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  //已经被其他线程持有,key存在,但是field不存在,返回当前锁的剩余超时时间
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

接下来看下解锁的核心方法unlockInnerAsync:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果key不存在,说明已经过期或压根没有加过锁
                "if (redis.call('exists', KEYS[1]) == 0) then " +
//则发送unlockMessage
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
//返回1,解锁成功
                    "return 1; " +
                "end;" +
//如果key存在,但是filed(uuid:threadId)不存在,说明当前线程不是该锁的持有者,无权解锁,直接返回nil
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
//key存在,filed也存在,说明是当前线程的持有锁,对filed的值进行减1操作,因为是可重入的,所以不能直接释放
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//如果减1操作后,持有数还是大于0,说明该线程还有其他的地方在持有锁,刷新过期时间,返回0
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
//如果不大于0,则说明该线程不再有持有者,则释放
                "else " +
//删除key
                    "redis.call('del', KEYS[1]); " +
//发送释放锁消息
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
//解锁成功
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    }

加锁和解锁的核心代码最终是通过一段lua脚本实现,这样做的目的是为了保证这段复杂业务逻辑执行的原子性,因为当lua脚本在执行的时候,不会有其他脚本和命令同时执行。

2.3续时操作分析
 private void scheduleExpirationRenewal(final long threadId) {
//如果当前锁已过期则不再执行续时操作,直接返回
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                //执行续时操作
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                //同样使用Future-Listener获取执行续时操作的结果
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself  说明执行续时成功,递归调用
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }

        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }
    }

执行续时的核心方法renewExpirationAsync

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果key和filed都存在
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
//设置过期时间
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

https://www.xamrdz.com/database/62c1997579.html

相关文章: