PowerJob开源软件for Huawei开源开发任务历程和总结

举报
纷纷扰扰 发表于 2024/11/08 22:04:48 2024/11/08
【摘要】 在我们分布式系统开发中,基本都会有定时任务,周期处理数据,简单的数据聚合等需求场景,这时就需要一个分布式任务框架来处理这些场景。之前简单调研过PowerJob,很优秀的开源分布式组件,但是PowerJob暂不支持GaussDB数据库,正好计划中有个任务,那么就给它做个扩展,希望大家之后使用到时候可以做个参考。


背景

华为官网了解到华为云沃土云创计划激励计划,它这个计划的目的让开发者将开源软件工具、开源应用和开源组件与华为云对象存储OBS数仓DWS、云容器CCE等云服务对接,同时基于Terraform模板,上架到华为云云商店,支持其他开发者一键部署使用开源组件,称为"开源xxx for HuaweiCloud"参与贡献的开发者将会获得华为云沃土云创计划激励

在我们分布式系统开发中,基本都会有定时任务,周期处理数据,简单的数据聚合等需求场景,这时就需要一个分布式任务框架来处理这些场景。之前简单调研过PowerJob,很优秀的开源分布式组件,但是PowerJob暂不支持GaussDB数据库,正好计划中有个任务,那么就给它做个扩展,希望大家之后使用时候可以做个参考。


设计

根据计划书 的内容,主要开发处理的事项如下:

1. PowerJob存储支持GaussDB

2. PowerJob组件集成到华为开发者组件框架示例servicecomb-fence 中做Demo演示

3. 使用华为CCE鲲鹏环境来部署验证

4. 输出博客验收材料


基于计划书和华为云组件简易的画了个模型图帮助理解,最核心的处理点是PowerJob-server用GaussDB存储上面应用层的数据,然后把任务调度到resource-server示例上面执行,查看执行结果做验证。模型如下:


开发过程

1. 开发适配

引入OpenGaussDBJDBC驱动,修改PowerJob-server-starter中的配置,具体改动如下:

<opengauss.version>5.1.0-og</opengauss.version>

<dependency>

<groupId>org.opengauss</groupId>

<artifactId>opengauss-jdbc</artifactId>

<version>${opengauss.version}</version>

</dependency>

添加针对OpenGauss数据方言的处理类

package tech.powerjob.server.persistence.config.dialect;

import org.hibernate.dialect.PostgreSQL10Dialect;
import org.hibernate.type.descriptor.sql.LongVarbinaryTypeDescriptor;
import org.hibernate.type.descriptor.sql.LongVarcharTypeDescriptor;
import org.hibernate.type.descriptor.sql.SqlTypeDescriptor;

import java.sql.Types;

/**
 * 拷贝自AdpPostgreSQLDialect,用来处理OpenGauss相关数据类型发言
 * 使用方自行通过配置文件激活:spring.datasource.remote.hibernate.properties.hibernate.dialect=tech.powerjob.server.persistence.config.dialect.AdpOpenGaussSQLDialect
 *
 * @since 2024/11/12
 */
public class AdpOpenGaussSQLDialect extends PostgreSQL10Dialect {

    public AdpOpenGaussSQLDialect() {
        super();
        registerColumnType(Types.BLOB, "bytea");
        registerColumnType(Types.CLOB, "text");
    }

    @Override
    public SqlTypeDescriptor remapSqlTypeDescriptor(SqlTypeDescriptor sqlTypeDescriptor) {
        switch (sqlTypeDescriptor.getSqlType()) {
            case Types.CLOB:
                return LongVarcharTypeDescriptor.INSTANCE;
            case Types.BLOB:
                return LongVarbinaryTypeDescriptor.INSTANCE;
            case Types.NCLOB:
                return LongVarbinaryTypeDescriptor.INSTANCE;
        }
        return super.remapSqlTypeDescriptor(sqlTypeDescriptor);
    }
}

具体使用注释掉其它数据库配置,注释开本配置即可使用

####### Huawei GaussDB properties #######

#spring.datasource.remote.hibernate.properties.hibernate.dialect=tech.powerjob.server.persistence.config.dialect.AdpOpenGaussSQLDialect

#spring.datasource.core.driver-class-name=org.opengauss.Driver

#spring.datasource.core.jdbc-url=jdbc:opengauss://<IP>:<port>/powerjob_daily?currentSchema=public

#spring.datasource.core.username=<username>

#spring.datasource.core.password=<password>

#spring.datasource.core.maximum-pool-size=20

#spring.datasource.core.minimum-idle=5

#spring.jpa.database=postgresql

#spring.jpa.hibernate.ddl-auto=update

配置说明:PowerJob中使用了SpringDataJpa作为ORM框架,配置可以自动建表,GaussDB官网说最早GaussDB内核引擎基于PostgreSQL9.2开源版本演进,保留了PostgreSQL的标准接口和公共函数,那么可以基于postgresql做表生成。

2.集成Powerjob-worker

在servicecomb-fence的resource-server中引入依赖,因为PowerJob中日志框架和servicecomb-fence中的日志框架冲突做了排除。

<!-- PowerJob-starter -->

<dependency>

<groupId>tech.powerjob</groupId>

<artifactId>powerjob-worker-spring-boot-starter</artifactId>

<version>5.1.0</version>

<exclusions>

<exclusion>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-logging</artifactId>

</exclusion>

</exclusions>

</dependency>

添加连接配置:

powerjob:
  worker:
    enabled: ${powerjob.worker.enable:false}
    server-address: ${powerjob.worker.server-address:127.0.0.1:7700}
    port: ${powerjob.worker-port:27777}
    protocol: ${powerjob.worker-protocol:http}
    app-name: ${powerjob.worker-namespace:test}
    tag: ${powerjob.worker-tag:test}

添加一个Broadcast广播task示例:

package org.apache.servicecomb.fence.job;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.List;

/**
 * PowerJob BroadcastProcessor  demo
 */
@Component
public class PowerJobBroadcastDemo implements BroadcastProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(PowerJobBroadcastDemo.class);

    @Override
    public ProcessResult process(TaskContext taskContext) throws Exception {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        Long jobId = taskContext.getJobId();
        omsLogger.info("Job:{} 前置启动", jobId);
        LOGGER.info("Job:{} 前置启动",jobId);
        return new ProcessResult(true, "init success");
    }

    @Override
    public ProcessResult preProcess(TaskContext context) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        Long jobId = context.getJobId();
        omsLogger.info("Job:{} 处理器处理中", jobId);
        LOGGER.info("Job:{} 处理器处理中", jobId);
        return new ProcessResult(true, "release resource success");
    }

    @Override
    public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
        OmsLogger omsLogger = context.getOmsLogger();
        Long jobId = context.getJobId();
        omsLogger.info("Job:{} 处理器处理完成", jobId);
        LOGGER.info("Job:{} 处理器处理完成", jobId);
        return new ProcessResult(true, "process success");
    }
}

生成的表如下图:


环境准备

本地验证成功之后在华为云上搭建环境,使用鲲鹏相关的服务器做演示验证验收。

集群节点使用鲲鹏类型:

编译构建使用鲲鹏ARM服务器:

最终部署结果如下:


验证

1.Resource-serverWoker的注册实例

2. 新增应用和任务


3. 查询任务运行结果和日志

PowerJob-server端结果与日志如下:

PowerJob-woker日志如下:

4. 修改任务

5. 删除任务

结论

通过适配开发,根据上面设计中的模型图部署环境验证之后,证明PowerJob组件可以支持GaussDB底层数据存储读写,可以流畅运行在华为鲲鹏生态组件中。


收获

1. 更加了解了PowerJob的架构和功能

2. 掌握了华为云一整套微服务web构建过程

3. 完成了一点小贡献,收获了喜悦和快乐


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。