PowerJob开源软件for Huawei开源开发任务历程和总结
背景
在华为官网了解到华为云沃土云创计划激励计划,它这个计划的目的让开发者将开源软件工具、开源应用和开源组件与华为云对象存储OBS、数仓DWS、云容器CCE等云服务对接,同时基于Terraform模板,上架到华为云云商店,支持其他开发者一键部署使用开源组件,称为"开源xxx for HuaweiCloud"。参与贡献的开发者将会获得华为云沃土云创计划激励。
在我们分布式系统开发中,基本都会有定时任务,周期处理数据,简单的数据聚合等需求场景,这时就需要一个分布式任务框架来处理这些场景。之前简单调研过PowerJob,很优秀的开源分布式组件,但是PowerJob暂不支持GaussDB数据库,正好计划中有个任务,那么就给它做个扩展,希望大家之后使用时候可以做个参考。
设计
根据计划书 的内容,主要开发处理的事项如下:
1. PowerJob存储支持GaussDB
2. 把PowerJob组件集成到华为开发者组件框架示例servicecomb-fence 中做Demo演示
3. 使用华为CCE鲲鹏环境来部署验证
4. 输出博客和验收材料
基于计划书和华为云组件简易的画了个模型图帮助理解,最核心的处理点是PowerJob-server用GaussDB存储上面应用层的数据,然后把任务调度到resource-server示例上面执行,查看执行结果做验证。模型如下:
开发过程
1. 开发适配
引入OpenGaussDB的JDBC驱动,修改PowerJob-server-starter中的配置,具体改动如下:
<opengauss.version>5.1.0-og</opengauss.version>
<dependency>
<groupId>org.opengauss</groupId>
<artifactId>opengauss-jdbc</artifactId>
<version>${opengauss.version}</version>
</dependency>
添加针对OpenGauss数据方言的处理类
package tech.powerjob.server.persistence.config.dialect;
import org.hibernate.dialect.PostgreSQL10Dialect;
import org.hibernate.type.descriptor.sql.LongVarbinaryTypeDescriptor;
import org.hibernate.type.descriptor.sql.LongVarcharTypeDescriptor;
import org.hibernate.type.descriptor.sql.SqlTypeDescriptor;
import java.sql.Types;
/**
* 拷贝自AdpPostgreSQLDialect,用来处理OpenGauss相关数据类型发言
* 使用方自行通过配置文件激活:spring.datasource.remote.hibernate.properties.hibernate.dialect=tech.powerjob.server.persistence.config.dialect.AdpOpenGaussSQLDialect
*
* @since 2024/11/12
*/
public class AdpOpenGaussSQLDialect extends PostgreSQL10Dialect {
public AdpOpenGaussSQLDialect() {
super();
registerColumnType(Types.BLOB, "bytea");
registerColumnType(Types.CLOB, "text");
}
@Override
public SqlTypeDescriptor remapSqlTypeDescriptor(SqlTypeDescriptor sqlTypeDescriptor) {
switch (sqlTypeDescriptor.getSqlType()) {
case Types.CLOB:
return LongVarcharTypeDescriptor.INSTANCE;
case Types.BLOB:
return LongVarbinaryTypeDescriptor.INSTANCE;
case Types.NCLOB:
return LongVarbinaryTypeDescriptor.INSTANCE;
}
return super.remapSqlTypeDescriptor(sqlTypeDescriptor);
}
}
具体使用注释掉其它数据库配置,注释开本配置即可使用
####### Huawei GaussDB properties #######
#spring.datasource.remote.hibernate.properties.hibernate.dialect=tech.powerjob.server.persistence.config.dialect.AdpOpenGaussSQLDialect
#spring.datasource.core.driver-class-name=org.opengauss.Driver
#spring.datasource.core.jdbc-url=jdbc:opengauss://<IP>:<port>/powerjob_daily?currentSchema=public
#spring.datasource.core.username=<username>
#spring.datasource.core.password=<password>
#spring.datasource.core.maximum-pool-size=20
#spring.datasource.core.minimum-idle=5
#spring.jpa.database=postgresql
#spring.jpa.hibernate.ddl-auto=update
配置说明:PowerJob中使用了SpringDataJpa作为ORM框架,配置可以自动建表,GaussDB官网说最早GaussDB内核引擎基于PostgreSQL9.2开源版本演进,保留了PostgreSQL的标准接口和公共函数,那么可以基于postgresql做表生成。
2.集成Powerjob-worker
在servicecomb-fence的resource-server中引入依赖,因为PowerJob中日志框架和servicecomb-fence中的日志框架冲突做了排除。
<!-- PowerJob-starter -->
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>5.1.0</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
添加连接配置:
powerjob:
worker:
enabled: ${powerjob.worker.enable:false}
server-address: ${powerjob.worker.server-address:127.0.0.1:7700}
port: ${powerjob.worker-port:27777}
protocol: ${powerjob.worker-protocol:http}
app-name: ${powerjob.worker-namespace:test}
tag: ${powerjob.worker-tag:test}
添加一个Broadcast广播task示例:
package org.apache.servicecomb.fence.job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.log.OmsLogger;
import java.util.List;
/**
* PowerJob BroadcastProcessor demo
*/
@Component
public class PowerJobBroadcastDemo implements BroadcastProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(PowerJobBroadcastDemo.class);
@Override
public ProcessResult process(TaskContext taskContext) throws Exception {
OmsLogger omsLogger = taskContext.getOmsLogger();
Long jobId = taskContext.getJobId();
omsLogger.info("Job:{} 前置启动", jobId);
LOGGER.info("Job:{} 前置启动",jobId);
return new ProcessResult(true, "init success");
}
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
Long jobId = context.getJobId();
omsLogger.info("Job:{} 处理器处理中", jobId);
LOGGER.info("Job:{} 处理器处理中", jobId);
return new ProcessResult(true, "release resource success");
}
@Override
public ProcessResult postProcess(TaskContext context, List<TaskResult> taskResults) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
Long jobId = context.getJobId();
omsLogger.info("Job:{} 处理器处理完成", jobId);
LOGGER.info("Job:{} 处理器处理完成", jobId);
return new ProcessResult(true, "process success");
}
}
生成的表如下图:
环境准备
本地验证成功之后在华为云上搭建环境,使用鲲鹏相关的服务器做演示验证验收。
集群节点使用鲲鹏类型:
编译构建使用鲲鹏ARM服务器:
最终部署结果如下:
验证
1.Resource-server中Woker的注册实例
2. 新增应用和任务
3. 查询任务运行结果和日志
PowerJob-server端结果与日志如下:
PowerJob-woker日志如下:
4. 修改任务
5. 删除任务
结论
通过适配开发,根据上面设计中的模型图部署环境验证之后,证明PowerJob组件可以支持GaussDB底层数据存储读写,可以流畅运行在华为鲲鹏生态组件中。
收获
1. 更加了解了PowerJob的架构和功能
2. 掌握了华为云一整套微服务web构建过程
3. 完成了一点小贡献,收获了喜悦和快乐
- 点赞
- 收藏
- 关注作者
评论(0)