
XXL-JOB Task Execution Flow Analysis

向往 · About 5 minutes · XXL-JOB · Source Code Analysis

Overview

The figure below shows the flow of a single task scheduling run:

image-20221121142847580

Source Code Analysis

Scheduling Center Flow

Task scheduling starts with a call to com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger, which delegates to addTrigger, shown below:

public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // choose thread pool
    // First pick a thread pool. There are two: one for fast triggers and one for slow triggers.
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // job-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
    });
}
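The routing decision in the method above can be isolated into a small class to make the rule easier to see. This is a minimal sketch, not XXL-JOB code: the class name `SlowJobTracker` and its methods are hypothetical, and the current time is passed in as a parameter (an assumption made here so the per-minute reset can be exercised without waiting). The thresholds mirror the listing: a cost above 500 ms counts as one timeout, and more than 10 timeouts in the same minute send the job to the slow pool.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that isolates the fast/slow routing rule. */
class SlowJobTracker {
    static final long SLOW_THRESHOLD_MS = 500;      // cost above this counts as one timeout
    static final int MAX_TIMEOUTS_PER_MINUTE = 10;  // more than this routes the job to the slow pool

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute;

    SlowJobTracker(long nowMillis) {
        this.currentMinute = nowMillis / 60_000;
    }

    /** Returns true when the job should be sent to the slow pool. */
    boolean isSlow(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return count != null && count.get() > MAX_TIMEOUTS_PER_MINUTE;
    }

    /** Records one finished trigger and its cost in milliseconds. */
    void record(int jobId, long costMillis, long nowMillis) {
        long minute = nowMillis / 60_000;
        if (minute != currentMinute) {   // a new minute started: reset every counter
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMillis > SLOW_THRESHOLD_MS) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

Because the counters are cleared whenever the minute changes, a job that was routed to the slow pool gets another chance in the fast pool at the start of the next minute.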

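The overall flow is as follows:

  • Select a thread pool. There are two pools by default: one for short-running triggers and one for long-running ones. When a job's trigger takes longer than the timeout threshold (500 ms by default) more than 10 times within one minute, that job is treated as slow and routed to the slow pool.

  • Run the scheduling logic in the chosen pool. The core code is in com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger, shown below. Its steps are roughly:

    • Save the execution log and obtain its ID
    • Initialize the trigger parameters
    • Resolve the executor address
    • Call the executor over HTTP
    • Collect the trigger information and save the trigger result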
     private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    
            // param
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
            ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
            String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
    
            // 1、save log-id
            // save the execution log
            XxlJobLog jobLog = new XxlJobLog();
            jobLog.setJobGroup(jobInfo.getJobGroup());
            jobLog.setJobId(jobInfo.getId());
            jobLog.setTriggerTime(new Date());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
            logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    
            // 2、init trigger-param
            // initialize the trigger parameters
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setExecutorParams(jobInfo.getExecutorParam());
            triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
            triggerParam.setLogId(jobLog.getId());
            triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
            triggerParam.setGlueType(jobInfo.getGlueType());
            triggerParam.setGlueSource(jobInfo.getGlueSource());
            triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
            triggerParam.setBroadcastIndex(index);
            triggerParam.setBroadcastTotal(total);
    
            // 3、init address
            // resolve the executor address
            String address = null;
            ReturnT<String> routeAddressResult = null;
            if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                    if (index < group.getRegistryList().size()) {
                        address = group.getRegistryList().get(index);
                    } else {
                        address = group.getRegistryList().get(0);
                    }
                } else {
                    routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                    if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                        address = routeAddressResult.getContent();
                    }
                }
            } else {
                routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
            }
    
            // 4、trigger remote executor
            // remote call to the executor over HTTP
            ReturnT<String> triggerResult = null;
            if (address != null) {
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
            }
    
            // 5、collection trigger info
            // collect the trigger information
            StringBuffer triggerMsgSb = new StringBuffer();
            triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                    .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
            if (shardingParam != null) {
                triggerMsgSb.append("("+shardingParam+")");
            }
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
    
            triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                    .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
    
            // 6、save log trigger-info
            // save the trigger result to the log
            jobLog.setExecutorAddress(address);
            jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
            jobLog.setExecutorParam(jobInfo.getExecutorParam());
            jobLog.setExecutorShardingParam(shardingParam);
            jobLog.setExecutorFailRetryCount(finalFailRetryCount);
            //jobLog.setTriggerTime();
            jobLog.setTriggerCode(triggerResult.getCode());
            jobLog.setTriggerMsg(triggerMsgSb.toString());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    
            logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
        }
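Step 3 of processTrigger treats sharding broadcast differently from the other route strategies: the address is chosen by shard index, with a fallback to the first registered address. A minimal sketch of just that rule follows. The class `ShardingAddressPicker` and its method names are hypothetical and exist only for illustration; the logic is copied from the listing above.

```java
import java.util.List;

/** Hypothetical helper mirroring the sharding-broadcast branch of step 3. */
class ShardingAddressPicker {

    /** Returns the executor address for the given shard, or null if none is registered. */
    static String pick(List<String> registryList, int index) {
        if (registryList == null || registryList.isEmpty()) {
            return null;                      // the caller reports "address empty"
        }
        // Use the address at the shard index; fall back to the first address
        // when the index is past the end of the list.
        return index < registryList.size() ? registryList.get(index) : registryList.get(0);
    }

    /** Builds the "index/total" sharding parameter recorded in the trigger log. */
    static String shardingParam(int index, int total) {
        return index + "/" + total;
    }
}
```

For every other route strategy, the listing delegates the choice to the router returned by executorRouteStrategyEnum.getRouter().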
    

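Executor Flow

The executor embeds a Netty HTTP server that receives requests from the scheduling center. The core handling logic is in com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0, which hands each request to a thread pool for processing: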

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // request parse
    //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
    String requestData = msg.content().toString(CharsetUtil.UTF_8);
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

            // to json
            String responseJson = GsonTool.toJson(responseObj);

            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}
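The handler above does only cheap work on the Netty I/O thread (reading the body, URI, and headers) and moves the actual processing to bizThreadPool, so a slow request cannot stall the event loop. The same hand-off can be sketched without Netty. Everything in this example is an assumption for illustration: `BizPoolDispatcher` is a hypothetical class, the pool size of 2 is arbitrary, and the handler function stands in for the process method.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical stand-in for the I/O-thread to business-pool hand-off. */
class BizPoolDispatcher {
    private final ExecutorService bizPool = Executors.newFixedThreadPool(2);
    private final Function<String, String> handler;   // stands in for process(...)

    BizPoolDispatcher(Function<String, String> handler) {
        this.handler = handler;
    }

    /** Called on the receiving thread: it only submits the work and returns at once. */
    Future<String> onRequest(String requestData) {
        Callable<String> task = () -> handler.apply(requestData);
        return bizPool.submit(task);
    }

    void shutdown() {
        bizPool.shutdown();
    }
}
```

In the real handler the response is written from inside the pooled task through writeResponse, rather than returned to the caller as a Future.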

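The process method eventually calls com.xxl.job.core.biz.impl.ExecutorBizImpl#run, which adds the execution request to the queue of a JobThread. JobThread is a further wrapper around task execution: each JobThread owns a blocking queue, and normally one job maps to one JobThread. Execution requests are placed on that queue, and the JobThread polls it and processes each request in turn. Its fields and class structure are shown below: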

image-20221121152841873

image-20221121153607193

The source of the run method is as follows:

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        // blocking strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            // discard later: reject the new trigger while the thread is busy
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                // cover early: drop the reference so a new JobThread replaces the old one
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }
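The last part of run reduces to a small decision table over the blocking strategy and the state of the existing JobThread. The sketch below restates it as a pure function. The names `BlockStrategySketch`, `Strategy`, and `Action` are hypothetical; only the three strategy names come from the listing.

```java
/** Hypothetical restatement of the blocking-strategy branch as a pure function. */
class BlockStrategySketch {
    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }
    enum Action { ENQUEUE, REJECT, NEW_THREAD_THEN_ENQUEUE }

    /**
     * @param threadExists whether a JobThread is already cached for the job
     * @param busy         whether that thread is running or has queued triggers
     */
    static Action decide(Strategy strategy, boolean threadExists, boolean busy) {
        if (!threadExists) {
            return Action.NEW_THREAD_THEN_ENQUEUE;   // nothing cached: register a new thread
        }
        if (strategy == Strategy.DISCARD_LATER && busy) {
            return Action.REJECT;                    // drop the new trigger
        }
        if (strategy == Strategy.COVER_EARLY && busy) {
            return Action.NEW_THREAD_THEN_ENQUEUE;   // replace the old thread
        }
        return Action.ENQUEUE;                       // serial execution, or the thread is idle
    }
}
```

Note that the strategy only matters while the existing thread is busy. An idle thread simply receives the trigger on its queue under all three strategies.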

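The figure below shows how a job maps to its JobThread, using job 1 as an example. Job 1 corresponds to one JobThread. The executor first checks the cache for that job's JobThread and creates one if none exists. It then applies the blocking strategy: with "cover early", the existing JobThread is stopped and a new one is started and put in the cache. The JobThread loops, taking requests from the trigger queue, executing them, and putting each result on the callback queue. To avoid a JobThread spinning idle, the thread is shut down once the queue is empty and the loop has gone more than 30 iterations without work, which reduces CPU usage.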

image-20221121163216231
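The consume loop described above can be sketched as a worker that polls a blocking queue with a timeout and stops itself after too many consecutive empty polls. This is a simplified illustration, not the JobThread source: `IdleStoppingWorker` is a hypothetical class, plain strings stand in for trigger parameters, a list stands in for the callback queue, and the poll timeout and idle limit are constructor parameters chosen here so the behavior is easy to try out.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical worker that drains a trigger queue and exits when idle for too long. */
class IdleStoppingWorker extends Thread {
    private final BlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final List<String> callbacks = new CopyOnWriteArrayList<>(); // stands in for the callback queue
    private final long pollMillis;
    private final int maxIdlePolls;

    IdleStoppingWorker(long pollMillis, int maxIdlePolls) {
        this.pollMillis = pollMillis;
        this.maxIdlePolls = maxIdlePolls;
    }

    boolean push(String triggerParam) {
        return triggerQueue.offer(triggerParam);
    }

    List<String> results() {
        return callbacks;
    }

    @Override
    public void run() {
        int idlePolls = 0;
        while (true) {
            try {
                // Block for at most pollMillis instead of spinning on an empty queue.
                String param = triggerQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (param != null) {
                    idlePolls = 0;                       // work arrived: reset the idle counter
                    callbacks.add("done:" + param);      // "execute" and record the result
                } else if (++idlePolls > maxIdlePolls && triggerQueue.isEmpty()) {
                    return;                              // idle for too long: let the thread end
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();      // stop promptly when interrupted
                return;
            }
        }
    }
}
```

A worker built as `new IdleStoppingWorker(10, 3)` processes whatever was pushed and then ends on its own after a few empty polls. A real implementation also has to remove the stopped thread from the cache so that the next trigger creates a fresh one, which this sketch leaves out.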

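Summary

To improve throughput and performance, XXL-JOB uses thread pools, queues, NIO (Netty), and batch processing. To keep triggers with very different durations from blocking everything, it splits the trigger thread pool into one pool for slow triggers and one for fast triggers, isolating jobs at the thread-pool level.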