执行器启动流程分析
初始化流程
声明XxlJobSpringExecutor
bean后,执行器会自动开始进行初始化逻辑,声明代码如下:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
XxlJobSpringExecutor
交给Spring预初始化后会自动执行执行器初始化操作,这是怎么实现的呢?查看XxlJobSpringExecutor
相关逻辑,代码如下:
执行器实现了 ApplicationContextAware, SmartInitializingSingleton, DisposableBean
三个接口,作用分别如下:
ApplicationContextAware
可以获取到ApplicationContext
org.springframework.beans.factory.DisposableBean#destroy
,该方法在bean销毁后调用。org.springframework.beans.factory.SmartInitializingSingleton#afterSingletonsInstantiated
方法在单例bean预加载后被调用,XxlJobSpringExecutor
实现了该方法,初始化的核心流程就通过该方法实现。
核心流程如下:
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
大致分为三个步骤:
- 初始化JobHandler
- 刷新工厂类
- 调用父类的启动方法
initJobHandlerMethodRepository方法如下:
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
//从容器获取bean数组
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
// get bean
Object bean = null;
// 忽略懒加载bean
Lazy onBean = applicationContext.findAnnotationOnBean(beanDefinitionName, Lazy.class);
if (onBean!=null){
logger.debug("xxl-job annotation scan, skip @Lazy Bean:{}", beanDefinitionName);
continue;
}else {
bean = applicationContext.getBean(beanDefinitionName);
}
// filter method
// 获取@XxlJob注解的方法
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
//初始化
// generate and regist method job handler
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
// regist
//注册
registJobHandler(xxlJob, bean, executeMethod);
}
}
}
registJobHandler首先做了一系列校验和准备工作,包括:
- 校验名称是否为空
- 校验是否存在同名job
- 将目标执行方法设置成可访问
- 注册jobHandler
使用ConcurrentMap存储jobHandler。
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
IJobHandler接口提供了init、execute、destory方法,并提供了几种实现,类结构如下图所示:
- ScriptJobHandler :任务使用GLUE(Shell)、GLUE(Python)、GLUE(PHP)、GLUE(Nodejs)、GLUE(PowerShell)运行模式
- GlueJobHandler:任务使用GLUE(Java)运行模式
- MethodJobHandler: 任务使用Bean运行模式
至此initJobHandlerMethodRepository(applicationContext)
逻辑大致分析到这里,GlueFactory.refreshInstance(1)提供了对象相关字段自动补齐,不再具体描述。着重关心start()里面的逻辑,代码如下:
public void start() throws Exception {
// init logpath
//初始化日志路径
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
//初始化admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
//启动日志文件清理线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
//启动触发回调线程
TriggerCallbackThread.getInstance().start();
// init executor-server
//启动自研rpc
initEmbedServer(address, ip, port, appname, accessToken);
}
重点关注下initEmbedServer方法,核心逻辑是初始化内置的RPC服务:
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
依赖Netty实现自研RPC服务,核心代码如下
EmbedHttpServerHandler继承了SimpleChannelInboundHandler,会对消息进行解析,并通过线程池异步处理后将结果返回。
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
String uri = msg.uri();
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
注册逻辑:
注
当xxl-job-admin部署了多个服务时,HTTP默认都只会请求第一个,除非请求第一个服务抛出异常 ,才会请求其他admin服务。
注册使用Http请求进行通讯
至此,执行器的初始化逻辑已经结束,总流程如下图所示:
总结
- SmartInitializingSingleton中只有一个接口afterSingletonsInstantiated(),其作用是是 在spring容器管理的所有单例对象(非懒加载对象)初始化完成之后调用的回调接口。
- DisposableBean在bean被销毁后调用。
- XXL-JOB任务调度底层基于Netty实现。