前 言
本篇我们主要讲一下《xxl-job的调度流程》,在讲调度流程前,我们先概述一下:客户端接入流程、服务端配置流程和路由策略参数详解。
一、客户端接入流程
1 添加Maven依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${选择合适的版本}</version>
</dependency>
2 添加xxl-job的配置
在application.yml中添加xxl-job的配置
基础参数:
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.accessToken=default_token
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.address=
xxl.job.executor.ip=
xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30
拓展参数,非必填
xxl.job.i18n=zh_CN
xxl.job.triggerpool.fast.max=200
xxl.job.triggerpool.slow.max=100
3 代码创建执行函数
任务开发:在Spring Bean实例中,开发Job方法;
注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
-
任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过:
"XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
@XxlJob(“demoJobHandler”)
public void demoJobHandler() throws Exception {
XxlJobHelper.log(“XXL-JOB, Hello World.”);
for (int i = 0; i < 5; i++) {
XxlJobHelper.log(“beat at:” + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}
4 客户端配置参数说明:
二、服务端配置流程
1 执行器管理
AppName:执行组Name,Name相同的执行器视为同一个执行组
名称:执行组中文名。
注册方式:
自动注册:IP地址由执行器上报,通常这样使用;
手动注册:手动输入执行器地址IP,不建议使用。
2 任务管理
基础配置:
执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 "执行器管理" 进行设置;
任务描述:任务的描述信息,便于任务管理;
负责人:任务的负责人;
报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔。
触发配置:
CRON:触发任务执行的Cron表达式;
固定速度:固件速度的时间间隔,单位为秒;
固定延迟:固件延迟的时间间隔,单位为秒。
任务配置:
- 运行模式:
(1)BEAN模式:任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;
(2)GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 "groovy" 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;
(3)GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "shell" 脚本;
(4)GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "python" 脚本;
(5)GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "php" 脚本;
(6)GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "nodejs" 脚本;
(7)GLUE模式(PowerShell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 "PowerShell" 脚本;
JobHandler:运行模式为 "BEAN模式" 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值;
执行参数:任务执行所需的参数;
高级配置
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
(1)FIRST(第一个):固定选择第一个机器;
(2)LAST(最后一个):固定选择最后一个机器;
(3)ROUND(轮询):;
(4)RANDOM(随机):随机选择在线的机器;
(5)CONSISTENT\_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
(6)LEAST\_FREQUENTLY\_USED(最不经常使用):使用频率最低的机器优先被选举;
(7)LEAST\_RECENTLY\_USED(最近最久未使用):最久未使用的机器优先被选举;
(8)FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
(9)BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
(10)SHARDING\_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
调度过期策略:
(1)忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
(2)立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
(1)单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
(2)丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
(3)覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
三、路由策略参数详解
上面我们讲解了客户端接入流程、服务端配置流程和路由策略参数详解,接下来我们讲一下《xxj-job服务端架构流程》
四、xxl-job的调度流程
任务调度器和执行器使用http协议通信,各自有轮询线程处理不同业务。
五、xxl-job的调度中心详解
1 XXL-JOB的启动和销毁逻辑:
如代码可见,xxl-job调度中心的启动和销毁,核心是处理几个线程池的创建和销毁。对每一个业务线程池,后续有详细讲解。
public class XxlJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
public void init() throws Exception {
// init i18n
initI18n();
// admin trigger pool start
JobTriggerPoolHelper.toStart();
// admin registry monitor run
JobRegistryHelper.getInstance().start();
// admin fail-monitor run
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
JobCompleteHelper.getInstance().start();
// admin log report start
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
public void destroy() throws Exception {
// stop-schedule
JobScheduleHelper.getInstance().toStop();
// admin log report stop
JobLogReportHelper.getInstance().toStop();
// admin lose-monitor stop
JobCompleteHelper.getInstance().toStop();
// admin fail-monitor stop
JobFailMonitorHelper.getInstance().toStop();
// admin registry stop
JobRegistryHelper.getInstance().toStop();
// admin trigger pool stop
JobTriggerPoolHelper.toStop();
}
}
2 任务触发线程池
任务触发线程池:JobTriggerPoolHelper.toStart();
启动两个执行任务的线程池,通常任务在fastTriggerPool,统计一分钟内超时10次的任务,对超时任务再执行放进slowTriggerPool。
// job-timeout 10 times in 1 min
public class JobTriggerPoolHelper {
private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
// ---------------------- trigger pool ---------------------
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
public void start(){
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
}
3 执行器管理线程
执行器管理线程:
JobRegistryHelper.getInstance().start();
保证任务执行的时候拿到的执行器列表都是运行状态的
启动一个守护线程;
每隔三十秒,查询一次数据库 注册表 中自动注册的执行器;
删除超过90秒未再次注册(心跳)的执行器;
将执行器注册信息加载到内存Map中;
更新注册上了的执行器地址信息到 任务执行表 中。
public void start(){
// for monitor
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// remove dead address (admin/executor)
List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// fresh online address (admin/executor)
HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List<String> registryList = appAddressMap.get(appname);
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// fresh group address
for (XxlJobGroup group: groupList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
}
}
4 失败任务管理线程
失败任务管理线程:JobFailMonitorHelper.getInstance().start();
管理执行失败的任务,重试或者发送告警
每隔10s查询执行失败的任务;
如果设置重试次数,就进行重试操作;
如未设置重试次数,或已经重试超过重试次数,就发送告警信息(邮件或短信等)。
5 完成任务管理线程
完成任务管理线程:JobCompleteHelper.getInstance().start();
管理超时任务,或者执行器宕机的任务,做轮询补偿。
每隔1min查询查询状态未结束的任务;
如果距任务开始时间10min 并且 注册执行器不在线,那么就标记任务执行结束。
6 日志管理线程
日志管理线程:JobLogReportHelper.getInstance().start();
统计任务执行成功率,删除过期日志。
每隔 1min 执行一次;
按天统计总任务数,成功和失败的个数,可通过 xxl.job.logretentiondays 配置天数 默认30天。
7 任务执行调度线程
任务执行调度线程:
JobScheduleHelper.getInstance().start();
scheduleThread:任务查询并计算执行时间线程
每一秒 查询数据库中执行时间在 当前时间 至 (当前时间 + 5s)区间的任务;
根据CronHelp类计算出下次执行时间;
将任务的下次执行时间写入数据库;
加载此次执行任务Id到缓存中。
使用ConcurrentHashMap缓存,Key是分钟内的秒数(0-59),Value是任务Id组成的数组
{
"1":[
251,
172
],
"2":[
643,
172
],
"39":[
273
],
"59":[
188,
175
]
}
ringThread: 任务执行线程
每一秒轮询一次,查找当前秒的任务Id ;
根据任务Id,查出任务详情,并投递到发送线程池;
发送线程池查询到执行器地址列表,根据配置的发送策略,通过http请求发送到执行器。
发送策略:(对应页面的路由策略)
六、附录
一致性哈希算法详解
private static int VIRTUAL_NODE_NUM = 100;
public String hashJob(int jobId, List<String> addressList) {
// ------A1------A2-------A3------
// -----------J1------------------
// Address的hashCode为Key,address本身为Value;
TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
for (String address: addressList) {
// 对Address进行 100 次取模,每次对Key+1,
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SHARD-" + address + "-NODE-" + i);
addressRing.put(addressHash, address);
}
}
// 对任务Id取模,以Hash树最近的Address作为选定的
long jobHash = hash(String.valueOf(jobId));
SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
return addressRing.firstEntry().getValue();
}