当前位置: 首页>后端>正文

redis分布式锁?原来就这么简单

1 前言

现在大多互联网公司的服务都是集群架构,单节点的时代已经称为了过去。同时也带来了各式各样的问题,而事务管理就是其中之一。比如:扣减库存,送完即止活动,甚至中台的批量导入新增/修改都需要去考虑并发问题。这时我们就需要考虑怎么解决这并发问题了,解决方案之一就是分布式锁。

2 分布式锁的技术选型

分布式锁就是一个概念,实现方式并不是唯一固定的,其中主要的思路是要找一个可以实现排他锁的中间件。满足这个条件的技术栈就很多了,这里提几个我们常接触,实现起来也比较简单的技术栈:

  • MySQL
  • Zookeeper
  • Redis

2.1 MySQL

优点:

  1. 所有系统基本都用到了 MySQL(当然有些系统可能是用了 Oracle 又或者别的 DB,我这里就以 MySQL 举例了)。使用它,可以少引入其他中间件,避免增加系统架构的复杂度
  2. 实现简单

缺点:

  1. 不能接住较高的并发

2.2 ZooKeeper

优点:

  1. 吞吐量相对于 MySQL 而言更高

缺点:

  1. 相信很多人都只是那它作为注册中心使用,它的其他功能估计没怎么用过,有一定的学习成本
  2. 现在它的社区活跃度也没有之前那么高了,后面会不会淘汰也难说

2.3 Redis

优点:

  1. 相比前面的,它可以接收的并发量是最高的
  2. 大家实际工作中用的应该也是蛮多的,群众基础高,学习成本相对较低
  3. 已经很多成熟的实现方案,可以大大降低踩雷的概率

缺点:

  1. 需要更多的服务器成本

3 分布式锁逻辑流程图

redis分布式锁?原来就这么简单,第1张
redis分布式锁实现流程图.png

4 代码实现

工具实现代码

package com.example.demo.utils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author CaiZhuliang
 * @date 2023/8/31
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisSimpleLockUtil {
    private static final ScheduledExecutorService EXECUTOR_SERVICE = new ScheduledThreadPoolExecutor(50,
            new BasicThreadFactory.Builder()
                    .namingPattern("redisSimpleLockUtil-schedule-pool-%d")
                    .daemon(true)
                    .build());

    private static final ThreadLocal<String> THREAD_LOCAL_TOKEN = new ThreadLocal<>();
    private static final String SUCCESS = "1";
    /**
     * 允许当前token续约
     */
    private static final Integer CAN_RENEW = 0;
    /**
     * 记录token的状态,0-可以续约,其他情况均不能续约
     */
    private static final Map<String, Integer> TOKEN_STATUS = Maps.newConcurrentMap();

    private final RedisTemplate<String, String> redisTemplate;

    /**
     * 释放锁
     * <p>必须和 RedisSimpleLockUtil#lock 是同一个线程</p>
     * @param key key 需要释放锁的key
     * @param token 持有的令牌
     * @return true-成功 false-失败
     */
    public boolean releaseLock(String key, String token) {
        if (StringUtils.isBlank(token)) {
            return false;
        }
        TOKEN_STATUS.put(token, 1);
        try {
            String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
                    "then redis.call('expire', KEYS[1], 0) return '1' end " +
                    "return '0'";
            DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
            String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token);
            log.info("非cluster模式简单分布式锁 - 释放key: {}, result : {}, token : {}", key, result, token);
            return SUCCESS.equals(result);
        } catch (Exception e) {
            log.error("非cluster模式简单分布式锁 - 释放锁发生异常,key : {}", key, e);
            return false;
        } finally {
            THREAD_LOCAL_TOKEN.remove();
            TOKEN_STATUS.remove(token);
        }
    }

    /**
     * 简单分布式锁实现,续约周期是 expireTime 的一半。举个例子, expireTime = 8000,那么锁续约将会是每 4000 毫秒续约一次
     * <p>这个方法不考虑redis的集群架构,不考虑脑裂问题,当只有一个redis来考虑。</p>
     * @param key 需要上锁的key
     * @param expireTime 过期时间,单位:毫秒
     * @return 上锁成功返回令牌,失败则返回空串
     */
    public String lock(String key, Long expireTime) {
        if (StringUtils.isBlank(key)) {
            log.warn("非cluster模式简单分布式锁 - key is blank");
            return StringUtils.EMPTY;
        }
        if (null == expireTime || expireTime <= 0) {
            expireTime = 0L;
        }
        // 续约周期,单位纳秒
        long renewPeriod = expireTime * 500_000;
        try {
            String token = System.currentTimeMillis() + ":" + UUID.randomUUID();
            // 设置锁
            Boolean result = redisTemplate.opsForValue().setIfAbsent(key, token, expireTime, TimeUnit.MILLISECONDS);
            if (Boolean.FALSE.equals(result)) {
                return StringUtils.EMPTY;
            }
            log.info("非cluster模式简单分布式锁 - 上锁成功,key : {}, token : {}", key, token);
            // 上锁成功后将令牌绑定当前线程
            THREAD_LOCAL_TOKEN.set(token);
            TOKEN_STATUS.put(token, 0);
            if (renewPeriod > 0) {
                // 续约任务
                log.info("非cluster模式简单分布式锁 - 添加续约任务,key : {}, token : {}, renewPeriod : {}纳秒", key, token, renewPeriod);
                renewTask(key, token, expireTime, renewPeriod);
            }
            return token;
        } catch (Exception e) {
            String token = THREAD_LOCAL_TOKEN.get();
            log.error("非cluster模式简单分布式锁 - 上锁发生异常,key : {}, token : {}", key, token, e);
            return StringUtils.isBlank(token) StringUtils.EMPTY : token;
        }
    }

    /**
     * 锁续约任务
     * @param key 需要续命的key
     * @param token 成功获锁的线程持有的令牌
     * @param expireTime 过期时间,单位:毫秒
     * @param renewPeriod 续约周期,单位:纳秒
     */
    private void renewTask(String key, String token, long expireTime, long renewPeriod) {
        try {
            EXECUTOR_SERVICE.schedule(() -> {
                try {
                    String lua = "if (redis.call('get', KEYS[1]) == ARGV[1]) " +
                            "then " +
                            "  if (redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2])) " +
                            "  then return '1' else return redis.call('get', KEYS[1]) end " +
                            "end " +
                            "return redis.call('get', KEYS[1])";
                    DefaultRedisScript<String> luaScript = new DefaultRedisScript<>(lua, String.class);
                    String result = redisTemplate.execute(luaScript, Lists.newArrayList(key), token, String.valueOf(expireTime));
                    if (SUCCESS.equals(result)) {
                        // 续约成功
                        log.debug("非cluster模式简单分布式锁 - 锁续约成功,key : {}, token : {}", key, token);
                        // 这里加判断是为了减少定时任务
                        if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
                            // 开启下一次续约任务
                            renewTask(key, token, expireTime, renewPeriod);
                        }
                    } else {
                        // 这里加判断是为了防止误打印warn日志
                        if (CAN_RENEW.equals(TOKEN_STATUS.get(token))) {
                            log.warn("非cluster模式简单分布式锁 - 锁续约失败,key : {}, token : {}, result : {}", key, token, result);
                        }
                    }
                } catch (Exception e) {
                    // 这里异常是抛不出去的,所以需要 catch 打印
                    log.error("非cluster模式简单分布式锁 - 锁续约发生异常,key : {}, token : {}", key, token, e);
                }
            }, renewPeriod, TimeUnit.NANOSECONDS);
        } catch (Exception e) {
            log.error("非cluster模式简单分布式锁 - 添加锁续约任务发生异常,key : {}, token : {}", key, token, e);
        }
    }
}

下面是并发单元测试代码

    @Test
    public void concurrencyTest() {
        String[] nums = {"1", "2", "3", "4", "5"};
        List<CompletableFuture<Void>> list = Lists.newArrayListWithExpectedSize(100);
        for (int i = 0; i < 50; i++) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                for (int count = 0; count < 10; count++) {
                    int random = new Random().nextInt(100) % 5;
                    String key = "test_" + nums[random];
                    while (true) {
                        String token = redisSimpleLockUtil.lock(key, 3_000L);
                        if (StringUtils.isNotBlank(token)) {
                            log.info("concurrencyTest - key : {}", key);
                            try {
                                Thread.sleep(new Random().nextInt(1500));
                            } catch (Exception e) {
                                log.error("concurrencyTest - 发生异常, key : {}", key, e);
                            } finally {
                                boolean unlock = redisSimpleLockUtil.releaseLock(key, token);
                                if (!unlock) {
                                    log.error("concurrencyTest - 释放锁失败,key : {}", key);
                                }
                            }
                            break;
                        }
                    }
                }
            });
            list.add(future);
        }
        CompletableFuture<?>[] futures = new CompletableFuture[list.size()];
        list.toArray(futures);
        CompletableFuture.allOf(futures).join();
    }

5 Redlock

一般公司使用 redis 时都不可能是单节点的,要么主从+哨兵架构,要么就是 cluster 架构。面对集群,我们不得不思考如何应对网络分区导致的脑裂问题。而 Redlock 是redis官方网站给出解决方案

脑裂问题,redis 官方给出的解决方案是 Redlock。
核心是:上锁需要在集群中半数以上的 master 操作成功了才算成功。

红锁是通过过半原则来规避脑裂,但是这就让我们不得不考虑访问节点的等待超时时间应该要多长。而且,也会降低 redis 分布式锁的吞吐量。如果有半数节点不可用,那么分布式锁也将变得不可用。因此,实际使用中我们还要结合自己实际的业务场景来权衡要不要用红锁或者修改实现方案

6 第三方工具

上面的示例代码只是用于学习探讨,其实我们不需要重复造轮子。如果没有什么特殊的场景,需要自己定制的话,那可以考虑一些已有的框架,比如:redisson。


https://www.xamrdz.com/backend/3rh1923415.html

相关文章: