addax适配GaussDB开源开发任务
addax适配GaussDB开源开发任务
数据中台是一种企业级的数据管理架构,它旨在通过整合和优化企业的各种数据资源,提供一个统一的数据服务层,以支持业务的快速创新和发展。数据中台的核心能力包括数据采集、数据存储、数据处理、数据分析和数据服务等。它帮助企业在复杂多变的市场环境中实现数据驱动决策,提高运营效率,提升客户体验。
数据集成模块的重要性
在数据中台架构中,数据集成模块扮演着至关重要的角色。它负责将来自不同来源、结构各异的数据进行抽取(Extract)、转换(Transform)和加载(Load),即ETL过程,确保数据的一致性和完整性。具体来说:
数据抽取:从多个不同的数据源(如数据库、文件系统、实时消息队列等)中提取数据。
数据转换:对抽取的数据进行清洗、格式化、聚合等操作,使之符合目标系统的数据模型要求。
数据加载:将转换后的数据加载到目标数据仓库或数据湖中,供后续分析使用。
数据集成模块的有效性直接影响到整个数据中台的质量和服务水平。良好的数据集成可以确保数据的及时性、准确性和一致性,为上层应用提供坚实的数据基础。
ADDAX组件的作用
ADDMX(应为Adax,可能是拼写错误)是一个开源的数据同步工具,由阿里巴巴集团开发并维护。它特别适合用于大规模数据迁移和增量数据同步场景。Adax能够高效地处理多种异构数据源之间的数据传输,并且支持高并发、低延迟的数据同步特性。以下是Adax在数据集成中的几个关键优势:
广泛的连接器支持:Adadx内置了对多种常见数据源的支持,例如关系型数据库(MySQL, PostgreSQL)、NoSQL数据库(HBase, MongoDB)、大数据平台(Hive, Spark SQL)以及云存储服务(OSS, RDS)等。
灵活的任务配置:用户可以通过简单的配置文件定义复杂的ETL任务,而无需编写大量代码。这大大降低了开发成本和技术门槛。
高性能与可扩展性:Addax采用了分布式架构设计,可以根据实际需求动态调整计算资源,从而保证即使面对海量数据也能保持高效的处理速度。
容错机制:内置了重试、断点续传等功能,确保即使在网络波动或其他异常情况下也能顺利完成数据同步任务。
安全可靠:提供了数据加密传输、权限控制等措施,保障数据的安全性。
总之,在构建数据中台时,选择像Adax这样的高效数据集成工具是非常重要的。它可以简化数据集成流程,提高工作效率,同时确保数据质量和安全性,进而为企业提供更强大的数据服务能力。
1. Addax介绍
Addax是一个异构数据源离线同步工具,致力于实现包括关系型数据库(如MySQL、PostgreSQL、Oracle等)、HDFS、Hive、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
1.1.Addax的工作原理和主要功能
Addax通过读取源数据,经过处理后写入目标数据存储。其核心功能包括:
数据读取:支持多种数据源的读取,如MySQL、PostgreSQL等关系型数据库,以及HDFS、Hive、HBase等大数据存储系统,本文主要介绍对华为云gaussDB插件二次开发作相应改造。
数据处理:在读取数据后,Addax可以进行必要的数据清洗和转换操作(可以注入自己特有业务)。
数据写入:将处理后的数据写入目标存储系统,支持多种数据仓库和大数据平台。本文主要介绍对华为云gaussDB插件二次开发作相应改造。
1.2. Addax的使用场景和优势
Addax的主要使用场景包括数据迁移、数据备份、数据同步等。其优势包括:
高效稳定:Addax经过优化,能够在保证数据同步效率的同时,提供稳定的运行表现。
广泛支持:支持多种数据源和目标存储系统,满足多样化的数据同步需求。
易于使用:提供详细的配置文档和示例,方便用户快速上手和使用。
1.3. addax适配GaussDB开源数据源
现在互联网应用越来越复杂,每个公司都会有多种多样的数据库。通常是用最好的硬件来跑 OLTP,甚至还在 OLTP 中进行分库分表来足业务,这样对于一些分析,聚合,排序操作非常麻烦。这也有了异构数据库的数据同步需求,使用Addax 一样能够胜任完成,今天重点给大家:Addax 结合GaussDB云数据库实现数据库数据流转插件开发流程(其中带有GaussDB数据库,达梦数据库,teradata数据库增改造的插件)。
2.开发前的准备
2.1 准备阶段
开始之前,下载 WIKI[https://developer.huaweicloud.com/programs/opensource/contrib],根据README的步骤,本地编译和发布文档。阅读内容“Java示例项目-示例项目介绍”了解DEMO
2.2 克隆仓库
2.2.1 创建SSH公钥
在我使用时华为云开发者账号和gitcode账号不互通,需要重新注册账号。
完成登录后,点击右上角的个人头像进入个人设置界面。点击【安全设置】-【SSH公钥】添加当前计算机的SSH公钥以便后续免登录提交代码。如果当前系统还没生成过,请参考 如何生成SSH公钥
2.2.2 克隆项目
forkJava示例项目(https://gitcode.com/HuaweiCloudDeveloper/OpenSourceForHuaweiDemoJava.git)到自己的仓库。
git clone [你的gitcode地址]cd OpenSourceForHuaweiDemoJava
mvn clean install
2.2.3 安装Docker
对于Windows/MacOS等拥有可视化环境的系统通过下载Docker Desktop(https://www.docker.com/get-started/)
对于Centos/Debian或者其他只使用终端的系统,安装请参考Docker Engine安装(https://docs.docker.com/engine/install/)
2.2.4 安装Zookeeper(必须)
使用docker安装Zookeeper
# 创建Zookeeper目录mkdir /opt/zookeeper# 进入目录cd /opt/zookeeper#创建docker-compose.yml文件nano docker-compose.yml
docker-compose.yml文件如下:
version: '3'
services:
zoo1:
image: zookeeper
hostname: zoo1
ports:
- 2181:2181
额外配置请自行修改
# 启动opengauss
dockers compose up -d
使用docker ps检查服务运行状态
2.2.5 安装OpenGauss(可选)
OpenGauss可使用物理机安装或Docker安装。在本文中使用Docker安装OpenGauss。物理机安装请参考OpenGauss安装准备
使用docker安装OpenGauss
# 创建OpenGauss目录mkdir /opt/opengauss# 进入目录cd /opt/opengauss# 创建挂载目录mkdir db# 编写docker-compose.yml文件nano docker-compose.yml
docker-compose.yml文件内容如下:
version: "3"
services:
open_gauss:
image: enmotech/opengauss-lite:latest
environment:
GS_PASSWORD: Admin123##
#GS_USERNAME: gaussdb
GS_PORT: 5432
volumes:
- ./db:/var/lib/opengauss
ports:
- '8000:5432'
如果需要更改端口或者opengauss的账号和密码请自行修改。
# 启动opengauss
dockers compose up -d
使用docker ps检查服务运行状态
登录数据库:addaxgauss,创建两张作实验验证插件表,分别为WEBSITESS 和WEBSITESD,接着把对应的数据插入到对应的源表中。
创建表:WEBSITESS。
# 启动opengauss
dockers compose up -d
CREATE TABLE WEBSITESS (
ID INT ,
NAME VARCHAR(50000) COMMENT '站点名称',
URL VARCHAR(500) ,
ALEXA INT COMMENT 'ALEXA 排名',
COUNTRY VARCHAR(500) COMMENT '国家'
);
插入6条模拟数据:
INSERT INTO WEBSITESS VALUES ('1', 'GoogleGUASS', 'https://www.google.cm/', '1', 'USA'),
('2', '淘宝GUASS', 'https://www.taobao.com/', '13', 'CN'),
('3', '菜鸟教程GUASS', 'http://www.runoob.com', '5892', ''),
('4', '微博GUASS', 'http://weibo.com/', '20', 'CN'),
('5', 'FacebookGUASS', 'https://www.facebook.com/', '3', 'USA'),
('6', 'JueJinGUASS', 'https://www.juejin.cn/', '2213', 'CN');
创建目标表:WEBSITESD
CREATE TABLE WEBSITESD (
ID INT ,
NAME VARCHAR(50000) COMMENT '站点名称',
URL VARCHAR(500) ,
ALEXA INT COMMENT 'ALEXA 排名',
COUNTRY VARCHAR(500) COMMENT '国家'
);
2.2.6安装 Addax
开发环境准备
安装必要的工具和依赖:
确保你已经安装了Java开发环境(JDK),因为ADAX是基于Java的
# more info https://wgzhao.github.io/Addax/4.0.7/
# wget https://github.com/wgzhao/Addax/releases/download/4.0.7/addax-4.0.7.tar.gz;
# tar xvf addax-4.0.7.tar.gz;
3.开发GaussDB插件模块
3.1. 插件开发导入依赖包,正式进入插件开发阶段
获取ADAX源码:从GitHub或其他代码托管平台克隆ADAX项目的源代码。
根据官方文档配置本地开发环境。
安装Maven或Gradle等构建工具,以便管理项目依赖。设置POM文件(如果你使用的是Maven),添加必要的依赖项,包括GAUSSDB的JDBC驱动。
获取并设置好GAUSSDB的JDBC驱动。
<opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
增加兼容opengauss的连接类型,以及配置好对应JDBC驱动,如下图所示:
Addax是一个用于大规模数据同步和迁移的框架,它允许开发者通过编写插件来支持不同的数据源。要为特定的数据库如gaussDB开发Addax插件,你需要遵循一定的步骤。下面将详细介绍以gaussDB为例的Addax插件开发流程。
3.2. 插件开发,创建两个读写模块
架构原理图:
创建新插件模块:
在ADAX项目中创建一个新的两个模块作为你的GAUSSDB插件opengaussreader和opengausswriter如图。
实现读写器接口:
ADAX提供了Reader和Writer接口,你需要实现这些接口来适配GAUSSDB。
Reader负责从GAUSSDB读取数据,而Writer则负责将数据写入到GAUSSDB。
编写SQL查询逻辑:创建包名称:opengaussreader对于Reader,需要定义如何构造SQL查询语句来从GAUSSDB提取数据。
com.wgzhao.addax.plugin.reader.opengaussreader
类:com.wgzhao.addax.plugin.reader.opengaussreader.OpenGaussReader
对于Writer,定义插入、更新或删除操作的SQL语句。创建包名称:opengausswriter
com.wgzhao.addax.plugin.writer.opengausswriter
com.wgzhao.addax.plugin.writer.opengausswriter.OpenGaussWriter
修改核心的写模块:检查与初始化方法编写与优化。
@Override
public void preCheck()
{
this.init();
this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);
}
@Override
public void init()
{
this.originalConfig = super.getPluginJobConf();
this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonRdbmsWriterJob.init(this.originalConfig);
}
- 完成写到GAUSSDB数据库后的核心的初始化以及写的方法
@Override
public void init()
{
this.writerSliceConfig = super.getPluginJobConf();
this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE) {
@Override
protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,
int columnSqlType, Column column)
throws SQLException
{
if (column == null || column.getRawData() == null) {
preparedStatement.setObject(columnIndex, null);
return preparedStatement;
}
if (columnSqlType == Types.BIT) {
// BIT(1) -> java.lang.Boolean
if (column.getType() == Column.Type.BOOL) {
preparedStatement.setBoolean(columnIndex, column.asBoolean());
} else {
// BIT ( > 1) -> byte[]
preparedStatement.setObject(columnIndex, Integer.valueOf(column.asString(), 2));
}
return preparedStatement;
}
if (columnSqlType == Types.DATE && "YEAR".equals(this.resultSetMetaData.get(columnIndex).get("typeName"))) {
preparedStatement.setLong(columnIndex, column.asLong());
return preparedStatement;
}
return super.fillPreparedStatementColumnType(preparedStatement, columnIndex, columnSqlType, column);
}
};
this.commonRdbmsWriterTask.init(this.writerSliceConfig);
}
@Override
public void prepare()
{
this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);
}
//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)
public void startWrite(RecordReceiver recordReceiver)
{
this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,
super.getTaskPluginCollector());
}
@Override
public void post()
{
this.commonRdbmsWriterTask.post(this.writerSliceConfig);
}
@Override
public void destroy()
{
this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);
}
@Override
public boolean supportFailOver()
{
String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);
return "replace".equalsIgnoreCase(writeMode);
}
}
处理数据类型映射:
GAUSSDB中的数据类型可能与ADAX内部使用的数据类型不同,因此需要正确地进行类型转换和映射。
3.3. 设置打包模块:
3.3.1精简打包模块:
为了节省打包时间,只把需要验证的插件模块打包进来,修改总工程pom.xml文件(重点验证GAUSSDB,达梦,TERADATA,GBASE在本次不作验证)。
3.3.2配置部署需要应用的目标模块:
进行对应的打包注入,修改根目录下package.xml打包文件,操作里面插件是否打包成插件模块。
配置读写数据的JSON文件,以便最后进行ETL数据测试。如下面opengauss2mysql.json文件。在主工程下面的core/job目录下,增加对应JSON文件。内容参考如下内容:
测试和调试:
编写单元测试来验证你的插件是否能正确地从GAUSSDB读取和写入数据。
使用ADAX提供的工具对插件进行集成测试,确保其在实际环境中稳定运行。
部署,打包插件:使用Maven将你的插件打包成一个jar文件。
部署到ADAX环境:将生成的jar文件复制到ADAX系统的plugin目录下。
更新ADAX配置文件以包含新的GAUSSDB插件信息。在对应主工程core/job下面创建一个JSON文件:gauss2gauss.json文件,生成对应的ETL操作脚本。提供通过开发的Gauss读写的插件方式,同步数据到目标表。
创建一个ADAX任务配置,指定使用新开发的GAUSSDB插件。
启动ADAX任务,监控其执行情况,并根据需要调整参数。
参考脚本JSON配置信息如下:如通用的JDBC连接的信息规则一致。
{
"job": {
"content": [
{
"writer": {
"name": "opengausswriter",
"parameter": {
"username": "root",
"password": "Z******",
"column": [
"id","name","url","alexa","country"
],
"connection": [
{
"table": [ "WEBSITESD"],
"jdbcUrl": "jdbc:opengauss://119.*.*.*:8000/postgres?currentSchema=public"
}
],
"preSql": ["truncate table websitesd"] }},
"reader": {
"name": "opengaussreader",
"parameter": {
"column": [
"id","name","url","alexa","country"
],
"username": "root",
"password": "Z*****",
"connection": [
{
"table": ["WEBSITESS" ],
"jdbcUrl": [
"jdbc:opengauss://119.*.*.*:8000/postgres?currentSchema=pu****" ] } ]}}}],
"setting": {
"speed": {
"record": -1,
"byte": -1,
"channel": 1
}
}
}
}
运行任务:
总结:源代码编译安装
你可以选择从源代码编译安装,基本操作如下:
git clone https://github.com/wgzhao/addax.git
cd addax
mvn clean package
mvn package assembly:single
cd target/addax/addax-<version>
cd D:\Addax-4.0.7\target\addax\addax-4.0.7
D:\Addax-4.0.7\target\addax\addax-4.0.7>mvn clean package
D:\Addax-4.0.7\target\addax\addax-4.0.7\job
python ./bin/addax.py ./job/mysql2mysql.json
python ./bin/addax.py ./job/gauss2gauss.json
com.wgzhao.addax.plugin.writer.opengausswriter.OpenGaussWriter
com.wgzhao.addax.plugin.reader.opengaussreader.OpenGaussReader
4.测试验证
4.1.1 将开发好的插件工程源代码进行编译,打包,转化成可执行的工具。通过脚本执行程序。
cd addax
mvn clean package
mvn package assembly:single
打包: mvn package assembly:single
进入程序目录:
cd D:\Addax-4.0.7\target\addax\addax-4.0.7
执行程序,测试程序读写GaussDB功能是否正确,数据是否达到预期:插件程序 顺利从源表读取数据,经过对应业务处理后,写到目标表:
python ./bin/addax.py ./job/gauss2gauss.json
查看观察结果
十二月 23, 2024 10:16:49 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: Connect complete. ID: af8db054-fbd9-4f1f-8a2b-485bd3649a8b
十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [a030faee-0312-4f6a-bc16-46d85fe71c69] Try to connect. IP: 119.3.227.120:8000
十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [7.249.18.11:52772/119.3.227.120:8000] Connection is established. ID: a030faee-0312-4f6a-bc16-46d85fe71c69
十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: Connect complete. ID: a030faee-0312-4f6a-bc16-46d85fe71c69
十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [84c74db4-e3d2-4528-9c1d-804848feb1be] Try to connect. IP: 119.3.227.120:8000
十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [7.249.18.11:52773/119.3.227.120:8000] Connection is established. ID: 84c74db4-e3d2-4528-9c1d-804848feb1be
十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: Connect complete. ID: 84c74db4-e3d2-4528-9c1d-804848feb1be
十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [791d4692-8ea9-481a-9688-1f8055b1e031] Try to connect. IP: 119.3.227.120:8000
[36m2024-12-23 10:16:51.150[0;39m [32m[0-0-0-reader][0;39m [34mINFO [0;39m [35mCommonRdbmsReader$Task[0;39m - Finished read record by Sql: [select id,name,url,alexa,country from WEBSITESS
] jdbcUrl:[jdbc:opengauss://119.3.227.120:8000/addaxgauss?currentSchema=public].
十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [7.249.18.11:52775/119.3.227.120:8000] Connection is established. ID: 791d4692-8ea9-481a-9688-1f8055b1e031
十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: Connect complete. ID: 791d4692-8ea9-481a-9688-1f8055b1e031
十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [87037b20-3fc8-4c91-ac1e-bdd59640ae96] Try to connect. IP: 119.3.227.120:8000
十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: [7.249.18.11:52777/119.3.227.120:8000] Connection is established. ID: 87037b20-3fc8-4c91-ac1e-bdd59640ae96
十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl
信息: Connect complete. ID: 87037b20-3fc8-4c91-ac1e-bdd59640ae96
[36m2024-12-23 10:16:54.974[0;39m [32m[ job-0][0;39m [34mINFO [0;39m [35mAbstractScheduler [0;39m - Scheduler accomplished all tasks.
[36m2024-12-23 10:16:54.975[0;39m [32m[ job-0][0;39m [34mINFO [0;39m [35mJobContainer [0;39m - Addax Writer.Job [opengausswriter] do post work.
[36m2024-12-23 10:16:54.976[0;39m [32m[ job-0][0;39m [34mINFO [0;39m [35mJobContainer [0;39m - Addax Reader.Job [opengaussreader] do post work.
[36m2024-12-23 10:16:54.976[0;39m [32m[ job-0][0;39m [34mINFO [0;39m [35mJobContainer [0;39m - PerfTrace not enable!
[36m2024-12-23 10:16:54.977[0;39m [32m[ job-0][0;39m [34mINFO [0;39m [35mStandAloneJobContainerCommunicator[0;39m - Total 6 records, 199 bytes | Speed 33B/s, 1 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
[36m2024-12-23 10:16:54.978[0;39m [32m[ job-0][0;39m [34mINFO [0;39m [35mJobContainer [0;39m -
任务启动时刻 : 2024-12-23 10:16:37
任务结束时刻 : 2024-12-23 10:16:54
任务总计耗时 : 17s
任务平均流量 : 33B/s
记录写入速度 : 1rec/s
读出记录总数 : 6
读写失败总数 : 0
登录华为云终端,查看结果是否已经到达目标表:
注意事项
性能优化:考虑大数据量时的数据传输效率,适当调整批量大小、并发数等参数。
错误处理:确保有良好的错误处理机制,可以捕获异常并做出适当的响应。
安全性:保护敏感信息,如数据库连接字符串、用户名和密码等,不要硬编码在代码中。
以上就是针对GAUSSDB的ADAX插件开发的基本流程。请注意,具体实现细节可能会根据ADAX版本的不同而有所变化,所以务必参考最新的官方文档。
- 点赞
- 收藏
- 关注作者
评论(0)