前言
在使用xxl-job的过程中,需要给每个执行器额外配置一个端
口(默认9999),这导致服务除了web服务端口,
还要额外多占用一个端口,多少有些不爽,有没有可能xxl直接
复用spring-boot所占用的端口呐?
EmbedServer
要想知道是否可行,首先得清楚为什么xxl-job要独占一个端口
实际上,每个要执行定时任务得微服务都是xxl-job的一个执行器,
执行器要与调度中心进行通讯:接受调度指令/上传日志文件/心跳
等,因此在xxl-job-core包中,会在初始化时启动一个EmbedServer
其内部开启一个socket负责与调度中心通讯(主要是接受调度中心的指令),使用的网络框架是netty
于是我们的服务往往呈现如下场景
那么问题来了,调度中心与执行器通讯使用的什么协议呐?看一下netty的handler
就可得出结论,我们最熟悉的:HTTP
思路
既然调度中心的调度指令是通过http协议传输过来的,从理论来讲完全可以让调度中心
的请求发送到spring-boot的端口上,接受请求后按原来的执行逻辑执行对应的代码即可,
这样EmbedServer
就可以删除,netty
可以不用,最重要的是服务不会额外占用端口了
实现
spring接口
贴一下执行器接受请求处理的核心代码(EmbedHttpServerHandler中):
switch(uri){
case"/beat": // 心跳
return executorBiz.beat();
case"/idleBeat": // 空闲心跳
IdleBeatParam idleBeatParam=GsonTool.fromJson(requestData,IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case"/run": // 执行任务
TriggerParam triggerParam=GsonTool.fromJson(requestData,TriggerParam.class);
return executorBiz.run(triggerParam);
case"/kill": // 终止任务
KillParam killParam=GsonTool.fromJson(requestData,KillParam.class);
return executorBiz.kill(killParam);
case"/log": // 获取日志
LogParam logParam=GsonTool.fromJson(requestData,LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE,"invalid request, uri-mapping("+uri+") not found.");
}
其实就是根据不同的接口uri做对应的处理,一共五个接口,spring实现这5个接口再简单不过了,
直接用@RequestMapping
就可以了,但我采用的方式是使用spring的动态注册接口工具RequestMappingHandlerMapping
代码如下:
@Component
@Slf4j
public class JobServer {
/**
* 定义一个请求的前缀
*/
@Value("${job.executor.pre}")
private String pre;
private ExecutorBiz executorBiz;
@Autowired
private RequestMappingHandlerMapping requestMappingHandlerMapping;
@PostConstruct
public void init() throws NoSuchMethodException {
// 初始化执行器
this.executorBiz = new ExecutorBizImpl();
// 处理器
final RequestHandler handler = new RequestHandler(this.executorBiz);
// 回调处理方法
final Method method =
RequestHandler.class.getDeclaredMethod("invoke", HttpServletRequest.class, String.class);
// 注册路由和回调方法
this.requestMappingHandlerMapping.registerMapping(
RequestMappingInfo
.paths(this.pre + "/beat", this.pre + "/idleBeat", this.pre + "/run",
this.pre + "/kill", this.pre + "/log")
.methods(RequestMethod.POST).build(),
handler,
method);
}
@AllArgsConstructor
private class RequestHandler {
private ExecutorBiz executorBiz;
/**
* 客户端接受中心调度请求处理
*/
@ResponseBody
public Object invoke(final HttpServletRequest request, @RequestBody final String body) throws Throwable {
String uri = request.getRequestURI();
uri = uri.replace(JobServer.this.pre, "");
final String requestData = body;
// services mapping
try {
switch (uri) {
case "/beat":
return this.executorBiz.beat();
case "/idleBeat":
final IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return this.executorBiz.idleBeat(idleBeatParam);
case "/run":
final TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return this.executorBiz.run(triggerParam);
case "/kill":
final KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return this.executorBiz.kill(killParam);
case "/log":
final LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return this.executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE,
"invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (final Exception e) {
JobServer.log.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
}
}
}
此时spring就拥有了与原netty一样功能的5个接口,我还加了一个前缀,毕竟例如"/run"的接口地址太宽泛
注册地址
有了五个接口,下一步就是让调度中心发出指令时走这五个接口即可,如何实现呐?
调度中心中的注册地址是自动注册的,就是执行器的ip+port,调度中心发送指令其实就是通过httpClient调用这个
地址再加上五个接口的uri,所以只要执行器注册时候注册新的地址(spring的端口),事情就完美解决了
xxl-job-core中启动netty服务成功时才会去调度中心注册地址:
由于现在不需要netty了,所以这段要删掉,但要保留注册逻辑,并注册我们的新地址,所以不可避免的要
修改xxl-job-core的代码
修改XxlJobExecutor
的start
方法
public void start()throws Exception{
// init logpath
JobFileAppender.initLogPath(this.logPath);
// init invoker, admin-client
this.initAdminBizList(this.adminAddresses,this.accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(this.logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
/** 这之下原来的代码是initEmbedServer(address, ip, port, appname, accessToken),现在直接改为注册 **/
// get ip
String ip=(this.ip!=null&&this.ip.trim().length()>0)?this.ip:IpUtil.getIp();
// generate address,这里的port就是spring的port,并加入前缀
String address=this.address;
if(this.address==null||this.address.trim().length()==0){
String ip_port_address=IpUtil.getIpPort(ip,this.port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address="http://{ip_port}".replace("{ip_port}",ip_port_address);
}
// start registry,开始注册
ExecutorRegistryThread.getInstance().start(this.appname,address+this.pre);
}
解除注册
原XxlJobExecutor
的destroy
方法,负责在执行器关闭时关闭EmbedServer,由于现在EmbedServer
已删除,所以只保留解除注册和之后的逻辑即可
public void destroy(){
// stop registry 原stopEmbedServer()
ExecutorRegistryThread.getInstance().toStop();
// 其余保留
总结
到此就实现了xxl-job走spring的接口,不额外占用端口,好处显而易见,但也有一点坏处:导致定时任务调度
共用了处理web请求的线程池,自行评估即可