当前位置: 首页>移动开发>正文

分布式锁的三种实现方案

带着问题去思考

分布式锁有哪些解决方案?方案的利弊各自体现在哪里?
基于redis来实现分布式锁实现原理,以及需要主要那些问题?
基于ZooKeeper 的分布式锁实现原理

背景概要

互联网从开始的单体应用随之发展成目前的分布式应用,例如市场上流行的分布式框架Dubbo、SpringCloud等等
单体应用的优势:维护、集成、部署简单,适合小团队独立维护,劣势随之产生的是可扩展性太差,代码腐化维护成本的增加、应用复杂度越高功能越多风险性就越大。
所以为了解决以上问题,引入了分布式应用,市场上很多大型网站以及应用都是分布式部署的。

分布式系统主要从三方面获得提升:

  • 扩展性:集群扩展性、地理扩展性、管理扩展性
  • 性能:短RT、低延迟,高吞吐和较低的计算资源占用率。
  • 可用性:可用性=可用时间/(可用时间+不可用时间),可用性百分比越高,难度越高 。

分布式家族中包含分布式服务、分布式消息、分布式缓存、分布式调度、分布式数据库、分布式搜索、分布式锁、分布式事务、分布式计算等等


分布式锁的三种实现方案,第1张
分布式家族

在分布式场景中数据一致性向来是很重要的话题,CAP理论中“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。要么CP,要么AP。
https://www.yuque.com/docs/share/6e6afffe-c348-461e-90bc-711231fbd209?#
在很多场景下,我们是需要保证数据一致性,为了满足一致性问题,我们需要很多技术方案支持,比如分布式锁分布式事务等等。本次分享主要针对分布式锁来进行延伸探讨。

分布式锁的介绍

分布式锁的三种实现方案,第2张
分布式锁

讲到分布式锁,首先要提到与之对应的线程锁

  • 线程锁:当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。以保证共享资源安全性,线程锁只在同一个进程【同一个JVM】中才有效。
  • 分布式锁:当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

到底什么时候需要分布式锁呢?

总结来说,当有多个客户端需要访问并防止并操作同一个资源,并且还需要保持这个资源的一致性的时候,就需要分布式锁,实现让多个客户端互斥的对共享资源进行访问。
eg: 集群部署下秒杀场景、集群部署下批处理业务的执行等等

考虑因素

  • 排他性:分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行
  • 可重入性:同一线程多次获取锁避免死锁
  • 阻塞锁考虑:考虑业务是否需要
  • 高可用:获取释放锁性能佳
  • 原子性:加锁解锁原子性操作,避免多次请求获得锁。

常见的分布式锁实现方案

想要实现分布式锁,我们就需要借助外部系统实现互斥的功能。常见的有以下几种方式:

  • 基于数据库;
  • 基于redis;
  • 基于Zookeeper;

基于数据库

  • Mysql:通过唯一索引、通过乐观锁version版本、通过悲观锁行锁 for update实现;
  • MongoDB:findAndModify原子性命令,相较Mysql性能要好很多

容易理解,但解决问题的方案相对越来越复杂,并且数据库需要一定的开销,性能值得考虑

本次分享不涵盖基于数据库分布式锁实现,有兴趣可以下去自行通过上述方向去扩展。

基于Redis

在实现redis分布式锁之前,我们先mock一个场景来看看,当分布式应用场景下,我们不用锁或者用java中线程锁会出现什么问题呢?

@GetMapping("/kill")
public String kill() {
  // 定义商品key值
  String key = "goods";
  // 获取商品数量
  Object obj = redisTemplate.opsForValue().get(key);
  Integer mount = Integer.valueOf(obj.toString());
  // 如果商品被抢完,直接返回
  if (mount < 0 || mount == 0) {
  System.out.println("很遗憾,商品已被抢完");
  return "很遗憾,商品已被抢完";
  }
  // 线程睡眠,目的在于放大错误
  try {
  Thread.sleep(2000);
  } catch (InterruptedException e) {
  e.printStackTrace();
  }
  // 抢到商品后,将redis的商品数量减一
  mount = --mount;
  redisTemplate.opsForValue().set(key, mount.toString());
  // 打印,以便观察
  System.out.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":抢到第" + (mount + 1) + "件商品【kill】");
  return "恭喜,商品抢购成功";
}

原理

最核心的三个命令:setNx、Px(setNx、expire)、delete
setNx当返回1时代表获取锁成功,0则抢锁失败

redis原生实现

具体实现

