SpringBoot 使用 Elastic-Job分布式任务调度框架

举报
程序员-上善若水 发表于 2022/06/23 23:57:58 2022/06/23
【摘要】 SpringBoot 使用 Elastic-Job分布式任务调度框架 一、Elastic-Job简介 ElasticJob 是一个分布式调度解决方案,由两个独立的项目组成,即 ElasticJob-L...

SpringBoot 使用 Elastic-Job分布式任务调度框架

一、Elastic-Job简介

ElasticJob 是一个分布式调度解决方案,由两个独立的项目组成,即 ElasticJob-Lite 和 ElasticJob-Cloud。通过灵活调度、资源管理和作业管理等功能,创建适合 Internet 场景的分布式调度解决方案,通过开放式架构设计提供多元化的作业生态系统。它每个项目都使用统一的作业 API。开发人员只需一次代码,就可以进行部署。
使用 ElasticJob 可以使开发人员不再担心非功能性要求,如作业横向扩展,这样他们就可以更专注于业务编码;同时,它也可以释放操作员,这样他们不必担心作业的高可用性和管理,并可以通过简单地添加服务器来自动操作。

github:https://github.com/apache/shardingsphere-elasticjob

二、Elastic-Job特性

  1. 弹性时间表

    支持分布式系统中的作业分片和高可用性
    扩展,提高吞吐量和效率
    作业处理能力灵活且可扩展,可分配资源

  2. 资源分配

    在适当的时间和分配的资源上执行作业
    将同一作业聚合到同一作业执行器
    动态地将资源追加到新分配的作业

  3. 工作治理

    故障
    失火
    分布环境不稳定时的自我诊断和恢复

  4. 作业依赖项 (TODO)

    基于 DAG 的作业依赖项
    基于 DAG 的作业项依赖项

  5. 工作开放生态系统

    为扩展统一作业 api
    支持丰富的作业类型 lib,如数据流、脚本、HTTP、文件、大数据
    重点业务 SDK ,可与春季 IOC 合作

  6. 管理控制台

    工作管理
    作业事件跟踪查询
    注册中心管理

三、Elastic-Job作业分片策略

Elastic-Job 有 AverageAllocationJobShardingStrategy、OdevitySortByNameJobShardingStrategy、RotateServerByNameJobShardingStrategy 三种分片策略。

  1. 其中AverageAllocationJobShardingStrategy分片策略是默认的分片策略,基于平均分配算法的分片策略,作业数能被服务器数整除情况下均匀分配。如果分片不能整除,则不能整除的多余分片将依次追加到序号小的服务器。
  2. OdevitySortByNameJobShardingStrategy分片策略,该策略核心思想为根据作业名的哈希值奇偶数决定采用IP升/降序算法实现分片,作业名的哈希值为奇数则IP升序,作业名的哈希值为偶数则IP降序,通过这种方式用于将不同的作业分片负载均衡至不同的服务器。
  3. RotateServerByNameJobShardingStrategy分片策略,根据作业名的哈希值对服务器列表进行轮转的分片策略,其内部也是采用平均分片算。

四、Elastic-Job作业类型

  1. Simple类型作业:
    SimpleJob需要实现SimpleJob接口,意为简单实现,未经过任何封装,与quartz原生接口相似,比如示例代码中所使用的job。

  2. Dataflow类型作业:
    Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。可通过DataflowJobConfiguration配置是否流式处理。流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。

  3. Script类型作业:
    Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。

五、SpringBoot简单搭建使用

创建4个分片,两个应用程序去执行,看效果。

分片参数:0=A,1=B,2=C,3=D
服务:localhost:8090,localhost:8091

  1. 添加maven依赖
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>
  1. application.yml
server:
  port: 8090

elastic-job:
  zk:
    ip: 192.168.40.129:2181  #zk地址,可添加多个
    namespace: ejob-springboot #命名空间,多个项目要一致
  job:
    cron: 0/5 * * * * ?  #cron表达式,多久执行一次
    shardingTotalCount: 4  #分片数量
    shardingItemParameters: 0=A,1=B,2=C,3=D  #分片参数
  1. 注册ZK
@Configuration
public class ElasticRegCenterConfig {
    @Value("${elastic-job.zk.ip}")
    private String serverList;

    @Value("${elastic-job.zk.namespace}")
    private String namespace;

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter() {
    	ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(serverList, namespace);
        zookeeperConfiguration.setMaxRetries(3); //设置重试次数,可设置其他属性
        zookeeperConfiguration.setSessionTimeoutMilliseconds(500000); //设置会话超时时间,尽量大一点,否则项目无法正常启动
        return new ZookeeperRegistryCenter(zookeeperConfiguration );
    }
}
  1. 创建job
@Slf4j
@Component
public class SimpleJobDemo implements SimpleJob {
    public void execute(ShardingContext shardingContext) {
        log.info("**********************Job Start***********************");
        log.info("当前任务名称--->"+shardingContext.getJobName());
        log.info("任务总片数--->"+shardingContext.getShardingTotalCount());
        log.info("当前分片项--->"+shardingContext.getShardingItem());
        log.info("当前参数--->"+shardingContext.getShardingParameter());
        log.info("当前任务参数--->"+ shardingContext.getJobParameter());
        log.info("======================Job End=========================");
    }
}
  1. 注册job
@Configuration
public class ElasticJobConfig {
    @Autowired
    private ZookeeperRegistryCenter regCenter;

    @Autowired
    SimpleJobDemo simpleJob;

    @Value("${elastic-job.job.cron}")
    private String cron;

    @Value("${elastic-job.job.shardingTotalCount}")
    private int shardingTotalCount;

    @Value("${elastic-job.job.shardingItemParameters}")
    private String shardingItemParameters;

    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler() {

        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder(simpleJob.getClass().getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters).build();

        String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();

        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
                new SimpleJobConfiguration(coreConfig, simpleJob.getClass().getCanonicalName()))
                .jobShardingStrategyClass(jobShardingStrategyClass)
                .overwrite(true).build();

        return new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration);
    }
}
  1. 效果
    当8090,和8091服务同时运行时,打印日志:
    在这里插入图片描述
    当将其中一台停止后:
    在这里插入图片描述

文章来源: blog.csdn.net,作者:小毕超,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_43692950/article/details/107443762

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。