SpringBoot 使用 Elastic-Job分布式任务调度框架
SpringBoot 使用 Elastic-Job分布式任务调度框架
一、Elastic-Job简介
ElasticJob 是一个分布式调度解决方案,由两个独立的项目组成,即 ElasticJob-Lite 和 ElasticJob-Cloud。通过灵活调度、资源管理和作业管理等功能,创建适合 Internet 场景的分布式调度解决方案,通过开放式架构设计提供多元化的作业生态系统。它每个项目都使用统一的作业 API。开发人员只需一次代码,就可以进行部署。
使用 ElasticJob 可以使开发人员不再担心非功能性要求,如作业横向扩展,这样他们就可以更专注于业务编码;同时,它也可以释放操作员,这样他们不必担心作业的高可用性和管理,并可以通过简单地添加服务器来自动操作。
github:https://github.com/apache/shardingsphere-elasticjob
二、Elastic-Job特性
-
弹性时间表
支持分布式系统中的作业分片和高可用性
扩展,提高吞吐量和效率
作业处理能力灵活且可扩展,可分配资源 -
资源分配
在适当的时间和分配的资源上执行作业
将同一作业聚合到同一作业执行器
动态地将资源追加到新分配的作业 -
工作治理
故障
失火
分布环境不稳定时的自我诊断和恢复 -
作业依赖项 (TODO)
基于 DAG 的作业依赖项
基于 DAG 的作业项依赖项 -
工作开放生态系统
为扩展统一作业 api
支持丰富的作业类型 lib,如数据流、脚本、HTTP、文件、大数据
重点业务 SDK ,可与春季 IOC 合作 -
管理控制台
工作管理
作业事件跟踪查询
注册中心管理
三、Elastic-Job作业分片策略
Elastic-Job 有 AverageAllocationJobShardingStrategy、OdevitySortByNameJobShardingStrategy、RotateServerByNameJobShardingStrategy 三种分片策略。
- 其中AverageAllocationJobShardingStrategy分片策略是默认的分片策略,基于平均分配算法的分片策略,作业数能被服务器数整除情况下均匀分配。如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。
- OdevitySortByNameJobShardingStrategy分片策略,该策略核心思想为根据作业名的哈希值奇偶数决定采用IP升/降序算法实现分片,作业名的哈希值为奇数则IP升序,作业名的哈希值为偶数则IP降序,通过这种方式用于将不同的作业分片负载均衡至不同的服务器。
- RotateServerByNameJobShardingStrategy分片策略,根据作业名的哈希值对服务器列表进行轮转的分片策略,其内部也是采用平均分片算。
四、Elastic-Job作业类型
-
Simple类型作业:
SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job。 -
Dataflow类型作业:
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。可通过DataflowJobConfiguration配置是否流式处理。流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。 -
Script类型作业:
Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。
五、SpringBoot简单搭建使用
创建4个分片,两个应用程序去执行,看效果。
分片参数:0=A,1=B,2=C,3=D
服务:localhost:8090,localhost:8091
- 添加maven依赖
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
</dependency>
- application.yml
server:
port: 8090
elastic-job:
zk:
ip: 192.168.40.129:2181 #zk地址,可添加多个
namespace: ejob-springboot #命名空间,多个项目要一致
job:
cron: 0/5 * * * * ? #cron表达式,多久执行一次
shardingTotalCount: 4 #分片数量
shardingItemParameters: 0=A,1=B,2=C,3=D #分片参数
- 注册ZK
@Configuration
public class ElasticRegCenterConfig {
@Value("${elastic-job.zk.ip}")
private String serverList;
@Value("${elastic-job.zk.namespace}")
private String namespace;
@Bean(initMethod = "init")
public ZookeeperRegistryCenter regCenter() {
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
zookeeperConfiguration.setMaxRetries(3); //设置重试次数,可设置其他属性
zookeeperConfiguration.setSessionTimeoutMilliseconds(500000); //设置会话超时时间,尽量大一点,否则项目无法正常启动
return new ZookeeperRegistryCenter(zookeeperConfiguration );
}
}
- 创建job
@Slf4j
@Component
public class SimpleJobDemo implements SimpleJob {
public void execute(ShardingContext shardingContext) {
log.info("**********************Job Start***********************");
log.info("当前任务名称--->"+shardingContext.getJobName());
log.info("任务总片数--->"+shardingContext.getShardingTotalCount());
log.info("当前分片项--->"+shardingContext.getShardingItem());
log.info("当前参数--->"+shardingContext.getShardingParameter());
log.info("当前任务参数--->"+ shardingContext.getJobParameter());
log.info("======================Job End=========================");
}
}
- 注册job
@Configuration
public class ElasticJobConfig {
@Autowired
private ZookeeperRegistryCenter regCenter;
@Autowired
SimpleJobDemo simpleJob;
@Value("${elastic-job.job.cron}")
private String cron;
@Value("${elastic-job.job.shardingTotalCount}")
private int shardingTotalCount;
@Value("${elastic-job.job.shardingItemParameters}")
private String shardingItemParameters;
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler() {
JobCoreConfiguration coreConfig = JobCoreConfiguration
.newBuilder(simpleJob.getClass().getName(), cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters).build();
String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
new SimpleJobConfiguration(coreConfig, simpleJob.getClass().getCanonicalName()))
.jobShardingStrategyClass(jobShardingStrategyClass)
.overwrite(true).build();
return new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration);
}
}
- 效果
当8090,和8091服务同时运行时,打印日志:
当将其中一台停止后:
文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_43692950/article/details/107443762
- 点赞
- 收藏
- 关注作者
评论(0)