接下来我们所有的实现redis分布式锁都是基于Spring切面定义来完成。

    // 准备工作
    @Bean(name="redisTemplate")
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        template.setConnectionFactory(factory);
        //key序列化方式
        template.setKeySerializer(redisSerializer);
        //value序列化
        template.setValueSerializer(redisSerializer);
        //value hashmap序列化
        template.setHashValueSerializer(redisSerializer);
        //key haspmap序列化
        template.setHashKeySerializer(redisSerializer);
        return template;
    }
@Aspect
@Component
public class RedisLockProvider {

    @Around("@annotation(lock)")
    public Object execute(ProceedingJoinPoint point, RedisLock lock) throws IllegalAccessException, InstantiationException {
        Object result = null;
        String lockKey = lock.lockKey();
        Class<extends RedisLockStrategy> strategy = lock.strategy();
        RedisLockStrategy redisLockStrategy = strategy.newInstance();
        try {
            int expireTime = lock.expireTime();
            if(redisLockStrategy.lock(lockKey, expireTime)){
                result = point.proceed();
            }
        } catch (Throwable throwable) {
            redisLockStrategy.unLock(lockKey);
            throwable.printStackTrace();
        } finally {
            redisLockStrategy.unLock(lockKey);
        }
        return result;
    }

}

我们看了不加锁或者加java锁的情况,确实会出现数据异常的情况,那现在我们通过redis本身的一些特性来实现redis分布式锁。

public class RedisLockUtil0 implements RedisLockStrategy {

    private String TEMP_VALUE = "OK";

    private RedisTemplate<String, String> redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    public Boolean lock(String key, int expireTime) {
          return redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS);
    }

    public void unLock(String key){
        redisTemplate.delete(key);
    }
}

但上述场景中会有一个问题,就是当我获取锁的时候,如果没抢锁成功会立刻返回到客户端通知结果,也许下一时间正好就能抢到锁,所以我们做了自旋的操作,并设置默认超时时间,这里也可以不设置超时时间:那就是阻塞锁了,看业务场景而制定

public class RedisLockUtil1 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 2000; // 默认超时时间(毫秒) 默认2秒

    private String TEMP_VALUE = "OK";

    private RedisTemplate<String, String> redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("申请锁(" + key + ")成功");
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加锁失败, 锁键值:" + key, e);
        }
        return Boolean.FALSE;
    }

    public void unLock(String key){
        log.info("释放锁,key:"+key);
        redisTemplate.delete(key);
    }
}

上述实现方式,看似逻辑ok,有同学能看出有什么漏洞吗???我们来测试看下。
让我们业务时间执行较短时测试,发现业务逻辑没太大问题;
让我们业务时间稍微变大,结果发现了什么?

当业务执行时间大于默认的自旋超时时间时,会触发删除锁操作,会出现误删的情况,A业务的锁还未执行完成,B锁获取异常或获取失败或者自旋时间已经超时,导致误删了A业务的锁,也就会导致分布式锁的定义没有任何意义了。所以我们在设置锁的时候,设置一个属于该锁的唯一ID,在删除锁的时候要判断是否属于自己的锁。以避免误删场景。

public class RedisLockUtil2 implements RedisLockStrategy {

    /**
     * 默认超时时间(毫秒) 默认2秒
     */
    private static final long DEFAULT_TIME_OUT = 2000;

    /**
     * 避免误删key,在value中赋值UUID
     */
    private String TEMP_VALUE = UUID.randomUUID().toString();

    private RedisTemplate redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("加锁成功...");
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加锁失败, 锁键值:" + key, e);
        }
        return Boolean.FALSE;
    }

    public void unLock(String key){
        if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
            log.info("释放锁,key:"+key);
            redisTemplate.delete(key);
        }
    }
}

那现在我们模拟一个场景,假设我们业务的执行时间过长的情况下,但我们设置的redis过期时间很短,那会出现什么问题呢???我们来测试看下。

是不是导致A业务的还未执行完成,B业务却拿到了本应该属于A的锁,分布式锁的意思有荡然无存了,所以我们借鉴了redisson中WatchDog【看门狗】的机制来完善业务时间大于过期时间的问题。

redisson是用java来实现的分布式框架,稍后我们会介绍redisson是如何基于redis来实现分布式锁并解决相关问题的。

public class RedisLockUtil3 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 2000; // 默认超时时间(毫秒) 默认2秒

    private String TEMP_VALUE = UUID.randomUUID().toString();

    private RedisTemplate redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    /**
     * 初始化任务线程池
     */
    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    /**
     * 延长过期次数阈值
     */
    private Integer maxRenewTimes = 10;
    /**
     * 延长过期时间次数
     */
    private AtomicInteger renewTimes = new AtomicInteger(0);

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("申请锁(" + key + ")成功");
                    this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加锁失败, 锁键值:" + key, e);
        }
        return Boolean.FALSE;
    }

    /**
     * expireTime/3 频率去重置过期时间
     * @param key
     * @param value
     * @param expireTime
     */
    private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
        scheduledExecutor.schedule(()->{
            // 延长过期时间失败直接返回
            if(!this.renewExpiration(key, value, expireTime)){
                return;
            }
            // 超过延长次数阈值直接返回
            if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
                return;
            }
            this.scheduleExpirationRenewal(key,value,expireTime);
        }, expireTime / 3, TimeUnit.SECONDS);
    }

    /**
     * 重置过期时间
     * @param lockKey
     * @param lockValue
     * @param lockWatchdogTimeout
     * @return
     */
    private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
        String value = (String) redisTemplate.opsForValue().get(lockKey);
        if (Objects.isNull(value) || !value.equals(lockValue)) {
            return false;
        }
        log.info("延长过期时间,key:"+lockKey);
        return redisTemplate.expire(lockKey, lockWatchdogTimeout, TimeUnit.SECONDS);
    }

    public void unLock(String key){
        if(TEMP_VALUE.equals(redisTemplate.opsForValue().get(key))){
            log.info("释放锁,key:"+key);
            redisTemplate.delete(key);
        }
    }
}

Lua脚本语言的引入

到此为止,我们自己实现的分布式方案看似已经ok,但其实还是有很大的问题,譬如在删除锁、给锁加长超时时间等操作,我们是先获取锁在删除或者延长超时时间,两者操作并不是原子性操作,如果在获取锁成功之后,redis宕机,那么也会出现业务紊乱,所以我们在redis操作要尽量保证院子性操作。
那么我们可以引入Lua脚本语言来支持,Lua脚本的优势,兴趣的同学可以下去学习下lua语言,大部分游戏开发都用的lua来实现;语法链接:https://www.runoob.com/lua/lua-tutorial.html
并且我们国人章亦春 OpenResty 也是基于Lua和Nginx来实现高性能服务端的。

分布式锁的三种实现方案,第3张
Lua优势

Redis 脚本使用 Lua 解释器来执行脚本。 Redis 2.6 版本通过内嵌支持 Lua 环境。执行脚本的常用命令为 EVAL。脚本命令:https://www.runoob.com/redis/redis-scripting.html

脚本入参:key个数、KEYS[1]、ARGV[1]、ARGV[2]
eg:EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second

eval "return redis.call('setNx', KEYS[1], ARGV[1])" 1 wuding WD
eval "return redis.call('get',KEYS[1])" 1 CCC;

接下来我们先测试下redis内嵌lua脚本。执行成功返回1,否则返回0

@GetMapping("/luaTest")
    public String luaTest() {
        String script_init = "if redis.call('setNx',KEYS[1],ARGV[1]) == 1  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";

        Object initResult1 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
        System.out.println(initResult1);

        Object initResult2 = redisTemplate.execute(new DefaultRedisScript<Long>(script_init, Long.class), Arrays.asList("AAA"), "aaa","1000");
        System.out.println(initResult2);


        String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1]  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";


        Object expireResult = redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList("AAA"),"aaa", "5000");
        System.out.println(expireResult);
        
        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) " +
                "else " +
                "return 0 " +
                "end";
        Object aaa = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("AAA"), "aaa");
        System.out.println(aaa);
        return aaa.toString();
    }

完善嵌入lua脚本的redis分布式锁实现

public class RedisLockUtil4 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 20000; // 默认超时时间(毫秒) 默认20秒
    private String TEMP_VALUE = UUID.randomUUID().toString();
    private RedisTemplate redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");
    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    private Integer maxRenewTimes = 10;
    private AtomicInteger renewTimes = new AtomicInteger(0);

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("redis申请锁(" + key + ")成功");
                    this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加锁失败, 锁键值:" + key, e);
        }
        return Boolean.FALSE;
    }

    private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
        scheduledExecutor.schedule(()->{
            if(!this.renewExpiration(key, value, expireTime)){
                return;
            }
            if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
                return;
            }
            this.scheduleExpirationRenewal(key,value,expireTime);
        }, expireTime / 3, TimeUnit.SECONDS);
    }

    private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
        String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1]  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";

        // 入参切记都是字符串,要不然就会类型转换失败,scheduledExecutor把异常捕获掉了看不到错误信息
        Long expireResult = null;
        try {
            expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
        } catch (Exception e) {
            log.error("执行lua脚本出错!e:"+ e.getCause().getMessage());
            return Boolean.FALSE;
        }
        if(expireResult == 1L){
            log.info("延长过期时间,key:"+lockKey);
        }
        return expireResult == 1L;
    }

    public void unLock(String key){
        String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                "return redis.call('del',KEYS[1]) " +
                "else " +
                "return 0 " +
                "end";
        Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
        if(result == 1L){
            log.info("释放锁,key:"+key);
        }
    }
}

