分布式锁的实现方式
分布式锁的特点
分布式锁的特点:
- 排他性:保证在分布式部署、服务集群的环境下,共享的资源在同一时间只能被同一台机器上的一个线程执行。
- 避免死锁:在获取一段时间后,一定会被释放(正常情况或者异常情况)。
- 高可用:获取活释放的机制必须高可用而且性能高。
- 可重入:当前机器的线程如果没有获取到,那么在等待一定的时间后要保证被获取到。
实现分布式锁的几种方式
实现分布式锁的几种方式:
- 基于数据库级别的:乐观锁、悲观锁
- 基于Redis原子操作:SETNX与EXPIRE原子操作
- 基于Zookeeper的互斥排他锁:创建临时节点+Watcher机制
- 基于开源Redisson分布式锁
- 基于ETCD的分布式锁
分布式锁的应用场景:防止重复提交、商场高并发抢单。
基于数据库级别
在数据库中存在乐观锁、悲观锁,乐观锁是只在需要的时候用,给信息添加相应的版本号来控制;悲观锁是每次都要使用锁,使用select .. for update ..
使用事务,来保证可以原子操作。
乐观锁
<update id="updateByPKVersion">
update user_account set amount = amount - #{money},version=version+1
where id = #{id} and version=#{version} and amount >0 and (amount - #{money})>=0
</update>
悲观锁
使用select...for update 来实现事务
<!--根据用户id查询-用于悲观锁-->
<select id="selectByUserIdLock" resultType="com.edu.middleware.lock.entity.UserAccount">
SELECT
<include refid="Base_Column_List"/>
FROM user_account
WHERE user_id=#{userId} FOR UPDATE
</select>
<!--根据主键id更新账户余额-悲观锁的方式-->
<update id="updateAmountLock">
UPDATE user_account SET amount = amount - #{money}
WHERE is_active=1 AND id=#{id} and amount >0 and (amount - #{money})>=0
</update>
基于Redis实现分布式锁
基于Redis实现分布式锁分为三步:1.构建共享资源KEY;2.使用SETNX和EXPIRE获取到锁;3.释放锁
SETNX命令获取锁时,如果返回的是0,表示获取锁失败,否则表示获取成功,为了防止进入死锁状态,为KEY设置一个过期时间,在释放锁时,执行del操作。
/**
* 处理用户提交注册的请求-加分布式锁
*
* @param dto
* @throws Exception
*/
@Override
public void userRegWithLock(UserRegDto dto) throws Exception {
//精心设计并构造SETNX操作中的Key-一定要跟实际的业务或共享资源挂钩
final String key = dto.getUserName() + "-lock";
//设计Key对应的Value
//为了具有随机性,在这里采用系统提供的纳秒级别的时间戳 + UUID生成的随机数作为Value
final String value = System.nanoTime() + "" + UUID.randomUUID();
//获取操作Key的ValueOperations实例
ValueOperations valueOperations = stringRedisTemplate.opsForValue();
//调用SETNX操作获取锁,如果返回true,则获取锁成功
//代表当前的共享资源还没被其他线程所占用
Boolean res = valueOperations.setIfAbsent(key, value);
//返回true,即代表获取到分布式锁
if (res) {
//为了防止出现死锁的状况,加上EXPIRE操作,即Key的过期时间,在这里设置为20s
//具体应根据实际情况而定
stringRedisTemplate.expire(key, 20L, TimeUnit.SECONDS);
try {
//根据用户名查询用户实体信息
UserReg reg = userRegMapper.selectByUserName(dto.getUserName());
//如果当前用户名还未被注册,则将当前用户信息注册入数据库中
if (reg == null) {
log.info("---加了分布式锁---,当前用户名为:{} ", dto.getUserName());
//创建用户注册实体信息
UserReg entity = new UserReg();
//将提交的用户注册请求实体信息中对应的字段取值
//复制到新创建的用户注册实体的相应字段中
BeanUtils.copyProperties(dto, entity);
//设置注册时间
entity.setCreateTime(new Date());
//插入用户注册信息
userRegMapper.insertSelective(entity);
} else {
//如果用户名已被注册,则抛出异常
throw new Exception("用户信息已经存在!");
}
} catch (Exception e) {
throw e;
} finally {
//不管发生任何情况,都需要在redis加锁成功并访问操作完共享资源后释放锁
if (value.equals(valueOperations.get(key).toString())) {
stringRedisTemplate.delete(key);
}
}
}
}
基于Zookeeper实现分布式锁
Zookeeper的作用:
- 统一配置管理:将每个子系统都需要配置的文件统一放置到Zookeeper的ZNode节点中
- 统一命名:通过给ZNode进行统一命名,各个子系统便可以通过名字获取到节点上相应的资源
- 分布式锁:通过创建与该共康资源相关的顺序临时节点与动态监听机制,从而控制多线程对共享资源的并发访问
- 集群状态:通过动态感知节点的增加、删除、从而保证集群下的相关节点数据主副本的一致性
Zookeeper的功能特性:
- 顺序一致性:从同一客户端发起的事务请求,最终将会严格地按照顺序被应用到Zookeeper中
- 原子性:所有事务请求的处理结果在整个集群中所有机器上的应用情况是一致的。
- 单一系统影响:无论客户端连接到哪一个Zookeeper服务器上,其看到的服务端数据模型都是一致的
- 可靠性:一旦一次更改请求被应用,更改结果就会被持久化,直到被下一次更改覆盖。
创建临时顺序节点以及采用Watcher监听机制监听临时节点的增减,来判断当前线程是否可以成功获取到锁。
/**
* 处理用户提交注册的请求-加ZooKeeper分布式锁
*
* @param dto
* @throws Exception
*/
@Override
public void userRegWithZKLock(UserRegDto dto) throws Exception {
//创建ZooKeeper互斥锁组件实例,需要将监控用的客户端实例、精心构造的共享资源 作为构造参数
InterProcessMutex mutex = new InterProcessMutex(client, pathPrefix + dto.getUserName() + "-lock");
try {
//采用互斥锁组件尝试获取分布式锁-其中尝试的最大时间在这里设置为10s
//当然,具体的情况需要根据实际的业务而定
if (mutex.acquire(10L, TimeUnit.SECONDS)) {
//TODO:真正的核心处理逻辑
//根据用户名查询用户实体信息
UserReg reg = userRegMapper.selectByUserName(dto.getUserName());
//如果当前用户名还未被注册,则将当前用户信息注册入数据库中
if (reg == null) {
log.info("---加了ZooKeeper分布式锁---,当前用户名为:{} ", dto.getUserName());
//创建用户注册实体信息
UserReg entity = new UserReg();
//将提交的用户注册请求实体信息中对应的字段取值
//复制到新创建的用户注册实体的相应字段中
BeanUtils.copyProperties(dto, entity);
//设置注册时间
entity.setCreateTime(new Date());
//插入用户注册信息
userRegMapper.insertSelective(entity);
} else {
//如果用户名已被注册,则抛出异常
throw new Exception("用户信息已经存在!");
}
} else {
throw new RuntimeException("获取ZooKeeper分布式锁失败!");
}
} catch (Exception e) {
throw e;
} finally {
//TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁
mutex.release();
}
}
除了使用互斥锁,可以使用临时节点来解决多重复提交的问题。
基于Redisson实现分布式锁
@Override
@Transactional(rollbackFor = Exception.class)
public void robWithRedisson(BookRobDto dto) throws Exception {
final String lockName = "redissonTryLock-" + dto.getBookNo() + "-" + dto.getUserId();
/**
* 获取锁对象
*/
RLock lock = redissonClient.getLock(lockName);
try {
Boolean result = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (result) {
//TODO:真正的核心处理逻辑
//根据书籍编号查询记录
BookStock stock = bookStockMapper.selectByBookNo(dto.getBookNo());
//统计每个用户每本书的抢购数量
int total = bookRobMapper.countByBookNoUserId(dto.getUserId(), dto.getBookNo());
//商品记录存在、库存充足,而且用户还没抢购过本书,则代表当前用户可以抢购
if (stock != null && stock.getStock() > 0 && total <= 0) {
//当前用户抢购到书籍,库存减一
int res = bookStockMapper.updateStockWithLock(dto.getBookNo());
//如果允许商品超卖-达成饥饿营销的目的,则可以调用下面的方法
//int res=bookStockMapper.updateStock(dto.getBookNo());
//更新库存成功后,需要添加抢购记录
if (res > 0) {
//创建书籍抢购记录实体信息
BookRob entity = new BookRob();
//将提交的用户抢购请求实体信息中对应的字段取值
//复制到新创建的书籍抢购记录实体的相应字段中
entity.setBookNo(dto.getBookNo());
entity.setUserId(dto.getUserId());
//设置抢购时间
entity.setRobTime(new Date());
//插入用户注册信息
bookRobMapper.insertSelective(entity);
log.info("---处理书籍抢购逻辑-加Redisson分布式锁---,当前线程成功抢到书籍:{} ", dto);
}
} else {
//如果不满足上述的任意一个if条件,则抛出异常
throw new Exception("该书籍库存不足!");
}
} else {
throw new Exception("----获取Redisson分布式锁失败!----");
}
} catch (Exception e) {
throw e;
} finally {
//TODO:不管发生何种情况,在处理完核心业务逻辑之后,需要释放该分布式锁
if (lock != null) {
lock.unlock();
//在某些严格的业务场景下,也可以调用强制释放分布式锁的方法
//lock.forceUnlock();
}
}
}
参考文献
(分布式中间件技术实战(Java版).钟林森.机械工业出版社)