Paimon Flink cdc GaussDB开源开发
【摘要】 Paimon+Flink+GaussDB集成开发
Paimon概述
Apache Paimon 原名 Flink Table Store,是阿里云开源大数据团队孵化出来的开源项目。2022年1月在 Apache Flink 社区从零开始研发,Flink 社区希望能够将 Flink 的 Streaming 实时计算能力和 Lakehouse 新架构优势进一步结合,促进数据在数据湖上真正实时流动起来,并为用户提供实时离线一体化的开发体验。
Flink概述
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此
flink-sql-connector-gaussdb-cdc.jar生成
项目地址:https://gitcode.com/weiwenman/GaussDB-For-Apache-Flink/overview
GaussDB驱动请到官网下载最新驱动
安装GaussDB驱动jar包到本地maven仓
mvn install:install-file -Dfile=gaussdbjdbc.jar \
-DgroupId=com.huawei.gaussdb \
-DartifactId=gaussdbjdbc \
-Dversion=5.0.0-htrunk4.csi.gaussdb_kernel.opengaussjdbc.r2 \
-Dpackaging=jar
- 编译构建jar
wmwei@DESKTOP-7490: export MAVEN_OPTS='--add-exports=java.base/jdk.internal.module=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED'
wmwei@DESKTOP-7490: cd flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc
wmwei@DESKTOP-7490: mvn spotless:apply
...
wmwei@DESKTOP-7490: mvn install -DskipTests
...
wmwei@DESKTOP-7490: cd ../flink-sql-connector-opengauss-cdc/
wmwei@DESKTOP-7490: mvn clean package
- 得到flink-sql-connector-gaussdb-cdc-3.2.0.jar
wmwei@DESKTOP-7490: ls flink-sql-connector-opengauss-cdc/target/flink-sql-connector-opengauss-cdc-3.2.0.jar
Flink集成Paimon步骤
- paimon下载地址:https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.20/1.0-SNAPSHOT/paimon-flink-1.20-1.0-20241110.002724-53.jar
- flink下载地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
- flink安装过程
- 解压flink-1.20.0-bin-scala_2.12.tgz
tar -xzf flink-1.20.0-bin-scala_2.12.tgz
- 复制paimon jar包和gaussdb cdc jar包到flink-1.20.0/lib目录下
wmwei@DESKTOP-7490:~/flink-1.20.0$ pwd
/home/wmwei/flink-1.20.0
wmwei@DESKTOP-7490:~/flink-1.20.0$ ls lib/
flink-cep-1.20.0.jar flink-table-api-java-uber-1.20.0.jar
flink-connector-files-1.20.0.jar flink-table-planner-loader-1.20.0.jar
flink-csv-1.20.0.jar flink-table-runtime-1.20.0.jar
flink-dist-1.20.0.jar log4j-1.2-api-2.17.1.jar
flink-json-1.20.0.jar log4j-api-2.17.1.jar
flink-scala_2.12-1.20.0.jar log4j-core-2.17.1.jar
flink-sql-connector-gaussdb-cdc-3.2.0.jar log4j-slf4j-impl-2.17.1.jar
paimon-flink-1.20-1.0-20241108.002507-51.jar
- 启动flink
wmwei@DESKTOP-7490:~/flink-1.20.0$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host DESKTOP-7490.
Starting taskexecutor daemon on host DESKTOP-7490.
- 创建GaussDB CDC Table
wmwei@DESKTOP-7490:~/flink-1.20.0$ ./bin/sql-client.sh
Flink SQL> CREATE TABLE shipments1 (
> shipment_id INT,
> order_id INT,
> origin STRING,
> destination STRING,
> is_arrived BOOLEAN
> ) WITH (
> 'connector' = 'gaussdb-cdc',
> 'hostname' = '127.0.01',
> 'port' = '8001',
> 'username' = 'postgres',
> 'password' = 'Postgres@123',
> 'database-name' = 'test',
> 'schema-name' = 'public',
> 'table-name' = 'actor',
> 'slot.name' = 'flink_0',
> 'decoding.plugin.name' = 'pgoutput',
> 'scan.incremental.snapshot.enabled' = 'true'
> );
[INFO] Execute statement succeeded.
Flink SQL> select * from shipments1;
+----+-------------+-------------+--------------------------------+--------------------------------+------------+
| op | shipment_id | order_id | origin | destination | is_arrived |
+----+-------------+-------------+--------------------------------+--------------------------------+------------+
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)