接下来我们再设想一个问题:业务A调用业务B,AB两者业务都调用了分布式锁,或者A业务来做个递归操作,那么大家猜想下会出现什么问题???我们来测试看下。

把锁的超时时间设大来进行测试,如果是阻塞锁会一直阻塞下去,非阻塞锁的话,超时时间获取子方法也没有执行,业务逻辑也就会有问题。这就是我们可重入性的问题。
可重入锁指的是可重复可递归调用的锁,在外层使用锁之后,在内层仍然可以使用,如果没有可重入锁的支持,在第二次尝试获得锁时将会进入死锁状态。
通俗理解就是:排队打水,一个人只能用一个桶来接水,如果你还有一个桶,只能再去排队,这就是非重入性,反之,只要到排到你,不管你拿几个桶你都可以来接满水。

public class RedisLockUtil5 implements RedisLockStrategy {

    private static final long DEFAULT_TIME_OUT = 20000; // 默认超时时间(毫秒) 默认2秒

    private String TEMP_VALUE = UUID.randomUUID().toString();

    private RedisTemplate<String, String> redisTemplate =  ApplicationContextUtil.getBean("redisTemplate");

    ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    private Integer maxRenewTimes = 10;

    private AtomicInteger renewTimes = new AtomicInteger(0);

    public Boolean lock(String key){
        return lock(key, 10);
    }

    public Boolean lock(String key, int expireTime) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            while ((System.currentTimeMillis() - currentTimeMillis) < DEFAULT_TIME_OUT) {
                LockInfo lockInfo = ThreadLocalUtil.get();
                if(Objects.nonNull(lockInfo)
                        && key.equals(lockInfo.getKey())){
                    lockInfo.getCount().incrementAndGet();
                    ThreadLocalUtil.put(lockInfo);
                    // 将threadLocal中的value赋值给当前TEMP_VALUE,保证可重入性,保证删除逻辑正常
                    TEMP_VALUE = lockInfo.getValue();
                    // TODO 这里应该重置redis过期时间
                    log.info("可重入加锁成功...");
                    return Boolean.TRUE;
                }
                if(redisTemplate.opsForValue().setIfAbsent(key, TEMP_VALUE, expireTime, TimeUnit.SECONDS)){
                    log.info("redis申请锁(" + key + ")成功");
                    ThreadLocalUtil.put(new LockInfo(key, TEMP_VALUE, new AtomicInteger(1)));
                    this.scheduleExpirationRenewal(key, TEMP_VALUE, expireTime);
                    return Boolean.TRUE;
                }
                Thread.sleep(100);
            }
        } catch (InterruptedException e) {
            log.error("加锁失败, 锁键值:" + key, e);
        }
        return Boolean.FALSE;
    }

    private void scheduleExpirationRenewal(String key, String value, Integer expireTime){
        scheduledExecutor.schedule(()->{
            if(!this.renewExpiration(key, value, expireTime)){
                return;
            }
            if(maxRenewTimes > 0 && renewTimes.incrementAndGet() == maxRenewTimes){
                ThreadLocalUtil.clear();
                return;
            }
            this.scheduleExpirationRenewal(key,value,expireTime);
        }, expireTime / 3, TimeUnit.SECONDS);
    }

    private boolean renewExpiration(String lockKey, String lockValue, Integer lockWatchdogTimeout) {
        String expire_init = "if redis.call('get',KEYS[1]) == ARGV[1]  then " +
                "return redis.call('expire',KEYS[1],ARGV[2]) " +
                "else " +
                "return 0 " +
                "end";

        // 入参切记都是字符串,要不然就会类型转换失败,scheduledExecutor把异常捕获掉了看不到错误信息
        Long expireResult = null;
        try {
            expireResult = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(expire_init, Long.class), Arrays.asList(lockKey),lockValue, lockWatchdogTimeout.toString());
        } catch (Exception e) {
            log.error("执行lua脚本出错!e:"+ e.getCause().getMessage());
            return Boolean.FALSE;
        }
        if(expireResult == 1L){
            log.info("延长过期时间,key:"+lockKey);
        }
        return expireResult == 1L;
    }

    public void unLock(String key){
        LockInfo lockInfo = ThreadLocalUtil.get();
        if(Objects.nonNull(lockInfo)
                && key.equals(lockInfo.getKey())
                && TEMP_VALUE.equals(lockInfo.getValue())){
            lockInfo.getCount().decrementAndGet();
            ThreadLocalUtil.put(lockInfo);
            log.info("释放threadLocal锁,key:"+key);
        }

        if(Objects.nonNull(lockInfo) && lockInfo.getCount().get() <= 0){
            try {
                String script = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
                        "return redis.call('del',KEYS[1]) " +
                        "else " +
                        "return 0 " +
                        "end";
                Long result = (Long) redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList(key), TEMP_VALUE);
                if(result == 1L){
                    log.info("释放redis锁,key:"+key);
                }
            } finally {
                ThreadLocalUtil.clear();
            }
        }
    }


    /**
     * 也可以将线程id存入redis中去做比较,有兴趣可以自行实现
     */
    static class ThreadLocalUtil{
        private static final ThreadLocal<LockInfo> THREAD_LOCAL = new ThreadLocal<>();

        public static LockInfo get() {
            return THREAD_LOCAL.get();
        }

        public static void put(LockInfo lockInfo) {
            THREAD_LOCAL.set(lockInfo);
        }

        public static void clear() {
            THREAD_LOCAL.remove();
        }

    }

    @AllArgsConstructor
    @Data
    static class LockInfo{

       private String key;

       private String value;

       private AtomicInteger count;
    }
}

也可以将线程id存入redis中去做比较,有兴趣可以自行实现
并且避免线程id可能重复,可以把每个进程标识为唯一id作为前缀,有兴趣可以自行实现

最终形态的流程图

分布式锁的三种实现方案,第4张
流程图

解决了哪些问题

到此为止我们redis分布式锁就最终实现完成了。我们回顾下我们解决了那些问题?

  • 阻塞锁-回旋操作
  • 误删锁的问题
  • 业务执行时间大于分布式锁的过期时间如何处理
  • redis命令非原子性问题
  • 可重入锁的问题

Redisson实现

在介绍看门狗机制的时候我们有提到redisson框架,那么接下来我们看下redisson分布式锁框架具体是如何实现的。首先我们先针对上述mock业务通过redisson分布式锁来探究是否会出现上述问题

具体实现

分布式锁的三种实现方案,第5张
配置文件

redisson.yml配置:具体配置映射对象在org.redisson.config;

# 单节点配置
singleServerConfig:
  # 连接空闲超时,单位:毫秒
  idleConnectionTimeout: 10000
  # 连接超时,单位:毫秒
  connectTimeout: 10000
  # 命令等待超时,单位:毫秒
  timeout: 3000
  # 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
  # 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
  retryAttempts: 3
  # 命令重试发送时间间隔,单位:毫秒
  retryInterval: 1500
  #  # 重新连接时间间隔,单位:毫秒
  #  reconnectionTimeout: 3000
  #  # 执行失败最大次数
  #  failedAttempts: 3
  # 密码
  password:
  # 单个连接最大订阅数量
  subscriptionsPerConnection: 5
  # 客户端名称
  clientName: myRedis
  #  # 节点地址
  address: redis://127.0.0.1:6379
  # 发布和订阅连接的最小空闲连接数
  subscriptionConnectionMinimumIdleSize: 1
  # 发布和订阅连接池大小
  subscriptionConnectionPoolSize: 50
  # 最小空闲连接数
  connectionMinimumIdleSize: 32
  # 连接池大小
  connectionPoolSize: 64
  # 数据库编号
  database: 0
  # DNS监测时间间隔,单位:毫秒
  dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"
// 准备工作
@Bean(destroyMethod="shutdown")
public RedissonClient redisson() throws IOException {
    RedissonClient redisson = Redisson.create(
        Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream()));
    return redisson;
}
@Aspect
@Component
public class RedissonLockProvider {

    @Autowired
    private RedissonClient redissonClient;

    @Around("@annotation(lock)")
    public Object execute(ProceedingJoinPoint point, RedissonLock lock){
        Object result = null;
        String lockKey = lock.lockKey();
        RLock rLock = redissonClient.getLock(lockKey);
        rLock.lock();
        try {
            result = point.proceed();
        } catch (Throwable throwable) {
            rLock.unlock();
            throwable.printStackTrace();
        } finally {
            rLock.unlock();
        }
        return result;
    }
}

Redisson锁分类

  • 可重入锁(Reentrant Lock)
  • 公平锁(Fair Lock):也是继承了可重入锁的
  • 联锁(MultiLock):将多个RLock对象关联为一个联锁,每个实例可以来自于不同Redisson实例。
  • 红锁(RedLock):继承联锁。n个master节点完全独立,并且没有主从同步,此时如果有n / 2 + 1个节点成功拿到锁并且大多数节点加锁的总耗时,要小于锁设置的过期时间。so加锁成功。
  • 读写锁(ReadWriteLock)、信号量(Semaphore)、可过期性信号量(PermitExpirableSemaphore)、闭锁(CountDownLatch)

节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock。

源码探究

大家如果有兴趣了解上述锁的具体实现原理可自行研究,本次分享主要针对默认redisson实现锁方案可重入锁RLock来讲解,

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        // 异步的Executor执行器
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        // 默认超时时间为30S
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
        // redis的订阅发布模式
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }

// 获取锁成功返回null,获取锁失败 && 等待时间还早就频繁获取锁并监听锁是否被释放掉
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁,没有获取到锁,返回剩余ttl过期时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        // waitTime超时后,返回false获取锁失败
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        // 订阅分布式锁,解决通知,发布订阅模式
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // 阻塞等待锁释放,等待超时释放订阅信息
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            // 循环去调用获取锁方法tryAcquire
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                //1、latch其实是个信号量Semaphore,调用其tryAcquire方法会让当前线程阻塞一段时间,避免了在while循环中频繁请求获取锁;
                //2、该Semaphore的release方法,会在订阅解锁消息的监听器消息处理方法org.redisson.pubsub.LockPubSub#onMessage调用;当其他线程释放了占用的锁,会广播解锁消息,监听器接收解锁消息,并释放信号量,最终会唤醒阻塞在这里的线程。
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 取消解锁消息的订阅
            unsubscribe(subscribeFuture, threadId);
        }
    }

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
    
    private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
        // tryLock()方法最终执行逻辑
        if (leaseTime != -1) {
            // 如果有设置锁的过期时间,则直接调用lua,不走看门狗逻辑
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        // lock()方法最终执行逻辑
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining) {
                // 执行看门狗逻辑
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

    // 将线程id存入hash中的Field,用于解决可重入问题
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +  // key是否不存在
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // 不存在的话把赋值Key,Filed为线程id,value为1存储到hash数据结构中
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 并且设置过期时间
                      "return nil; " +  // 返回null
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果可以存在,并且field等于线程id
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  // 则吧value++1
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 重置过期时间
                      "return nil; " +  // 返回null
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",  // 返回过期时间
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
// watchDog看门狗逻辑
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 定时任务过期时间 internalLockLeaseTime/3 频率来延长过期时间为internalLockLeaseTime
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ee.setTimeout(task);
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
}
// 解锁逻辑
// key+field不存在,直接返回null
// 存在的话,value--1,判断value是否大于0,大于0重置过期时间,否则删除key并且发布删除订阅事件
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

基于Zookeeper

zookeeper分布式锁原理

  • 保持独占:多个客户端同时过来创建/lock/zk-001节点,那么有且仅有一个客户端能创建成功。换句话说,倘若把Zookeeper上的一个节点看做是一把锁,那么成功创建的客户端则能保持独占;
  • 控制时序:有一种临时有序节点,每个来尝试获取锁的客户端,都会在Zookeeper的根目录下创建一个临时有序节点,Zookeeper的/lock节点维护一个序列,序号最小的节点获取锁成功。
  • 监听机制:Watcher机制能原子性的监听Zookeeper上节点的增删改操作

基础理论

zk-znode
zk操作和维护的为一个个数据节点,称为 znode,采用类似文件系统的层级树状结构进行管理,如果 znode 节点包含数据则存储为字节数组(byte array)。同一个节点多个客户同时创建,只有一个客户端会成功,其它客户端创建时将失败。

zk的四种节点
持久性节点:节点创建后将会一直存在
临时节点:临时节点的生命周期和当前会话绑定,一旦当前会话断开临时节点也会删除,当然可以主动删除。
持久有序节点:节点创建一直存在,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。
临时有序节点:保留临时节点的特性,并且zk会自动为节点加上一个自增的后缀作为新的节点名称。

zk-watcher
事件监听器是zookeeper中的一个很重要的特性。

None(-1), 客户端与服务端成功建立连接
NodeCreated(1),Watcher监听的对应数据节点被创建
NodeDeleted(2),Watcher监听的对应数据节点被删除
NodeDataChanged(3),Watcher监听的对应数据节点的数据内容发生变更
NodeChildrenChanged(4),Wather监听的对应数据节点的子节点列表发生变更
DataWatchRemoved(5),
ChildWatchRemoved(6),
_PersistentWatchRemoved _(7);

实现思路

首先方案:同一个节点只能创建一次,加锁时检查节点是否exist,不存在则创建节点,否则监听该节点的删除事件,当释放锁的时候再次竞争去创建节点。如此带来的就是当并发量很高的时候,释放锁会唤醒许多客户端都去竞争,竞争失败的客户端再去休眠,如此反复对系统资源造成了极大的浪费。


