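Analysis of the XXL-JOB Task Execution Flow
Overview
The flow of a single task scheduling pass is shown in the figure below:
Source Code Analysis
Scheduling Center Flow
Task scheduling starts with a call to com.xxl.job.admin.core.thread.JobTriggerPoolHelper#trigger.
The addTrigger method that it invokes is shown below: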
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // choose thread pool
    // There are two pools: one for short-running triggers and one for long-running triggers.
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // job-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
    });
}
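The routing rule in addTrigger can be isolated from the thread pools to make it easier to reason about. The following is a minimal sketch, not XXL-JOB code: the class SlowJobTracker and its methods are hypothetical names, and the current time is passed in as a parameter (an assumption made here so the minute window can be exercised deterministically).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified model of the fast/slow routing rule; not an XXL-JOB class. */
final class SlowJobTracker {
    static final long SLOW_THRESHOLD_MS = 500;     // a trigger slower than this counts as a timeout
    static final int MAX_TIMEOUTS_PER_MINUTE = 10; // more than this many timeouts marks the job as slow

    private volatile long currentMinute = -1;
    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();

    /** Records how long one trigger took; nowMs is injected so the window logic is testable. */
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) {   // a new minute window starts: reset every counter
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > SLOW_THRESHOLD_MS) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }

    /** True once the job has timed out more than MAX_TIMEOUTS_PER_MINUTE times in the current window. */
    boolean isSlow(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return count != null && count.get() > MAX_TIMEOUTS_PER_MINUTE;
    }

    public static void main(String[] args) {
        SlowJobTracker tracker = new SlowJobTracker();
        long now = 0;
        for (int i = 0; i < 11; i++) {
            tracker.record(1, 800, now);          // eleven slow triggers within the same minute
        }
        tracker.record(2, 100, now);              // a fast trigger is never counted
        System.out.println(tracker.isSlow(1));    // true
        System.out.println(tracker.isSlow(2));    // false
        tracker.record(2, 100, now + 60_000);     // the next minute clears the counters
        System.out.println(tracker.isSlow(1));    // false
    }
}
```

A caller would consult isSlow(jobId) before submitting and pick the slow pool when it returns true, which keeps a few slow jobs from occupying every thread that fast jobs depend on.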
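The overall flow is as follows:
- Choose a thread pool. There are two pools by default: one for short-running tasks and one for long-running tasks. When a task times out more than 10 times within one minute (a trigger counts as timed out when it takes longer than 500 ms), the task is treated as long-running and routed to the slow pool.
- Run the scheduling logic on the chosen thread pool.
The core code is in com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger, shown below. Its rough flow is:
- Save the execution log and obtain its ID
- Initialize the trigger parameters
- Initialize the executor address
- Call the executor server over HTTP
- Collect the trigger information and save the execution result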
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save log-id
    // Save the execution log
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger-param
    // Initialize the trigger parameters
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    // Initialize the executor address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger remote executor
    // Remote call to the executor over HTTP
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5、collection trigger info
    // Collect the trigger information
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    // Save the trigger result to the log
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
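Step 3 of processTrigger is easier to follow when the sharding-broadcast branch is pulled out on its own. The sketch below is illustrative only: ShardingAddressSketch, shardingParam and pickAddress are hypothetical names, the addresses are made up, and the non-sharding branch (which delegates to a router) is deliberately left out.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper mirroring the sharding-broadcast branch of address selection. */
final class ShardingAddressSketch {

    /** Builds the "index/total" string recorded for a shard. */
    static String shardingParam(int index, int total) {
        return index + "/" + total;
    }

    /**
     * Picks the executor address for a shard. Falls back to the first address when
     * the index is beyond the list, and returns null when no executor is registered.
     */
    static String pickAddress(List<String> registryList, int index) {
        if (registryList == null || registryList.isEmpty()) {
            return null;
        }
        return index < registryList.size() ? registryList.get(index) : registryList.get(0);
    }

    public static void main(String[] args) {
        // Made-up executor addresses, used only for illustration.
        List<String> executors = Arrays.asList("http://10.0.0.1:9999", "http://10.0.0.2:9999");
        for (int i = 0; i < executors.size(); i++) {
            System.out.println(shardingParam(i, executors.size()) + " -> " + pickAddress(executors, i));
        }
        // prints:
        // 0/2 -> http://10.0.0.1:9999
        // 1/2 -> http://10.0.0.2:9999
    }
}
```

A null address corresponds to the case where processTrigger skips the remote call and records a failure result instead.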
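Executor Flow
The executor embeds a Netty HTTP server that receives requests from the scheduling center. The core handling logic is in com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0,
which hands the request to a thread pool for processing. The code is as follows: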
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // request parse
    //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
    String requestData = msg.content().toString(CharsetUtil.UTF_8);
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

            // to json
            String responseJson = GsonTool.toJson(responseObj);

            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}
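The pattern in channelRead0 is to read everything needed from the request on the I/O thread, then run the potentially slow work on a separate business pool. Below is a minimal sketch of that hand-off in plain Java without Netty. OffloadSketch, onRequest and process are hypothetical names, the pool size of 4 is arbitrary, and the JSON strings are placeholders rather than the real response format.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch of handing work from an I/O thread to a business thread pool. */
final class OffloadSketch {
    private final ExecutorService bizPool = Executors.newFixedThreadPool(4); // arbitrary size

    /** Called on the I/O thread: submit the work and return immediately. */
    void onRequest(String uri, String body, Consumer<String> writeResponse) {
        bizPool.execute(() -> {
            String response = process(uri, body); // potentially slow work runs off the I/O thread
            writeResponse.accept(response);       // the response is written once processing ends
        });
    }

    /** Stand-in for the real dispatch on the request URI; the JSON is a placeholder. */
    String process(String uri, String body) {
        return "/run".equals(uri) ? "{\"code\":200}" : "{\"code\":500}";
    }

    /** Stops accepting work and waits briefly for submitted tasks to finish. */
    void shutdown() throws InterruptedException {
        bizPool.shutdown();
        bizPool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        OffloadSketch server = new OffloadSketch();
        server.onRequest("/run", "{}", System.out::println);
        server.shutdown();
    }
}
```

Because onRequest returns as soon as the task is submitted, a slow handler delays only its own response and not the thread that accepts new requests.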
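Under the hood, the process method calls com.xxl.job.core.biz.impl.ExecutorBizImpl#run,
which adds the execution request to the queue of a JobThread. JobThread is a further wrapper around task execution, and each JobThread owns a blocking queue. Normally one task corresponds to one JobThread object: execution requests are put into the blocking queue, and the JobThread polls the queue for requests and processes them. The class attributes and structure are as follows:
The source code of the run method is as follows: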
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    // Apply the block strategy when a JobThread already exists for this task
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        // DISCARD_LATER: reject this trigger if the thread is busy
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            // COVER_EARLY: set jobThread to null so that a new thread replaces the busy one
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
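The block-strategy branch of the run method reduces to a small decision table. The sketch below restates it as a pure function. The strategy names follow the enum constants used in the source above (SERIAL_EXECUTION is the default seen in processTrigger), while BlockStrategySketch, Action and decide are hypothetical names introduced for illustration.

```java
/** Hypothetical restatement of the block-strategy decision as a pure function. */
final class BlockStrategySketch {
    enum Strategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }
    enum Action { ENQUEUE, REJECT, REPLACE_THREAD_THEN_ENQUEUE }

    /**
     * @param strategy the block strategy configured for the task
     * @param busy     whether the existing job thread is running or has queued triggers
     */
    static Action decide(Strategy strategy, boolean busy) {
        if (!busy) {
            return Action.ENQUEUE;                      // an idle thread always accepts the trigger
        }
        switch (strategy) {
            case DISCARD_LATER:
                return Action.REJECT;                   // drop the new trigger
            case COVER_EARLY:
                return Action.REPLACE_THREAD_THEN_ENQUEUE; // drop the old work, run the new trigger
            default:
                return Action.ENQUEUE;                  // serial execution: wait in the queue
        }
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " busy=true  -> " + decide(s, true));
            System.out.println(s + " busy=false -> " + decide(s, false));
        }
    }
}
```

The strategies differ only when the thread is busy, which is why the source checks isRunningOrHasQueue() before acting on DISCARD_LATER or COVER_EARLY.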
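The flow chart relating tasks to JobThreads is shown below. Take the execution of task 1 as an example. Task 1 corresponds to one JobThread. The executor first checks the cache for a JobThread belonging to the task and creates one if none exists. It then evaluates the block strategy: if the strategy is to cover the earlier trigger, the existing JobThread is stopped and a new JobThread is started and put into the cache. A JobThread loops, taking requests from its trigger queue, executing them, and putting each result into the callback queue. To keep a JobThread from spinning idle, the thread shuts down once the queue is empty and the loop count exceeds 30, which reduces CPU usage.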
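The idle-exit behaviour described above can be sketched with a blocking queue and a counter. This is not the JobThread implementation: IdleExitWorker and its fields are hypothetical, the poll timeout and idle limit are constructor parameters chosen for the demo, and the sketch ignores the race in which a task is pushed just as the worker decides to exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical worker that drains a queue and exits after too many empty polls. */
final class IdleExitWorker extends Thread {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final int maxIdlePolls;
    private final long pollTimeoutMs;
    final AtomicInteger processed = new AtomicInteger();

    IdleExitWorker(int maxIdlePolls, long pollTimeoutMs) {
        this.maxIdlePolls = maxIdlePolls;
        this.pollTimeoutMs = pollTimeoutMs;
    }

    /** Enqueues a task; the unbounded queue always accepts it. */
    boolean push(Runnable task) {
        return queue.offer(task);
    }

    @Override
    public void run() {
        int idlePolls = 0;
        while (true) {
            try {
                // Block for a bounded time instead of spinning on an empty queue.
                Runnable task = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
                if (task != null) {
                    idlePolls = 0;            // real work resets the idle counter
                    task.run();
                    processed.incrementAndGet();
                } else if (++idlePolls > maxIdlePolls) {
                    return;                   // idle for too long: let the thread end
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        IdleExitWorker worker = new IdleExitWorker(3, 50);
        worker.push(() -> System.out.println("task 1"));
        worker.push(() -> System.out.println("task 2"));
        worker.start();
        worker.join();                        // returns once the worker has gone idle and exited
        System.out.println("processed=" + worker.processed.get());
    }
}
```

Using a timed poll means an idle worker is parked most of the time, and the counter bounds how long it lingers before its thread is released.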
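Summary
To improve throughput and performance, XXL-JOB uses thread pools, queues, NIO (Netty), batch processing, and similar techniques. To avoid the global blocking that uneven task execution times could cause, it splits the trigger thread pool into one pool for long-running tasks and one pool for short-running tasks, isolating tasks from each other at the thread pool level.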