addax适配GaussDB开源开发任务

举报
白云飞5131421 发表于 2024/12/23 14:52:28 2024/12/23
【摘要】 Addax是一款支持多种主流数据库和数据源的通用数据采集工具。它采用先进的框架设计,具备高效的数据同步功能。本文将深入解析Addax的工作原理和特点,并探讨如何在实际应用中发挥其优势。在数据驱动的时代,数据采集是至关重要的一环。Addax作为一款功能强大的数据采集工具,备受关注。它支持多种主流数据库和数据源,包括Cassandra、ClickHouse、DBF、Hive、InfluxDB

addax适配GaussDB开源开发任务

数据中台是一种企业级的数据管理架构,它旨在通过整合和优化企业的各种数据资源,提供一个统一的数据服务层,以支持业务的快速创新和发展。数据中台的核心能力包括数据采集、数据存储、数据处理、数据分析和数据服务等。它帮助企业在复杂多变的市场环境中实现数据驱动决策,提高运营效率,提升客户体验。

数据集成模块的重要性

在数据中台架构中,数据集成模块扮演着至关重要的角色。它负责将来自不同来源、结构各异的数据进行抽取(Extract)、转换(Transform)和加载(Load),即ETL过程,确保数据的一致性和完整性。具体来说:

数据抽取从多个不同的数据源(如数据库、文件系统、实时消息队列等)中提取数据。

数据转换对抽取的数据进行清洗、格式化、聚合等操作,使之符合目标系统的数据模型要求。

数据加载将转换后的数据加载到目标数据仓库或数据湖中,供后续分析使用。

数据集成模块的有效性直接影响到整个数据中台的质量和服务水平。良好的数据集成可以确保数据的及时性、准确性和一致性,为上层应用提供坚实的数据基础。

ADDAX组件的作用

ADDMX(应为Adax,可能是拼写错误)是一个开源的数据同步工具,由阿里巴巴集团开发并维护。它特别适合用于大规模数据迁移和增量数据同步场景。Adax能够高效地处理多种异构数据源之间的数据传输,并且支持高并发、低延迟的数据同步特性。以下是Adax在数据集成中的几个关键优势:

广泛的连接器支持Adadx内置了对多种常见数据源的支持,例如关系型数据库(MySQL, PostgreSQL)、NoSQL数据库(HBase, MongoDB)、大数据平台(Hive, Spark SQL)以及云存储服务(OSS, RDS)等。

灵活的任务配置:用户可以通过简单的配置文件定义复杂的ETL任务,而无需编写大量代码。这大大降低了开发成本和技术门槛。

高性能与可扩展性:Addax采用了分布式架构设计,可以根据实际需求动态调整计算资源,从而保证即使面对海量数据也能保持高效的处理速度。

容错机制:内置了重试、断点续传等功能,确保即使在网络波动或其他异常情况下也能顺利完成数据同步任务。

安全可靠:提供了数据加密传输、权限控制等措施,保障数据的安全性。

总之,在构建数据中台时,选择像Adax这样的高效数据集成工具是非常重要的。它可以简化数据集成流程,提高工作效率,同时确保数据质量和安全性,进而为企业提供更强大的数据服务能力。

1. Addax介绍

Addax是一个异构数据源离线同步工具,致力于实现包括关系型数据库(如MySQLPostgreSQLOracle等)、HDFSHiveHBaseFTP等各种异构数据源之间稳定高效的数据同步功能‌‌。

1.1.Addax的工作原理和主要功能

Addax通过读取源数据,经过处理后写入目标数据存储。其核心功能包括:

‌数据读取‌:支持多种数据源的读取,如MySQL、PostgreSQL等关系型数据库,以及HDFS、Hive、HBase等大数据存储系统本文主要介绍对华为云gaussDB插件二次开发作相应改造

‌数据处理‌:在读取数据后,Addax可以进行必要的数据清洗和转换操作可以注入自己特有业务

‌数据写入‌:将处理后的数据写入目标存储系统,支持多种数据仓库和大数据平台。本文主要介绍对华为云gaussDB插件二次开发作相应改造

1.2. Addax的使用场景和优势

Addax的主要使用场景包括数据迁移、数据备份、数据同步等。其优势包括:

‌高效稳定‌:Addax经过优化,能够在保证数据同步效率的同时,提供稳定的运行表现。

‌广泛支持‌:支持多种数据源和目标存储系统,满足多样化的数据同步需求。

‌易于使用‌:提供详细的配置文档和示例,方便用户快速上手和使用。

1.3. addax适配GaussDB开源数据源

现在互联网应用越来越复杂,每个公司都会有多种多样的数据库。通常是用最好的硬件来跑 OLTP,甚至还在 OLTP 中进行分库分表来足业务,这样对于一些分析,聚合,排序操作非常麻烦。这也有了异构数据库的数据同步需求使用Addax 一样能够胜任完成,今天重点给大家:Addax 结合GaussDB云数据库实现数据库数据流转插件开发流程(其中带有GaussDB数据库,达梦数据库,teradata数据库增改造的插件)

2.开发前的准备

2.1 准备阶段

开始之前,下载 WIKI[https://developer.huaweicloud.com/programs/opensource/contrib],根据README的步骤,本地编译和发布文档。阅读内容“Java示例项目-示例项目介绍”了解DEMO

2.2 克隆仓库

2.2.1 创建SSH公钥

在我使用时华为云开发者账号和gitcode账号不互通,需要重新注册账号。

完成登录后,点击右上角的个人头像进入个人设置界面。点击【安全设置】-【SSH公钥】添加当前计算机的SSH公钥以便后续免登录提交代码。如果当前系统还没生成过,请参考 如何生成SSH公钥

2.2.2 克隆项目

forkJava示例项目(https://gitcode.com/HuaweiCloudDeveloper/OpenSourceForHuaweiDemoJava.git)到自己的仓库。

git clone [你的gitcode地址]cd OpenSourceForHuaweiDemoJava

mvn clean install

2.2.3 安装Docker

对于Windows/MacOS等拥有可视化环境的系统通过下载Docker Desktop(https://www.docker.com/get-started/)
对于Centos/Debian或者其他只使用终端的系统,安装请参考Docker Engine安装(https://docs.docker.com/engine/install/)

2.2.4 安装Zookeeper(必须)

使用docker安装Zookeeper

# 创建Zookeeper目录mkdir /opt/zookeeper# 进入目录cd /opt/zookeeper#创建docker-compose.yml文件nano docker-compose.yml

docker-compose.yml文件如下:

 

version: '3'

 

services:

  zoo1:

    image: zookeeper

    hostname: zoo1

    ports:

      - 2181:2181

额外配置请自行修改

# 启动opengauss

dockers compose up -d

使用docker ps检查服务运行状态

2.2.5 安装OpenGauss(可选)

OpenGauss可使用物理机安装或Docker安装。在本文中使用Docker安装OpenGauss。物理机安装请参考OpenGauss安装准备

使用docker安装OpenGauss

# 创建OpenGauss目录mkdir /opt/opengauss# 进入目录cd /opt/opengauss# 创建挂载目录mkdir db# 编写docker-compose.yml文件nano docker-compose.yml

 

docker-compose.yml文件内容如下:

version: "3"

services:

  open_gauss:

    image: enmotech/opengauss-lite:latest

    environment:

      GS_PASSWORD: Admin123##

      #GS_USERNAME: gaussdb

      GS_PORT: 5432

    volumes:

      - ./db:/var/lib/opengauss

    ports:

      - '8000:5432'

如果需要更改端口或者opengauss的账号和密码请自行修改。

# 启动opengauss

dockers compose up -d

使用docker ps检查服务运行状态

登录数据库:addaxgauss,创建两张作实验验证插件表,分别为WEBSITESS WEBSITESD,接着把对应的数据插入到对应的源表中。  

创建表:WEBSITESS

# 启动opengauss

dockers compose up -d

 

CREATE TABLE WEBSITESS (

  ID INT ,

  NAME VARCHAR(50000) COMMENT '站点名称',

  URL VARCHAR(500) ,

  ALEXA INT COMMENT 'ALEXA 排名',

  COUNTRY VARCHAR(500) COMMENT '国家'

);

 

插入6条模拟数据:

INSERT INTO WEBSITESS VALUES ('1', 'GoogleGUASS', 'https://www.google.cm/', '1', 'USA'),

('2', '淘宝GUASS', 'https://www.taobao.com/', '13', 'CN'),

('3', '菜鸟教程GUASS', 'http://www.runoob.com', '5892', ''),

('4', '微博GUASS', 'http://weibo.com/', '20', 'CN'),

('5', 'FacebookGUASS', 'https://www.facebook.com/', '3', 'USA'),

('6', 'JueJinGUASS', 'https://www.juejin.cn/', '2213', 'CN');

创建目标表:WEBSITESD

CREATE TABLE WEBSITESD (

  ID INT ,

  NAME VARCHAR(50000) COMMENT '站点名称',

  URL VARCHAR(500) ,

  ALEXA INT COMMENT 'ALEXA 排名',

  COUNTRY VARCHAR(500) COMMENT '国家'

);

2.2.6安装 Addax

开发环境准备

安装必要的工具和依赖:

确保你已经安装了Java开发环境(JDK),因为ADAX是基于Java的

# more info https://wgzhao.github.io/Addax/4.0.7/

# wget https://github.com/wgzhao/Addax/releases/download/4.0.7/addax-4.0.7.tar.gz;

# tar xvf addax-4.0.7.tar.gz;

3.开发GaussDB插件模块

3.1. 插件开发导入依赖包,正式进入插件开发阶段

获取ADAX源码:从GitHub或其他代码托管平台克隆ADAX项目的源代码。

根据官方文档配置本地开发环境。

安装Maven或Gradle等构建工具,以便管理项目依赖。设置POM文件(如果你使用的是Maven),添加必要的依赖项,包括GAUSSDB的JDBC驱动。

获取并设置好GAUSSDB的JDBC驱动。

<opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>

 

增加兼容opengauss的连接类型,以及配置好对应JDBC驱动,如下图所示:

 

Addax是一个用于大规模数据同步和迁移的框架,它允许开发者通过编写插件来支持不同的数据源。要为特定的数据库如gaussDB开发Addax插件,你需要遵循一定的步骤。下面将详细介绍以gaussDB为例的Addax插件开发流程。

 

 

3.2. 插件开发创建两个读写模块

架构原理图:

 

创建新插件模块:

ADAX项目中创建一个新的两个模块作为你的GAUSSDB插件opengaussreader和opengausswriter如图

实现读写器接口:

ADAX提供了Reader和Writer接口,你需要实现这些接口来适配GAUSSDB。

Reader负责从GAUSSDB读取数据,而Writer则负责将数据写入到GAUSSDB。

编写SQL查询逻辑:创建包名称:opengaussreader对于Reader,需要定义如何构造SQL查询语句来从GAUSSDB提取数据。

com.wgzhao.addax.plugin.reader.opengaussreader

类:com.wgzhao.addax.plugin.reader.opengaussreader.OpenGaussReader

对于Writer,定义插入、更新或删除操作的SQL语句。创建包名称:opengausswriter

com.wgzhao.addax.plugin.writer.opengausswriter

com.wgzhao.addax.plugin.writer.opengausswriter.OpenGaussWriter

 

修改核心的写模块:检查与初始化方法编写与优化。

@Override

public void preCheck()

{

this.init();

this.commonRdbmsWriterJob.writerPreCheck(this.originalConfig, DATABASE_TYPE);

}

@Override

public void init()

{

this.originalConfig = super.getPluginJobConf();

this.commonRdbmsWriterJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);

this.commonRdbmsWriterJob.init(this.originalConfig);

}

  • 完成写到GAUSSDB数据库后的核心的初始化以及写的方法

@Override

public void init()

{

this.writerSliceConfig = super.getPluginJobConf();

this.commonRdbmsWriterTask = new CommonRdbmsWriter.Task(DATABASE_TYPE) {

 

@Override

protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex,

int columnSqlType, Column column)

throws SQLException

{

if (column == null || column.getRawData() == null) {

preparedStatement.setObject(columnIndex, null);

return preparedStatement;

}

if (columnSqlType == Types.BIT) {

// BIT(1) -> java.lang.Boolean

if (column.getType() == Column.Type.BOOL) {

preparedStatement.setBoolean(columnIndex, column.asBoolean());

} else {

// BIT ( > 1) -> byte[]

preparedStatement.setObject(columnIndex, Integer.valueOf(column.asString(), 2));

}

return preparedStatement;

}

if (columnSqlType == Types.DATE && "YEAR".equals(this.resultSetMetaData.get(columnIndex).get("typeName"))) {

preparedStatement.setLong(columnIndex, column.asLong());

return preparedStatement;

}

return super.fillPreparedStatementColumnType(preparedStatement, columnIndex, columnSqlType, column);

}

};

this.commonRdbmsWriterTask.init(this.writerSliceConfig);

}

 

@Override

public void prepare()

{

this.commonRdbmsWriterTask.prepare(this.writerSliceConfig);

}

 

//TODO 改用连接池,确保每次获取的连接都是可用的(注意:连接可能需要每次都初始化其 session)

public void startWrite(RecordReceiver recordReceiver)

{

this.commonRdbmsWriterTask.startWrite(recordReceiver, this.writerSliceConfig,

super.getTaskPluginCollector());

}

 

@Override

public void post()

{

this.commonRdbmsWriterTask.post(this.writerSliceConfig);

}

@Override

public void destroy()

{

this.commonRdbmsWriterTask.destroy(this.writerSliceConfig);

}

@Override

public boolean supportFailOver()

{

String writeMode = writerSliceConfig.getString(Key.WRITE_MODE);

return "replace".equalsIgnoreCase(writeMode);

}

}

 

 

处理数据类型映射:

GAUSSDB中的数据类型可能与ADAX内部使用的数据类型不同,因此需要正确地进行类型转换和映射。

 

 

3.3. 设置打包模块:

3.3.1精简打包模块:

为了节省打包时间,只把需要验证的插件模块打包进来,修改总工程pom.xml文件(重点验证GAUSSDB,达梦,TERADATA,GBASE在本次不作验证)。

 

3.3.2配置部署需要应用的目标模块:

进行对应的打包注入,修改根目录下package.xml打包文件,操作里面插件是否打包成插件模块。

 

 

配置读写数据的JSON文件,以便最后进行ETL数据测试。如下面opengauss2mysql.json文件。在主工程下面的core/job目录下,增加对应JSON文件。内容参考如下内容:


测试和调试:

编写单元测试来验证你的插件是否能正确地从GAUSSDB读取和写入数据。

使用ADAX提供的工具对插件进行集成测试,确保其在实际环境中稳定运行。

部署打包插件:使用Maven将你的插件打包成一个jar文件。

部署到ADAX环境:将生成的jar文件复制到ADAX系统的plugin目录下。

更新ADAX配置文件以包含新的GAUSSDB插件信息。在对应主工程core/job下面创建一个JSON文件:gauss2gauss.json文件,生成对应的ETL操作脚本。提供通过开发的Gauss读写的插件方式,同步数据到目标表。

创建一个ADAX任务配置,指定使用新开发的GAUSSDB插件。

启动ADAX任务,监控其执行情况,并根据需要调整参数。

参考脚本JSON配置信息如下:如通用的JDBC连接的信息规则一致。 

 

{

  "job": {

    "content": [

      {

        "writer": {

          "name": "opengausswriter",

          "parameter": {

            "username": "root",

            "password": "Z******",

            "column": [

              "id","name","url","alexa","country"

            ],

            "connection": [

              {

                "table": [ "WEBSITESD"],

                "jdbcUrl": "jdbc:opengauss://119.*.*.*:8000/postgres?currentSchema=public"

              }

            ],

            "preSql": ["truncate table websitesd"] }},

        "reader": {

          "name": "opengaussreader",

          "parameter": {

            "column": [

              "id","name","url","alexa","country"

            ],

            "username": "root",

            "password": "Z*****",

            "connection": [

              {

                "table": ["WEBSITESS" ],

                "jdbcUrl": [

                  "jdbc:opengauss://119.*.*.*:8000/postgres?currentSchema=pu****"                ] } ]}}}],

    "setting": {

      "speed": {

        "record": -1,

        "byte": -1,

        "channel": 1

      }

    }

  }

}

运行任务:

总结:源代码编译安装

你可以选择从源代码编译安装,基本操作如下:

git clone https://github.com/wgzhao/addax.git

cd addax

mvn clean package

mvn package assembly:single

cd target/addax/addax-<version>

cd D:\Addax-4.0.7\target\addax\addax-4.0.7

D:\Addax-4.0.7\target\addax\addax-4.0.7>mvn clean package

D:\Addax-4.0.7\target\addax\addax-4.0.7\job

python ./bin/addax.py ./job/mysql2mysql.json

python ./bin/addax.py ./job/gauss2gauss.json

com.wgzhao.addax.plugin.writer.opengausswriter.OpenGaussWriter

 

com.wgzhao.addax.plugin.reader.opengaussreader.OpenGaussReader

 

4.测试验证

4.1.1 将开发好的插件工程源代码进行编译,打包,转化成可执行的工具。通过脚本执行程序。

cd addax

mvn clean package

mvn package assembly:single

打包: mvn package assembly:single

进入程序目录:

cd D:\Addax-4.0.7\target\addax\addax-4.0.7

执行程序,测试程序读写GaussDB功能是否正确,数据是否达到预期:插件程序 顺利从源表读取数据,经过对应业务处理后,写到目标表:

python ./bin/addax.py ./job/gauss2gauss.json

查看观察结果

 

十二月 23, 2024 10:16:49 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: Connect complete. ID: af8db054-fbd9-4f1f-8a2b-485bd3649a8b

十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [a030faee-0312-4f6a-bc16-46d85fe71c69] Try to connect. IP: 119.3.227.120:8000

十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [7.249.18.11:52772/119.3.227.120:8000] Connection is established. ID: a030faee-0312-4f6a-bc16-46d85fe71c69

十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: Connect complete. ID: a030faee-0312-4f6a-bc16-46d85fe71c69

十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [84c74db4-e3d2-4528-9c1d-804848feb1be] Try to connect. IP: 119.3.227.120:8000

十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [7.249.18.11:52773/119.3.227.120:8000] Connection is established. ID: 84c74db4-e3d2-4528-9c1d-804848feb1be

十二月 23, 2024 10:16:50 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: Connect complete. ID: 84c74db4-e3d2-4528-9c1d-804848feb1be

十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [791d4692-8ea9-481a-9688-1f8055b1e031] Try to connect. IP: 119.3.227.120:8000

2024-12-23 10:16:51.150 [0-0-0-reader] INFO  CommonRdbmsReader$Task - Finished read record by Sql: [select id,name,url,alexa,country from WEBSITESS

] jdbcUrl:[jdbc:opengauss://119.3.227.120:8000/addaxgauss?currentSchema=public].

十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [7.249.18.11:52775/119.3.227.120:8000] Connection is established. ID: 791d4692-8ea9-481a-9688-1f8055b1e031

十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: Connect complete. ID: 791d4692-8ea9-481a-9688-1f8055b1e031

十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [87037b20-3fc8-4c91-ac1e-bdd59640ae96] Try to connect. IP: 119.3.227.120:8000

十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: [7.249.18.11:52777/119.3.227.120:8000] Connection is established. ID: 87037b20-3fc8-4c91-ac1e-bdd59640ae96

十二月 23, 2024 10:16:51 上午 org.opengauss.core.v3.ConnectionFactoryImpl openConnectionImpl

信息: Connect complete. ID: 87037b20-3fc8-4c91-ac1e-bdd59640ae96

2024-12-23 10:16:54.974 [       job-0] INFO  AbstractScheduler    - Scheduler accomplished all tasks.

2024-12-23 10:16:54.975 [       job-0] INFO  JobContainer         - Addax Writer.Job [opengausswriter] do post work.

2024-12-23 10:16:54.976 [       job-0] INFO  JobContainer         - Addax Reader.Job [opengaussreader] do post work.

2024-12-23 10:16:54.976 [       job-0] INFO  JobContainer         - PerfTrace not enable!

2024-12-23 10:16:54.977 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 6 records, 199 bytes | Speed 33B/s, 1 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%

2024-12-23 10:16:54.978 [       job-0] INFO  JobContainer         -

任务启动时刻                    : 2024-12-23 10:16:37

任务结束时刻                    : 2024-12-23 10:16:54

任务总计耗时                    :                 17s

任务平均流量                    :               33B/s

记录写入速度                    :              1rec/s

读出记录总数                    :                   6

读写失败总数                    :                   0

 

 

登录华为云终端,查看结果是否已经到达目标表:

 

 

 

注意事项

性能优化:考虑大数据量时的数据传输效率,适当调整批量大小、并发数等参数。

错误处理:确保有良好的错误处理机制,可以捕获异常并做出适当的响应。

安全性:保护敏感信息,如数据库连接字符串、用户名和密码等,不要硬编码在代码中。

以上就是针对GAUSSDB的ADAX插件开发的基本流程。请注意,具体实现细节可能会根据ADAX版本的不同而有所变化,所以务必参考最新的官方文档。

 

 

 

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。