跳至主要內容

XXL-JOB任务调度分析

向往大约 5 分钟源码分析《XXL-JOB》源码分析

概述

之前在使用XXL-JOB的时候一直存在两个疑问,第一个是XXL-JOB多节点部署情况下为什么可以保证任务调度的正确性?第二个是任务调度算法怎么保证任务精确调度?带着这两个问题去看下任务调度具体是怎么实现的。

源码分析

关键代码逻辑在 com.xxl.job.admin.core.thread.JobScheduleHelper#start 方法里面,由两个线程协作运行触发任务调度。scheduleThread每(5000 - System.currentTimeMillis()%1000)当前执行一次,这样做的目的是为了保证在整秒情况下执行下面的逻辑,减少误差。scheduleThread执行的逻辑如下,具体详情可参考源代码注释:

  • 关闭事务自动提交,通过select for update获取锁,锁获取成功执行下面逻辑,不然将被阻塞
  • 超时5秒未被执行的任务如果调度过期策略是立即执行一次,触发任务,否则直接丢弃。超时小于5秒,直接触发,将未来5秒后将执行的任务和未超时的任务放入时间轮。
// schedule thread
scheduleThread = new Thread(new Runnable() {
    @Override
    public void run() {

        try {
            //当前时间2022-10-05 14:23:50 400ms为例,sleep时间为4600ms,取整秒数
            TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
        } catch (InterruptedException e) {
            if (!scheduleThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

        // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
        int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

        while (!scheduleThreadToStop) {

            // Scan Job
            long start = System.currentTimeMillis();

            Connection conn = null;
            Boolean connAutoCommit = null;
            PreparedStatement preparedStatement = null;

            boolean preReadSuc = true;
            try {

                conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                connAutoCommit = conn.getAutoCommit();
                // 关闭自动提交
                conn.setAutoCommit(false);

                // 获取锁,如果有其他进程先获取到查询的悲观锁,该命令会阻塞,将不会往下运行
                preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                preparedStatement.execute();

                // tx start

                // 1、pre read
                long nowTime = System.currentTimeMillis();
                List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                if (scheduleList!=null && scheduleList.size()>0) {
                    // 2、push time-ring
                    for (XxlJobInfo jobInfo: scheduleList) {

                        // time-ring jump
                        // 已经超时5秒但还未被执行的任务
                        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                            // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                            logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                            // 1、misfire match
                            MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                            //如果调度超时策略是立即执行一次,触发调度
                            if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                // FIRE_ONCE_NOW 》 trigger
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                            }

                            // 2、fresh next
                            // 刷新下一次执行时间
                            refreshNextValidTime(jobInfo, new Date());
                        // 超时过期小于5秒,直接触发
                        } else if (nowTime > jobInfo.getTriggerNextTime()) {
                            // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                            // 1、trigger
                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                            // 2、fresh next
                            refreshNextValidTime(jobInfo, new Date());

                            // next-trigger-time in 5s, pre-read again
                            // 未来5秒将执行的任务,放入秒级的时间轮
                            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                // 1、make ring second
                                // 获取秒数
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                // 2、push time ring
                                //放入时间轮
                                pushTimeRing(ringSecond, jobInfo.getId());

                                //刷新下一次执行时间
                                // 3、fresh next
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                            }

                        } else {
                            // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                            // 1、make ring second
                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                            // 2、push time ring
                            pushTimeRing(ringSecond, jobInfo.getId());

                            // 3、fresh next
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                        }

                    }

                    // 3、update trigger info
                    for (XxlJobInfo jobInfo: scheduleList) {
                        XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                    }

                } else {
                    preReadSuc = false;
                }

                // tx stop


            } catch (Exception e) {
                if (!scheduleThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                }
            } finally {

                // commit
                // 提交事务
                if (conn != null) {
                    try {
                        conn.commit();
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        conn.setAutoCommit(connAutoCommit);
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        conn.close();
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

                // close PreparedStatement
                if (null != preparedStatement) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
            long cost = System.currentTimeMillis()-start;


            // Wait seconds, align second
            if (cost < 1000) {  // scan-overtime, not wait
                try {
                    // pre-read period: success > scan each second; fail > skip this period;
                    TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }

        }

        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
    }
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();

ringThread将时间轮的数据任务取出来执行,代码如下:

    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {

            while (!ringThreadToStop) {

                // align second,在整秒执行,一秒执行一次
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }

                try {
                    // second data
                    // 将当前秒数和上一秒任务取出来执行
                    List<Integer> ringItemData = new ArrayList<>();
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }

                    // ring trigger
                    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData.size() > 0) {
                        // do trigger
                        for (int jobId: ringItemData) {
                            // do trigger
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                        // clear
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
}

XXL-JOB的时间轮时间跨度为1秒,一共60秒,每秒执行一次,,取出对应的任务ID集合并执行,XXL-JOB的时间轮是一个Map结构,跟标准的时间轮结构有点不太一样,时间轮的数据结构可参考下图:

image-20221122112217101

时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、ZooKeeper等组件中都存在时间轮的踪影,想了解时间轮可参考《深入理解kafka》一书,这里不做过多阐述。

至此,回顾下开头的两个问题。

  • XXL-JOB通过数据库的悲观锁解决多实例部署下的竞争问题
  • 触发调度分为两个线程,一个用于将要被执行任务放到时间轮,一个线程获取时间轮的数据进行处理,可以减少误差。