分布式锁的三种实现方案,第6张
方案一

为了规避以上问题,我们可以使用有序子节点的形式来实现分布式锁,而且为了规避客户端获取锁后突然断线的风险,我们有必要使用临时有序节点。

多个客户端竞争锁资源,创建多个临时有序节点,检查所属节点是否是最小节点,如若是,则获取锁成功,如若不是,那就监听自己节点-1的删除事件,等待被唤醒。
这种方案在每次释放锁时只唤醒一个客户端,减少了线程唤醒的代价,提高了效率。


分布式锁的三种实现方案,第7张
方案二

zk原生API实现

接下来我们通过zk原生实现API来实现分布式锁

public class ZkLockUtil implements Watcher {

    public static final String NODE_PATH = "/lock-space-watcher";

    private ZooKeeper zk = null;

    public ZkLockUtil() throws IOException, KeeperException, InterruptedException {
        zk = new ZooKeeper("127.0.0.1:2181", 300000, this);
    }

    protected  CountDownLatch countDownLatch=new CountDownLatch(1);

    private String lockPath;

    public String createNode(String key){
        try {
            String node = NODE_PATH +"/"+ key;
            //检测节点是否存在
            Stat stat = zk.exists(node, false);
            //父节点不存在,则创建父节点
            if(Objects.isNull(stat)){
                synchronized (NODE_PATH) {
                    //父节点是持久节点 一层层创建否则会报错
                    zk.create(node, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            }
            lockPath = zk.create(node + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("节点创建成功,返回值【"+lockPath+"】");
            return lockPath;
        } catch (KeeperException e1) {
            e1.printStackTrace();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        return null;
    }

    //校验当前节点是否为序号最小的节点
    public boolean checkLockPath(String key, String lockPath){
        String nodePath = NODE_PATH + "/" + key;
        try {
            //注册父节点监听事件,当父节点下面的子节点有变化,就会触发Watcher事件
            List<String> nodeList = zk.getChildren(nodePath, false);
            Collections.sort(nodeList);
            int index = nodeList.indexOf( lockPath.substring(nodePath.length()+1));
            switch (index){
                case -1:{
                    System.out.println("本节点已不在了"+lockPath);
                    return false;
                }
                case 0:{
                    System.out.println("获取锁成功,子节点序号【"+lockPath+"】");
                    return true;
                }
                default:{

                    String waitPath = nodeList.get(index - 1);
                    zk.exists(nodePath+"/"+waitPath, this);
                    System.out.println(waitPath+"在"+nodeList.get(index)+"点前面,需要等待【"+nodeList.get(index)+"】");
                    return false;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return false;
    }

    public boolean lock(String key, Integer waitTime){
        //创建获取锁的节点(顺序临时节点)
        String childPath = createNode(key);
        boolean flag = true;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        if(null != childPath){
            lockPath = childPath;
            try {
                //轮询等待zk获取锁的通知
                while(flag){
                    if(checkLockPath(key, childPath)){
                        //获取锁成功
                        return true;
                    }
                    if(null != waitTime && atomicInteger.get() > 0){
                        // 删除当前等待节点
                        return false;
                    }
                    //节点创建成功, 则等待zk通知
                    if(null != waitTime){
                        countDownLatch.await(waitTime, TimeUnit.SECONDS);
                        atomicInteger.incrementAndGet();
                        System.out.println("await等待被唤醒~"+waitTime);
                    }else{
                        countDownLatch.await();
                        System.out.println("await等待被唤醒~");
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            System.out.println("节点没有创建成功,获取锁失败");
        }
        return false;
    }

    @Override
    public void process(WatchedEvent event) {
        //成功连接zk,状态判断
        if(event.getState() == Event.KeeperState.SyncConnected){
            //子节点有变化
            if(event.getType() == Event.EventType.NodeDeleted){
                System.out.println("临时节点自动删除");
                countDownLatch.countDown();
            }
        }
    }

    public void unlock(){
        try {
            zk.delete(getLockPath(), -1);
            if(Objects.nonNull(zk)){
                zk.close();
            }
        } catch (Exception e) {
        }
    }

    public String getLockPath() {
        return lockPath;
    }

}

上述实现方式虽能更好的理解zk来实现分布式锁的逻辑,但本身zk原生实现编码实现较多,并且很难保证是否有有问题,不太建议自己编码来实现zk原生的分布式锁,如果有兴趣的同学可自行实现可重入锁的逻辑,上述已经分析了很多实现方案,这儿不在对此深入。

客户端Curator实现

接下来我们通过zk的客户端Curator来实现zk的分布式锁。
Apache 开源框架Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。

Curator锁分类

可重入互斥锁 InterProcessMutex
不可重入互斥锁 InterProcessSemaphoreMutex
读写锁 InterProcessReadWriteLock
集合锁 InterProcessMultiLock

具体实现

接下来我们通过zk客户端Curator来实现分布式锁

    InterProcessSemaphoreMutex mutex = new InterProcessSemaphoreMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));
//    InterProcessMutex mutex = new InterProcessMutex(ApplicationContextUtil.getBean(CuratorFramework.class), String.format("/lock-space-1/%s", "KILL_LOCK"));

public String kill() throws Exception {
        mutex.acquire();
//        mutex.acquire(15, TimeUnit.SECONDS);
        try{
            // 定义商品key值
            String key = "goods";
            // 获取商品数量
            Object obj = redisTemplate.opsForValue().get(key);
            Integer mount = Integer.valueOf(obj.toString());
            // 如果商品被抢完,直接返回
            if (mount < 0 || mount == 0) {
                System.out.println("很遗憾,商品已被抢完【kill】");
                return "很遗憾,商品已被抢完";
            }
            // 线程睡眠,目的在于放大错误
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 抢到商品后,将redis的商品数量减一
            mount = --mount;
            redisTemplate.opsForValue().set(key, mount.toString());
            // 打印,以便观察
            System.err.println(System.currentTimeMillis() + "-" + Thread.currentThread().getName() + ":抢到第" + (mount + 1) + "件商品【kill】");

        } catch (Exception e) {
            mutex.release();
            e.printStackTrace();
        } finally {
            mutex.release();
        }
         return "恭喜,商品抢购成功";
    }

Curator-InterProcessMutex可重入锁源码探究

InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver)
{
    basePath = PathUtils.validatePath(path);
    internals = new LockInternals(client, driver, path, lockName, maxLeases);
}


public boolean acquire(long time, TimeUnit unit) throws Exception
{
    return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();
        // LockData存储当前持有锁的线程:为了实现可重入
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null )
        {
            // 当前锁 重入++1
            lockData.lockCount.incrementAndGet();
            return true;
        }

        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {   
            // 获取锁成功放到缓存中
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
}

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) new byte[0] : lockNodeBytes;
        int             retryCount = 0;

        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
        ...省略部分代码
        // 创建临时有序节点
        ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
        // 执行获取锁逻辑
        hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        ...省略部分代码
        return ourPath;
    }
    
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
    {
        boolean     haveTheLock = false;
        boolean     doDelete = false;
        try
        {
            if ( revocable.get() != null )
            {
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }

            while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                // 对所有节点进行排序:从小到大
                List<String>        children = getSortedChildren();
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                // 返回当前节点或者等待节点的上一个节点
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    // 上一个节点路径信息
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try
                        {
                            //设置监听器,getData会判读前一个节点是否存在,不存在就会抛出异常从而不会设置监听器
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            if ( millisToWait != null )
                            {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if ( millisToWait <= 0 )
                                {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
                                // 等待一段时间被唤醒
                                wait(millisToWait);
                            }
                            else
                            {
                                // 一直等待被唤醒
                                wait();
                            }
                        }
                        catch ( KeeperException.NoNodeException e )
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        }
        finally
        {
            if ( doDelete )
            {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
public void release() throws Exception
    {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if ( lockData == null )
        {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        // LockData当前线程可重入value--1
        int newLockCount = lockData.lockCount.decrementAndGet();
        if ( newLockCount > 0 )
        {
            return;
        }
        if ( newLockCount < 0 )
        {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try
        {   
            // value == 0 的时候才真正删除临时节点
            internals.releaseLock(lockData.lockPath);
        }
        finally
        {
            threadData.remove(currentThread);
        }
    }

    final void releaseLock(String lockPath) throws Exception
    {
        // 一处所有监听者
        client.removeWatchers();
        revocable.set(null);
        // 删除临时节点
        deleteOurPath(lockPath);
    }

 private void deleteOurPath(String ourPath) throws Exception
    {
        try
        {
            client.delete().guaranteed().forPath(ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // ignore - already deleted (possibly expired session, etc.)
        }
    }

分布式锁各自有何优势

基于Redis的分布式锁,适用于并发量很大、性能要求很高的、而可靠性问题可以通过其他方案去弥补的场景。
基于zk的分布式锁,适用于高可靠(高可用)而并发量不是太大的场景;因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。大家知道,ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同不到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。
而数据库来实现的分布式锁,受制于连接池资源、无锁失效机制、单点等因素,在并发量较低可靠性不那么强的时候也可以用。

分布式锁没有绝对的可靠性,只能通过人为补偿机制竟可能的提升锁可靠性。


https://www.xamrdz.com/mobile/4xc1996464.html

相关文章: