💖分布式任务调度有那么难吗?来,10分钟带你实战💖

举报
XiaoLin_Java 发表于 2022/02/22 15:16:31 2022/02/22
【摘要】 四、小案例 4.1、单机版本 4.1.1、需求描述数据库中有一些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已经备份了。 4.1.2、创建数据库SET FOREIGN_KEY_CHECKS=0;-- ------------------------------ Table structure for t_file_custom-- ----------------...

四、小案例

4.1、单机版本

4.1.1、需求描述

数据库中有一些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已经备份了。

4.1.2、创建数据库

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for t_file_custom
-- ----------------------------
DROP TABLE IF EXISTS `t_file_custom`;
CREATE TABLE `t_file_custom` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  `type` varchar(255) DEFAULT NULL,
  `backedUp` tinyint(4) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t_file_custom
-- ----------------------------
INSERT INTO `t_file_custom` VALUES ('1', '文件1', '内容1', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('2', '文件2', '内容2', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('3', '文件3', '内容3', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('4', '文件4', '内容4', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('5', '文件5', '内容5', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('6', '文件6', '内容6', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('7', '文件6', '内容7', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('8', '文件8', '内容8', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('9', '文件9', '内容9', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('10', '文件10', '内容10', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('11', '文件11', '内容11', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('12', '文件12', '内容12', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('13', '文件13', '内容13', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('14', '文件14', '内容14', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('15', '文件15', '内容15', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('16', '文件16', '内容16', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('17', '文件17', '内容17', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('18', '文件18', '内容18', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('19', '文件19', '内容19', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('20', '文件20', '内容20', 'vedio', '1');

4.1.3、Druid&MyBatis

4.1.3.1、添加依赖

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!--mysql驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

4.1.3.2、集成数据库

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8
    driverClassName: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    username: root
    password: admin

4.1.4、添加实体类

@Data
public class FileCustom {
    //唯一标识
    private Long id;
    //文件名
    private String name;
    //文件类型
    private String type;
    //文件内容
    private String content;
    //是否已备份
    private Boolean backedUp = false;
    public FileCustom(){}
    public FileCustom(Long id, String name, String type, String content){
        this.id = id;
        this.name = name;
        this.type = type;
        this.content = content;
    }
}

4.1.5、添加任务类

@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {

  @Autowired
  FileCopyMapper fileCopyMapper;

  @Override
  public void execute(ShardingContext shardingContext) {
    doWork();
  }

  private void doWork() {
    List<FileCustom> fileCustoms = fileCopyMapper.selectAll();
    if (fileCustoms.size() == 0){
      log.info("备份完成");
      return;
    }
    log.info("需要备份的文件个数为:"+fileCustoms.size());
    for (FileCustom fileCustom : fileCustoms) {
      backUpFile(fileCustom);
    }
  }

  private void backUpFile(FileCustom fileCustom) {
    try {
      Thread.sleep(1000);
      log.info("执行备份文件:"+fileCustom);
      fileCopyMapper.backUpFile(fileCustom.getId());

    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

4.1.6、添加Mapper

@Mapper
public interface FileCopyMapper {

  @Select("select * from t_file_custom where backedUp = 0")
  List<FileCustom> selectAll();

  @Update("update t_file_custom set backedUp = 1 where id = #{id}")
  void backUpFile(Long id);
}

4.1.7、添加任务调度配置

  @Bean(initMethod = "init")
  public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
    SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(XiaoLinJob.class,"0/3 * * * * ?",1,null));
    return springJobScheduler;
  }

4.1.8、存在的问题

为了高可用,我们会对这个项目做集群的操作,可以保证其中一台挂了,另外一台可以继续工作.但是在集群的情况下,调度任务只在一台机器上运行,如果单个任务调度比较耗时,耗资源的情况下,对这台机器的消耗还是比较大的。

但是这个时候,其他机器却是空闲着的,如何合理的利用集群的其他机器且如何让任务执行得更快些呢?这时候Elastic-Job提供了任务调度分片的功能。

4.2、集群版本

4.2.1、分片概念

作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。

例如在单机版本的备份数据的案例,如果有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2查找radio和vedio类型文件执行备份。

如果由于服务器拓容应用实例数量增加为4,则作业遍历数据的逻辑应为: 4个实例分别处理text、image、radio、video类型的文件。

通过对任务的合理分片化,从而达到任务并行处理的效果,他的好处是:

  1. 分片项与业务处理解耦:Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
  2. 最大限度利用资源:将分片项设置大于服务器的数据,最好是大于服务器倍数的数量,作业将会合理利用分布式资源,动态的分配分片项。例如:3台服务器,分成10片,则分片项结果为服务器A=0、1、2。服务器B=3、4、5。服务器C=6、7、8、9。如果 服务器C奔溃,则分片项分配结果为服务器A=0、1、2、3、4。服务器B=5、6、7、8、9。在不丢失分片项的情况下,最大限度利用现有的资源提高吞吐量。

4.2.2、配置类修改

如果想将单机版本改为集群版本,我们首先需要在任务配置类中增加分片个数以及分片参数。

@Bean(initMethod = "init")
  public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
    SpringJobScheduler springJobScheduler = new 
 //第一个参数表示自定义任务类,第二个参数是corn表达式,第三个参数是分片个数,第四个参数是分片的名称,第一个分片作用是查询类型为test的,以此类推 
 SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(XiaoLinJob.class,"0/3 * * * * ?",4,"0=text,1=image,2=radio,3=vedio"));
    return springJobScheduler;
  }

4.2.3、 新增作业分片逻辑

@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {

  @Autowired
  FileCopyMapper fileCopyMapper;

  @Override
  public void execute(ShardingContext shardingContext) {
    // 获取到指定分片的类型
    doWork(shardingContext.getShardingParameter());
  }

  private void doWork(String fileType) {
    List<FileCustom> fileCustoms = fileCopyMapper.selectByType(fileType);
    if (fileCustoms.size() == 0){
      log.info("备份完成");
      return;
    }
    log.info("需要备份的文件类型是:"+fileType+"文件个数为:"+fileCustoms.size());
    for (FileCustom fileCustom : fileCustoms) {
      backUpFile(fileCustom);
    }
  }

  private void backUpFile(FileCustom fileCustom) {
    try {
      Thread.sleep(2000);
      log.info("执行备份文件:"+fileCustom);
      fileCopyMapper.backUpFile(fileCustom.getId());

    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

4.2.4、Mapper类修改

@Mapper
public interface FileCopyMapper {

  @Select("select * from t_file_custom where backedUp = 0")
  List<FileCustom> selectAll();

  @Update("update t_file_custom set backedUp = 1 where id = #{id}")
  void backUpFile(Long id);

  @Select("select * from t_file_custom where backedUp = 0 and type = #{fileType}")
  List<FileCustom> selectByType(String fileType);
}

4.2.5、测试

4.2.5.1、一台机器

一台机器启动四个线程直接跑完。

4.2.5.2、四台机器

当四台机器启动的时候,每台机器分得一个线程,查询并备份一种类型的数据。

image-20210509143349112

----------------------------------------------------------我是分割线----------------------------------------------------------

image-20210509143404053

----------------------------------------------------------我是分割线----------------------------------------------------------

image-20210509143603194

----------------------------------------------------------我是分割线----------------------------------------------------------

image-20210509143614562

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。