当前位置: 首页>后端>正文

spring batch 分布式执行 spring 分布式锁

**目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题就存在问题,解决方案分布式锁。**

下面介绍两种首先分布式锁的方案:

1. **基于Spring Integration实现分布式锁**

2. **基于redisson实现分布式锁**

优缺点:第一种引入简单,使用方便,但只支持重入锁。第二种较第一种麻烦一点点,但支持重入锁、公平锁、读锁、写锁等多种类型。

**第一种方案**:提供的全局锁目前为以下存储提供了实现

Gemfire

JDBC

Redis

Zookeeper

因为第二种方案基于redis存储,为了方便该方案讲解选择redis作为存储。同时项目都采用springboot方式。

1. 首先创建springboot项目,添加基本依赖及redis、integration-redis依赖。如果选择其他存储方式,添加对应的integration依赖即可,无需更改业务,非常方便。

```

<!--integration-->
 <dependency>
 <groupId>org.springframework.integration</groupId>
 <artifactId>spring-integration-redis</artifactId>
 </dependency>
 <!--redis-->
 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>
 <dependency>
 <groupId>org.springframework.session</groupId>
 <artifactId>spring-session-data-redis</artifactId>
 </dependency>
 <dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson-spring-boot-starter</artifactId>
 <version>3.9.1</version>
 <exclusions>
 <exclusion>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-web</artifactId>
 </exclusion>
 </exclusions>
 </dependency>
```
2. 添加yml配置
```
spring:
 http:
 encoding:
 charset: UTF-8
 force: true
 enabled: true
 server:
 port: 8080
 mvc:
 static-path-pattern: /**
 resources:
 static-locations: classpath:/static/
 redis:
 database: 1
 host: xx.xx.xx.xx
 port: 6379
 password: xxx
 jedis:
 pool:
 max-active: 8
 max-idle: 8
 min-idle: 0
 redisson:
 address: "xxxx"
 password: xxx
```
3. 创建config配置,RedisLockRegistry的第三个参数设置为上锁以后xxx秒自动解锁的时间,默认60s。
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
/**
 * 描述:spring-integration-redis锁配置
 *
 * @Auther: gc.x
 * @Date: 2019/10/28
 */
@Configuration
public class RedisLockConfiguration {
 @Bean
 public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory){
 return new RedisLockRegistry(redisConnectionFactory,"spring-integration-redis",60000L);
 }
}
```
4. 选择基于注解的形式加锁,先自定义锁注解,然后创建对应的切面
**@interface**
```java
/**
 * 描述: 基于spring-integration-redis实现分布式锁
 * 只支持重入锁ReentrantLock
 * @Auther: gc.x
 * @Ddate:2019/10/28
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockDistributed {
 /**
 * 锁的名称
 * @return
 */
 String name() default "";
 /**
 * 尝试加锁 最多等待时间
 * @return
 */
 int time() default 30;
 /**
 * 自定义业务key
 * @return
 */
 String [] keys() default {};
 /**
 * 上锁以后xxx秒自动解锁
 *
 */
 /**
 *上锁以后xxx秒自动解锁
 * long leaseTime();
 * 在RedisLockConfiguration里面配置,如果要使用该参数,需要在切面里面new RedisLockRegistry
 * @return
 */
}
```
**Aspect**
 主要就是拦截自定义的注解,获取数据生成lockname,及竞争锁时间,
 redisLockRegistry.obtain方法获取锁,
 lock.tryLock方法竞争锁
 由于篇幅省略了部分私有方法,不用纠结,结尾会提供完整源码
 ---getKeyName是获取自定义的作用于参数上的一个注解,可以将传入的参数作为 lockname的一部分,增强锁的力度。
 ---getName获取完整路径的方法名。
```java
/**
 * 描述:添加了 LzxLockDistributed 注解 的Aop
 *
 * @Auther: gc
 * @Date: 2019/6/18 10:56
 */
@Component
@Aspect
@Slf4j
public class LockAspect {
 /**
 * 锁前缀名
 */
 public static final String LOCK_NAME_PREFIX = "lock";
 /**
 * 分隔符
 */
 public static final String LOCK_NAME_SEPARATOR = ".";
 /**
 * 表达式解析器
 */
 private ExpressionParser parser = new SpelExpressionParser();
 /**
 * 参数解析
 */
 private ParameterNameDiscoverer nameDiscoverer = new DefaultParameterNameDiscoverer();
 @Autowired
 private RedisLockRegistry redisLockRegistry;
 /* @Autowired
 private RedisConnectionFactory redisConnectionFactory;*/
 @Around(value = "@annotation(lockDistributed)")
 public Object aroundApi(ProceedingJoinPoint point, LockDistributed lockDistributed) throws Throwable {
 MethodSignature signature = (MethodSignature) point.getSignature();
 String businessKeyName = getKeyName(point,lockDistributed);
 String lockName = LOCK_NAME_PREFIX+LOCK_NAME_SEPARATOR + getName(lockDistributed.name(), signature) + businessKeyName;
log.info("lockName===>"+lockName);
 // RedisLockRegistry redisLockRegistry=new RedisLockRegistry(redisConnectionFactory,"spring-integration-redis",lockDistributed.leaseTime());
 Lock lock = redisLockRegistry.obtain(lockName);
 boolean currentThreadLock = false;
 Object proceed = null;
 try{
 currentThreadLock = lock.tryLock(lockDistributed.time(), TimeUnit.SECONDS);
Home("获取锁====="+currentThreadLock);
 if (!currentThreadLock) {
 throw new TimeoutException("获取锁资源等待超时");
 }
 proceed = point.proceed();
 }catch (Exception e){
 throw e;
 }finally {
 if(currentThreadLock){
 lock.unlock();
 }
 }
 return proceed;
 }
}
```
5. 测试锁
```java
@RestController
public class DistributedLock {
 @LockDistributed
 @RequestMapping("/redisLockTest")
 public void redisLockTest() {
 System.out.println("获取锁,执行");
 try {
 Thread.sleep(20*1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 System.out.println("结束-------------------->>>>>>>>>>>>>>");
 }
}
```
**第二种方案**:提供多种锁实现
 1. 同理导入依赖
 2. 配置yml
 3. 创建config
```java
@Configuration
@ConditionalOnClass(Config.class)
@EnableConfigurationProperties(RedissonProperties.class)
public class RedissonAutoConfiguration {
 @Autowired
 private RedissonProperties redssionProperties;
 /**
 * 单体的
 * @return
 */
 @Bean
 @ConditionalOnProperty(value = "redisson.address")
 public RedissonClient redissonSingle(){
 Config config = new Config();
 SingleServerConfig serverConfig = config.useSingleServer().setAddress(redssionProperties.getAddress())
 .setTimeout(redssionProperties.getTimeout())
 .setDatabase(redssionProperties.getDatabase())
 .setConnectionPoolSize(redssionProperties.getConnectionPoolSize())
 .setConnectionMinimumIdleSize(redssionProperties.getConnectionMinimumIdleSize());
 if(StringUtils.isNotEmpty(redssionProperties.getPassword())){
 serverConfig.setPassword(redssionProperties.getPassword());
 }
 return Redisson.create(config);
 }
 /**
 * 集群的
 * @return
 */
 @Bean
 @ConditionalOnProperty(value = "redisson.masterAddresses")
 public RedissonClient redissonSentinel(){
 Config config = new Config();
 ClusterServersConfig serverConfig = config.useClusterServers().addNodeAddress(redssionProperties.getSentinelAddresses())
 .setTimeout(redssionProperties.getTimeout())
 //设置集群扫描时间
 .setScanInterval(redssionProperties.getScanInterval())
 //主节点线程池数量
 .setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize())
 //从节点线程池数量
 .setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize());
 if(StringUtils.isNotEmpty(redssionProperties.getPassword())){
 serverConfig.setPassword(redssionProperties.getPassword());
 }
 return Redisson.create(config);
 }
```
4. 自定义注解,属性多了锁类型
```java
/**
 * 描述: 基于redisson实现分布式锁
 * 支持多种锁类型
 * @Auther: gc.x
 * @Ddate:2019/10/28
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LockDistributed2 {
 /**
 * 锁的名称
 * @return
 */
 String name() default "";
 /**
 * 锁类型,默认可重入锁
 * @return
 */
 LockType lockType() default LockType.Reentrant;
 /**
 * 尝试加锁,最多等待时间
 * @return
 */
 long waitTime() default 60;
 /**
 *上锁以后xxx秒自动解锁
 * @return
 */
 long leaseTime() default 60*5;
 /**
 * 自定义业务key
 * @return
 */
 String [] keys() default {};
}
```
5. 切面,和方案一类似,主要根据注解上的锁类型获取对应的redisson提供的锁。
该实现内容较多不贴出来了,结构如下
![在这里插入图片描述]()
```java
/**
 * @Auther: gc.x
 * @Date: 2019/10/28
 * @Description:
 */
@Aspect
@Component
@Slf4j
public class LockAspect2 {
 /**
 * 锁前缀名
 */
 public static final String LOCK_NAME_PREFIX = "lock";
 /**
 * 分隔符
 */
 public static final String LOCK_NAME_SEPARATOR = ".";
 /**
 * 表达式解析器
 */
 private ExpressionParser parser = new SpelExpressionParser();
 /**
 * 参数解析
 */
 private ParameterNameDiscoverer nameDiscoverer = new DefaultParameterNameDiscoverer();
 @Autowired
 private LockFactory lockFactory;
 @Around(value = "@annotation(lockDistributed2)")
 public Object around(ProceedingJoinPoint joinPoint, LockDistributed2 lockDistributed2) throws Throwable{
 // 获取方法
 MethodSignature signature = (MethodSignature) joinPoint.getSignature();
 // 获取锁类型
 LockType type= lockDistributed2.lockType();
 String businessKeyName = getKeyName(joinPoint,lockDistributed2);
 String lockName = LOCK_NAME_PREFIX+LOCK_NAME_SEPARATOR + getName(lockDistributed2.name(), signature) + businessKeyName;
Home("lockName===>"+lockName);
 long waitTime = lockDistributed2.waitTime();
 long leaseTime =lockDistributed2.leaseTime();
 LockInfo lockInfo = new LockInfo(type,lockName,waitTime,leaseTime);
 ILock ilock = lockFactory.getLock(lockInfo);
 boolean currentThreadLock = false;
 try {
 currentThreadLock = ilock.acquire();
 if (!currentThreadLock) {
 throw new TimeoutException("获取锁资源等待超时");
 }
 return joinPoint.proceed();
 } finally {
 if (currentThreadLock) {
 ilock.release();
 }
 }
 }
```

 


https://www.xamrdz.com/backend/3f71964174.html

相关文章: