XxlJob注册及发现原理
各位程序员伙伴在工作中一定会接触到定时触发任务的场景,而目前业界内使用最多的就是XXL-JOB了,在接下来的几篇文章中,笔者会带各位深入了解一下XXL-JOB底层的原理,话不多说,我们现在开始
简介
XXL-JOB是一款轻量级的分布式任务调度中间件,默认支持6000个定时任务,如果生产环境的任务数量在这个范围内,可以选择使用 XXL-JOB。目前很多大型公司在任务调度的场景使用的都是XXL-JOB。
整体架构
先简单介绍一下xxl的整体架构
XXL-JOB分为执行器和调度中心,执行器在启动时注册到调度中心上,任务相关的配置,包括执行时间,阻塞策略,执行的方法,执行的策略保存在调度中心中,上述配置完毕后,调度中心就会跟据配置在指定的时间根据指定的策略执行任务。此外调度中心也会记录每次任务执行的状态。
注册与销毁
XXL-JOB采用的是Http通信,实际上,调度中心和执行器各启动了一个Netty的服务器,用来通信,调度中心也提供了对应的接口用来处理执行器发送的请求。
下面这个类的记录的是调度中心的相关属性
在执行器启动时,初始netty的过程中,会调用注册方法,通知调度中心,执行器已经启动
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {
@Override
public void run() {
// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});
try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
// bind
ChannelFuture future = bootstrap.bind(port).sync();
logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
// start registry
startRegistry(appname, address);
// wait util stop
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
} finally {
// stop
try {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}
public void startRegistry(final String appname, final String address) {
// start registry
ExecutorRegistryThread.getInstance().start(appname, address);
}
ExecutorRegistryThread 类处理的就是执行器与调度中心的心跳,在首次启动时会立刻通知调度中心,其次,会与调度中心维护一个30s间隔的心跳,使调度中心能够了解各执行器的状态,在停止时也会调用 registryRemove 方法通知调度中心,使自己下线
registryThread = new Thread(new Runnable() {
@Override
public void run() {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// registry remove
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
}
});
registryThread.setDaemon(true);
registryThread.setName("xxl-job, executor ExecutorRegistryThread");
registryThread.start();
调度中心提供了通用的restful接口,处理执行器的请求,当执行器注册时,会将注册器的信息保存在xxl_job_registry 表中,在移除时也会修改对应记录的信息
此外,为防止意外导致执行器不能正常下线下线,调度中心在启动时也维护了一个线程用来管理管理各执行器
在这个线程中会定期处理长时间未与调度中心保持心跳的执行器
总结
当执行器启动时,它通过Netty框架提供的服务暴露调度接口。通过调度中心的注册接口,执行器将自己的名称、IP地址和端口信息传输到调度中心,并每30秒更新注册信息。同样地,在执行器停止时,它会请求调度中心的注销接口进行注销。
调度中心提供注册和注销接口供执行器调用。当调度中心接收到注册或注销请求后,会更新xxl_job_registry表中的执行器注册信息。此外,调度中心还启动一个探活线程,定期检查90秒内未更新注册信息的执行器,并将其删除。
需要注意的是,以上流程仅涉及执行器的注册与发现,任务调度和回调相关的逻辑将在下一篇文章中进行讨论。
- 点赞
- 收藏
- 关注作者
评论(0)