当前位置: 首页>后端>正文

Redis 分布式锁的实现

目录结构:

  • [前言]
  • [可靠性]
  • [代码实现]
    • [组件依赖]
    • [加锁代码]
    • [解锁代码]
  • [总结]
  • [参考阅读]

一、前言

1.1 实现方式

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本文将详细介绍如何正确地实现Redis分布式锁。
Springboot集成Redis参考:https://www.cnblogs.com/zwqh/p/11664782.html,我们使用Springboot2.0默认的Lettuce连接。

1.2 使用场景

参考:https://blog.csdn.net/chunqiu3351/article/details/100762270
分布式锁的思路是:
在整个系统提供一个全局、唯一的获取锁的“东西”,然后每个系统在需要加锁时,都去问这个“东西”拿到一把锁,这样不同的系统拿到的就可以认为是同一把锁。

二、可靠性

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,保证只有一台机器的一个线程可以持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  4. 具备非阻塞性。一旦获取不到锁就立刻返回加锁失败。
  5. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

三、代码实现

3.1 组件依赖

我们使用的是Springboot2.3.0版本,首先我们要通过Maven引入spring-boot-starter-data-redis操作Redis,在pom.xml文件加入下面的代码:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3.2 加锁代码

正确姿势

Talk is easy, show me the codes first。先展示代码,再带大家慢慢解释为什么这样实现:

    /**
     * 获取锁
     *
     * @param lockKey    锁
     * @param identity   身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @return
     */
    public boolean lock(String lockKey, String identity, long expireTime) {
        boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, identity, expireTime, TimeUnit.SECONDS);
        return lockResult;
    }

可以看到,我们加锁就一行代码:redisTemplate.opsForValue().setIfAbsent(lockKey, identity, expireTime, TimeUnit.SECONDS),这个setIfAbsent()方法一共有4个形参:

  1. 第一个为key,我们使用key来当锁,因为key是唯一的,一个资源对应一个唯一的key。
  2. 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
  3. 第三个参数expireTime为过期时间,此参数保证程序加锁后崩溃导致不能主动释放锁的时候自动释放锁,防止出现死锁。
  4. 第四个参数,锁的过期时间(单位:秒)。

为什么使用setIfAbsent方法呢?这个方法的好处就是,如果redis中已经存在这个key了,就会返回失败,并且不改变redis中的数据,这样就不会把别的线程的加的锁给覆盖掉。
总的来说,执行上面的setIfAbsent方法就只会导致两种结果:1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。2. 已有锁存在,不做任何操作。

会发现,我们的加锁代码满足我们可靠性里描述的三个条件。首先,setIfAbsent()可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会发生死锁。最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。

3.2.1 错误示例1

比较常见的错误示例就是使用jedis.setnx()和jedis.expire()组合实现加锁,代码如下:

public static void wrongGetLock1(Jedis jedis, String lockKey, String requestId, int expireTime) {

    Long result = jedis.setnx(lockKey, requestId);
    if (result == 1) {
        // 若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
        jedis.expire(lockKey, expireTime);
    }

}

setnx()方法作用就是SET IF NOT EXIST,expire()方法就是给锁加一个过期时间。乍一看好像和前面的set()方法结果一样,然而由于这是两条Redis命令,不具有原子性,如果程序在执行完setnx()之后突然崩溃,导致锁没有设置过期时间。那么将会发生死锁。网上之所以有人这样实现,是因为低版本的jedis并不支持多参数的set()方法。

3.2.2 错误示例2

这一种错误示例就比较难以发现问题,而且实现也比较复杂。实现思路:使用jedis.setnx()命令实现加锁,其中key是锁,value是锁的过期时间。执行过程:1. 通过setnx()方法尝试加锁,如果当前锁不存在,返回加锁成功。2. 如果锁已经存在则获取锁的过期时间,和当前时间比较,如果锁已经过期,则设置新的过期时间,返回加锁成功。代码如下:

public static boolean wrongGetLock2(Jedis jedis, String lockKey, int expireTime) {

    long expires = System.currentTimeMillis() + expireTime;
    String expiresStr = String.valueOf(expires);

    // 如果当前锁不存在,返回加锁成功
    if (jedis.setnx(lockKey, expiresStr) == 1) {
        return true;
    }

    // 如果锁存在,获取锁的过期时间
    String currentValueStr = jedis.get(lockKey);
    if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
        // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间
        String oldValueStr = jedis.getSet(lockKey, expiresStr);
        if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
            // 考虑多线程并发的情况,只有一个线程的设置值和当前值相同,它才有权利加锁
            return true;
        }
    }

    // 其他情况,一律返回加锁失败
    return false;

}

