当前位置: 首页>后端>正文

xxl-job使用spring端口(不额外占用端口)

前言

在使用xxl-job的过程中,需要给每个执行器额外配置一个端
口(默认9999),这导致服务除了web服务端口,
还要额外多占用一个端口,多少有些不爽,有没有可能xxl直接
复用spring-boot所占用的端口呐?

EmbedServer

要想知道是否可行,首先得清楚为什么xxl-job要独占一个端口

实际上,每个要执行定时任务得微服务都是xxl-job的一个执行器,
执行器要与调度中心进行通讯:接受调度指令/上传日志文件/心跳
等,因此在xxl-job-core包中,会在初始化时启动一个EmbedServer

xxl-job使用spring端口(不额外占用端口),第1张
img.png

其内部开启一个socket负责与调度中心通讯(主要是接受调度中心的指令),使用的网络框架是netty


xxl-job使用spring端口(不额外占用端口),第2张
img_1.png

于是我们的服务往往呈现如下场景


xxl-job使用spring端口(不额外占用端口),第3张
img_2.png

那么问题来了,调度中心与执行器通讯使用的什么协议呐?看一下netty的handler
就可得出结论,我们最熟悉的:HTTP

xxl-job使用spring端口(不额外占用端口),第4张
img_3.png

思路

既然调度中心的调度指令是通过http协议传输过来的,从理论来讲完全可以让调度中心
的请求发送到spring-boot的端口上,接受请求后按原来的执行逻辑执行对应的代码即可,
这样EmbedServer就可以删除,netty可以不用,最重要的是服务不会额外占用端口了

实现

spring接口

贴一下执行器接受请求处理的核心代码(EmbedHttpServerHandler中):

switch(uri){
        case"/beat": // 心跳
        return executorBiz.beat();
        case"/idleBeat": // 空闲心跳
        IdleBeatParam idleBeatParam=GsonTool.fromJson(requestData,IdleBeatParam.class);
        return executorBiz.idleBeat(idleBeatParam);
        case"/run": // 执行任务
        TriggerParam triggerParam=GsonTool.fromJson(requestData,TriggerParam.class);
        return executorBiz.run(triggerParam);
        case"/kill": // 终止任务
        KillParam killParam=GsonTool.fromJson(requestData,KillParam.class);
        return executorBiz.kill(killParam);
        case"/log": // 获取日志
        LogParam logParam=GsonTool.fromJson(requestData,LogParam.class);
        return executorBiz.log(logParam);
default:
        return new ReturnT<String>(ReturnT.FAIL_CODE,"invalid request, uri-mapping("+uri+") not found.");
        }

其实就是根据不同的接口uri做对应的处理,一共五个接口,spring实现这5个接口再简单不过了,
直接用@RequestMapping就可以了,但我采用的方式是使用spring的动态注册接口工具RequestMappingHandlerMapping

代码如下:


@Component
@Slf4j
public class JobServer {

    /**
     * 定义一个请求的前缀
     */
    @Value("${job.executor.pre}")
    private String pre;

    private ExecutorBiz executorBiz;

    @Autowired
    private RequestMappingHandlerMapping requestMappingHandlerMapping;

    @PostConstruct
    public void init() throws NoSuchMethodException {
        // 初始化执行器
        this.executorBiz = new ExecutorBizImpl();
        // 处理器
        final RequestHandler handler = new RequestHandler(this.executorBiz);
        // 回调处理方法
        final Method method =
                RequestHandler.class.getDeclaredMethod("invoke", HttpServletRequest.class, String.class);
        // 注册路由和回调方法
        this.requestMappingHandlerMapping.registerMapping(
                RequestMappingInfo
                        .paths(this.pre + "/beat", this.pre + "/idleBeat", this.pre + "/run",
                                this.pre + "/kill", this.pre + "/log")
                        .methods(RequestMethod.POST).build(),
                handler,
                method);
    }

    @AllArgsConstructor
    private class RequestHandler {
        private ExecutorBiz executorBiz;

        /**
         * 客户端接受中心调度请求处理
         */
        @ResponseBody
        public Object invoke(final HttpServletRequest request, @RequestBody final String body) throws Throwable {
            String uri = request.getRequestURI();
            uri = uri.replace(JobServer.this.pre, "");
            final String requestData = body;

            // services mapping
            try {
                switch (uri) {
                    case "/beat":
                        return this.executorBiz.beat();
                    case "/idleBeat":
                        final IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                        return this.executorBiz.idleBeat(idleBeatParam);
                    case "/run":
                        final TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                        return this.executorBiz.run(triggerParam);
                    case "/kill":
                        final KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                        return this.executorBiz.kill(killParam);
                    case "/log":
                        final LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                        return this.executorBiz.log(logParam);
                    default:
                        return new ReturnT<String>(ReturnT.FAIL_CODE,
                                "invalid request, uri-mapping(" + uri + ") not found.");
                }
            } catch (final Exception e) {
                JobServer.log.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
            }
        }
    }
}

此时spring就拥有了与原netty一样功能的5个接口,我还加了一个前缀,毕竟例如"/run"的接口地址太宽泛

注册地址

有了五个接口,下一步就是让调度中心发出指令时走这五个接口即可,如何实现呐?

调度中心中的注册地址是自动注册的,就是执行器的ip+port,调度中心发送指令其实就是通过httpClient调用这个
地址再加上五个接口的uri,所以只要执行器注册时候注册新的地址(spring的端口),事情就完美解决了

xxl-job-core中启动netty服务成功时才会去调度中心注册地址:


xxl-job使用spring端口(不额外占用端口),第5张
img_4.png

由于现在不需要netty了,所以这段要删掉,但要保留注册逻辑,并注册我们的新地址,所以不可避免的要
修改xxl-job-core的代码

修改XxlJobExecutorstart方法

public void start()throws Exception{

        // init logpath
        JobFileAppender.initLogPath(this.logPath);

        // init invoker, admin-client
        this.initAdminBizList(this.adminAddresses,this.accessToken);

        // init JobLogFileCleanThread
        JobLogFileCleanThread.getInstance().start(this.logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        /** 这之下原来的代码是initEmbedServer(address, ip, port, appname, accessToken),现在直接改为注册 **/
        // get ip
        String ip=(this.ip!=null&&this.ip.trim().length()>0)?this.ip:IpUtil.getIp();
        // generate address,这里的port就是spring的port,并加入前缀
        String address=this.address;
        if(this.address==null||this.address.trim().length()==0){
        String ip_port_address=IpUtil.getIpPort(ip,this.port); // registry-address:default use address to registry , otherwise use ip:port if address is null
        address="http://{ip_port}".replace("{ip_port}",ip_port_address);
        }
        // start registry,开始注册
        ExecutorRegistryThread.getInstance().start(this.appname,address+this.pre);
        }
解除注册

XxlJobExecutordestroy方法,负责在执行器关闭时关闭EmbedServer,由于现在EmbedServer
已删除,所以只保留解除注册和之后的逻辑即可

public void destroy(){
        // stop registry 原stopEmbedServer()
        ExecutorRegistryThread.getInstance().toStop();
// 其余保留

总结

到此就实现了xxl-job走spring的接口,不额外占用端口,好处显而易见,但也有一点坏处:导致定时任务调度
共用了处理web请求的线程池,自行评估即可


https://www.xamrdz.com/backend/34h1945752.html

相关文章: