>>>>>>>>>> xxl-job config init."); XxlJobSpri...">执行器启动流程分析 | 向往@>>>>>>>>>> xxl-job config init."); XxlJobSpri...">
跳至主要內容

执行器启动流程分析

向往大约 4 分钟《XXL-JOB》源码分析

初始化流程

声明XxlJobSpringExecutorbean后,执行器会自动开始进行初始化逻辑,声明代码如下:

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppname(appname);
    xxlJobSpringExecutor.setAddress(address);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

    return xxlJobSpringExecutor;
}

XxlJobSpringExecutor 交给Spring预初始化后会自动执行执行器初始化操作,这是怎么实现的呢?查看XxlJobSpringExecutor相关逻辑,代码如下:

image-20221117173553470

执行器实现了 ApplicationContextAware, SmartInitializingSingleton, DisposableBean三个接口,作用分别如下:

  • ApplicationContextAware可以获取到ApplicationContext
  • org.springframework.beans.factory.DisposableBean#destroy,该方法在bean销毁后调用。
  • org.springframework.beans.factory.SmartInitializingSingleton#afterSingletonsInstantiated方法在单例bean预加载后被调用,XxlJobSpringExecutor实现了该方法,初始化的核心流程就通过该方法实现。

核心流程如下:

@Override
    public void afterSingletonsInstantiated() {

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

大致分为三个步骤:

  • 初始化JobHandler
  • 刷新工厂类
  • 调用父类的启动方法

initJobHandlerMethodRepository方法如下:

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    // init job handler from method
    //从容器获取bean数组
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {

        // get bean
        Object bean = null;
        // 忽略懒加载bean
        Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
        if (onBean!=null){
            logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
            continue;
        }else {
            bean = applicationContext.getBean(beanDefinitionName);
        }

        // filter method
        // 获取@XxlJob注解的方法
        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }
        //初始化
        // generate and regist method job handler
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // regist
            //注册
            registJobHandler(xxlJob, bean, executeMethod);
        }

    }
}

registJobHandler首先做了一系列校验和准备工作,包括:

  • 校验名称是否为空
  • 校验是否存在同名job
  • 将目标执行方法设置成可访问
  • 注册jobHandler

使用ConcurrentMap存储jobHandler。

 private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
  
 public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
        logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
        return jobHandlerRepository.put(name, jobHandler);
    }

IJobHandler接口提供了init、execute、destory方法,并提供了几种实现,类结构如下图所示:

image-20221117180325512

  • ScriptJobHandler :任务使用GLUE(Shell)、GLUE(Python)、GLUE(PHP)、GLUE(Nodejs)、GLUE(PowerShell)运行模式
  • GlueJobHandler:任务使用GLUE(Java)运行模式
  • MethodJobHandler: 任务使用Bean运行模式

至此initJobHandlerMethodRepository(applicationContext)逻辑大致分析到这里,GlueFactory.refreshInstance(1)提供了对象相关字段自动补齐,不再具体描述。着重关心start()里面的逻辑,代码如下:

public void start() throws Exception {

    // init logpath
    //初始化日志路径
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    //初始化admin-client
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    //启动日志文件清理线程
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    //启动触发回调线程
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    //启动自研rpc
    initEmbedServer(address, ip, port, appname, accessToken);
}

重点关注下initEmbedServer方法,核心逻辑是初始化内置的RPC服务:

embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);

依赖Netty实现自研RPC服务,核心代码如下

image-20221118164450768

EmbedHttpServerHandler继承了SimpleChannelInboundHandler,会对消息进行解析,并通过线程池异步处理后将结果返回。

protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // request parse
    //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
    String requestData = msg.content().toString(CharsetUtil.UTF_8);
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

            // to json
            String responseJson = GsonTool.toJson(responseObj);

            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}

注册逻辑:

image-20221118165246047

当xxl-job-admin部署了多个服务时,HTTP默认都只会请求第一个,除非请求第一个服务抛出异常 ,才会请求其他admin服务。

注册使用Http请求进行通讯

至此,执行器的初始化逻辑已经结束,总流程如下图所示:

image-20221118170910059

总结

  • SmartInitializingSingleton中只有一个接口afterSingletonsInstantiated(),其作用是是 在spring容器管理的所有单例对象(非懒加载对象)初始化完成之后调用的回调接口。
  • DisposableBean在bean被销毁后调用。
  • XXL-JOB任务调度底层基于Netty实现。