那么这段代码问题在哪里?1. 由于是客户端自己生成过期时间,所以需要强制要求分布式下每个客户端的时间必须同步。 2. 当锁过期的时候,如果多个客户端同时执行jedis.getSet()方法,那么虽然最终只有一个客户端可以加锁,但是这个客户端的锁的过期时间可能被其他客户端覆盖。3. 锁不具备拥有者标识,即任何客户端都可以解锁。

3.3 解锁代码

正确姿势

还是先展示代码,再带大家慢慢解释为什么这样实现:

    /**
     * 释放锁
     *
     * @param lockKey  锁
     * @param identity 身份标识(保证锁不会被其他人释放)
     * @return
     */
    public Object releaseLock(String lockKey, String identity) {
        String luaScript = "if " +
                "  redis.call('get', KEYS[1]) == ARGV[1] " +
                "then " +
                "  return redis.call('del', KEYS[1]) " +
                "else " +
                "  return 0 " +
                "end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        Object result = redisTemplate.execute(redisScript, keys, identity);
        return (boolean) result;
    }

我们写了一个简单的Lua脚本代码。将Lua代码传到redisTemplate.execute()方法里,并使参数KEYS[1]赋值为keys,ARGV[1]赋值为identity。execute()方法是将Lua代码交给Redis服务端执行。

那么这段Lua代码的功能是什么呢?其实很简单,首先获取锁对应的value值,检查是否与identity相等,如果相等则删除锁(解锁)。那么为什么要使用Lua语言来实现呢?因为要确保上述操作是原子性的。关于非原子性会带来什么问题,可以阅读【解锁代码-错误示例2】 。那么执行execute()方法可以确保原子性,源于Redis的特性,下面是官网对execute命令的部分解释。

Redis 分布式锁的实现,第1张
Lua脚本原子性说明

简单来说,就是在execute命令执行Lua代码的时候,Lua代码将被当成一个命令去执行,并且直到execute命令执行完成,Redis才会执行其他命令。

3.3.1 错误示例1

最常见的解锁代码就是直接使用jedis.del()方法删除锁,这种不先判断锁的拥有者而直接解锁的方式,会导致任何客户端都可以随时进行解锁,即使这把锁不是它的。

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
    jedis.del(lockKey);
}

3.3.2 错误示例2

这种解锁代码乍一看也是没问题,甚至我之前也差点这样实现,与正确姿势差不多,唯一区别的是分成两条命令去执行,代码如下:

public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {

    // 判断加锁与解锁是不是同一个客户端
    if (requestId.equals(jedis.get(lockKey))) {
        // 若在此时,这把锁突然不是这个客户端的,则会误解锁
        jedis.del(lockKey);
    }

}

如代码注释,问题在于如果调用jedis.del()方法的时候,这把锁已经不属于当前客户端的时候会解除他人加的锁。那么是否真的有这种场景?答案是肯定的,比如客户端A加锁,一段时间之后客户端A解锁,在执行jedis.del()之前,锁突然过期了,此时客户端B尝试加锁成功,然后客户端A再执行del()方法,则将客户端B的锁给解除了。

四、 application.properties

redis配置信息:

spring.application.name=redis-distributed-lock
# 应用服务 WEB 访问端口
server.port=8080
################ Redis 基础配置 ##############
# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 链接超时时间 单位 ms(毫秒)
spring.redis.timeout=3000
################ Redis 线程池设置 ##############
# 连接池最大连接数(使用负值表示没有限制) 默认 8
spring.redis.lettuce.pool.max-active=8
# 连接池最大阻塞等待时间(使用负值表示没有限制) 默认 -1
spring.redis.lettuce.pool.max-wait=-1
# 连接池中的最大空闲连接 默认 8
spring.redis.lettuce.pool.max-idle=8
# 连接池中的最小空闲连接 默认 0
spring.redis.lettuce.pool.min-idle=0

RedisLockTool.java

package com.erbadagang.springboot.redisdistributedlock.tool;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @description Redis分布式锁实现工具类。
 * @ClassName: RedisLockTool
 * @author: 郭秀志 jbcode@126.com
 * @date: 2020/8/2 16:29
 * @Copyright:
 */
@Component
public class RedisLockTool {
    @Autowired
    RedisTemplate redisTemplate;

    /**
     * 获取锁
     *
     * @param lockKey    锁
     * @param identity   身份标识(保证锁不会被其他人释放)
     * @param expireTime 锁的过期时间(单位:秒)
     * @return
     */
    public boolean lock(String lockKey, String identity, long expireTime) {
        boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, identity, expireTime, TimeUnit.SECONDS);
        return lockResult;
    }

    /**
     * 释放锁
     *
     * @param lockKey  锁
     * @param identity 身份标识(保证锁不会被其他人释放)
     * @return
     */
    public boolean releaseLock(String lockKey, String identity) {
        String luaScript = "if " +
                "  redis.call('get', KEYS[1]) == ARGV[1] " +
                "then " +
                "  return redis.call('del', KEYS[1]) " +
                "else " +
                "  return 0 " +
                "end";
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setResultType(Boolean.class);
        redisScript.setScriptText(luaScript);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        Object result = redisTemplate.execute(redisScript, keys, identity);
        return (boolean) result;
    }
}

RedisLockController.java

package com.erbadagang.springboot.redisdistributedlock.controller;

import com.erbadagang.springboot.redisdistributedlock.tool.RedisLockTool;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @description 分布式锁的测试controller,加锁和释放锁2个url。
 * @ClassName: RedisLockController
 * @author: 郭秀志 jbcode@126.com
 * @date: 2020/8/2 16:31
 * @Copyright:
 */
@RestController
@RequestMapping("distributedLock/")
public class RedisLockController {
    //锁标识key的前缀,后面加上自己的需加锁业务的资源标识码。
    private final String lockPreKey = "DistributedLockKey";

    @Autowired
    private RedisLockTool redisLock;

    /**
     * 测试加锁
     *
     * @param id       加锁的资源id
     * @param identity 身份标识
     * @return
     */
    @GetMapping("lock")
    public String lock(@RequestParam("id") String id,
                       @RequestParam("identity") String identity) {
        String lockKey = lockPreKey + ":" + id;
        boolean lockSuccess = redisLock.lock(lockKey, identity, 60);
        String result = "lock failed";
        if (lockSuccess) {
            result = "lock success";
        }
        return result;
    }

    /**
     * 测试释放锁
     *
     * @param id       释放锁的资源id
     * @param identity 身份标识
     * @return
     */
    @GetMapping("release")
    public String release(@RequestParam("id") String id,
                          @RequestParam("identity") String identity) {
        String lockKey = lockPreKey + ":" + id;
        boolean releaseSuccess = redisLock.releaseLock(lockKey, identity);
        String result = "release failed";
        if (releaseSuccess) {
            result = "release success";
        }
        return result;
    }
}

实际使用中,identity可以使用String identity = UUID.randomUUID().toString();生成来保证唯一性。

五、测试

启动Redis和Springboot服务。

  1. 加锁功能:
    访问http://localhost:8080/distributedLock/lock?id=5&identity=erbadagang,返回lock success
  2. 再次访问上面链接http://localhost:8080/distributedLock/lock?id=5&identity=erbadagang,返回lock failed,说明第一次加的锁还存在。
  3. 修改参数id值为6来生成新的key,访问http://localhost:8080/distributedLock/lock?id=6&identity=erbadagang,返回lock success。不同的key代表不同的锁,互不冲突。
  4. 先加锁,访问http://localhost:8080/distributedLock/lock?id=6&identity=erbadagang,再释放锁,访问http://localhost:8080/distributedLock/release?id=6&identity=erbadagang,返回release success
  5. 如果锁不存在(没创建或者已经释放一次),进行释放锁,返回release failed

总结

本文主要介绍了如何使用Java代码正确实现Redis分布式锁,对于加锁和解锁也分别给出了两个比较经典的错误示例。其实想要通过Redis实现分布式锁并不难,只要保证能满足可靠性里的四个条件。互联网虽然给我们带来了方便,只要有问题就可以Baidu,然而网上的答案一定是对的吗?其实不然,所以我们更应该时刻保持着质疑精神,多想多验证。

如果你的项目中Redis是多机部署的,那么可以尝试使用Redisson实现分布式锁,这是Redis官方提供的Java组件。

此外,实现Redis的分布式锁,除了自己基于redis client原生api来实现之外,还可以使用开源框架:Redission。Redisson是一个企业级的开源Redis Client,也提供了分布式锁的支持。我也非常推荐大家使用,为什么呢?
回想一下上面说的,如果自己写代码来通过redis设置一个值,是通过下面这个命令设置的。
SET anyLock unique_value NX PX 30000
这里设置的超时时间是30s,假如我超过30s都还没有完成业务逻辑的情况下,key会过期,其他线程有可能会获取到锁。
这样一来的话,第一个线程还没执行完业务逻辑,第二个线程进来了也会出现线程安全问题。所以我们还需要额外的去维护这个过期时间,太麻烦了~

底线

本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。


https://www.xamrdz.com/backend/3j21921328.html

相关文章: