一、背景
在项目中有一个需求是:将某某信息在多少秒后发布,这个需求的解决方案很多,简单点可以定义单线程去执行或者采用@Scheduled,分布式的情况也可以采用乐观锁等。在闲暇之余,想起以前看到的基于原有的中间件Redis也是可以的。
二、关于Redisson
Redisson是架设在Redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。【Redis官方推荐】
Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
三、Redisson的延迟队列优缺点
对比 | Redisson | springboot(Scheduled)| 单线程+乐观锁+分布式锁 | |
---|---|---|---|
分布式 | 支持 | 支持,实现比较Redisson麻烦 | |
实时性 | 支持,支持自定义时间 | 支持,支持自定义时间 | |
持久性 | 基于redis的持久性配置,如果宕机之前已经持久化,重启后会继续执行,否则自己看着办 | 可以逻辑处理 | |
性能表现 | 基于redis,采用时间轮 | 看具体情况 | |
复杂性 | 简单 | 看具体情况 | |
依赖性 | 基于Redis | 不依赖其它中间件,纯代码 |
四、集成Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>版本号</version>
</dependency>
五,配置(基于springboot)
1、application.yml(就是一般的配置)
server:
port: 8080
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379
password:
2、Redisson的Config
@Configuration
@EnableAutoConfiguration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port)
.setTcpNoDelay(true)
.setConnectTimeout(30000)
.setSubscriptionsPerConnection(5)
.setKeepAlive(true)
.setSubscriptionConnectionPoolSize(50)
.setPingConnectionInterval(60000);
// 添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
3、主要的代码部分
@Slf4j
@Component
public class JobTimer {
public static final String jobsTag = "delay_job";
private RedissonClient client;
private ApplicationContext context;
private static ExecutorService executorService;
@Autowired
public JobTimer(RedissonClient client, ApplicationContext context) {
this.client = client;
this.context = context;
}
@PostConstruct
public void startJobTimer() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNamePrefix("delay-job-service").build();
executorService = new ThreadPoolExecutor(1, 10, 30,
TimeUnit.MINUTES, new LinkedBlockingQueue<>(10), namedThreadFactory);
executorService.execute(new ExecutorTask());
}
class ExecutorTask implements Runnable {
@SneakyThrows
@Override
public void run() {
RBlockingQueue blockingQueue = client.getBlockingQueue(jobsTag);
while (true) {
try {
DelayJobEntity job = (DelayJobEntity) blockingQueue.take();
// 执行逻辑
ExecuteJob service = (ExecuteJob) context.getBean(job.getAClass());
service.execute(job);
} catch (Exception e) {
log.error(e.getMessage());
}
// 防止疯狂打印日志
TimeUnit.SECONDS.sleep(10);
}
}
}
}
四、测试结果
五、项目demo
redission.zip