Paimon Flink cdc GaussDB开源开发

举报
韦文满 发表于 2024/11/18 16:31:29 2024/11/18
【摘要】 Paimon+Flink+GaussDB集成开发

Paimon概述

Apache Paimon 原名 Flink Table Store,是阿里云开源大数据团队孵化出来的开源项目。2022年1月在 Apache Flink 社区从零开始研发,Flink 社区希望能够将 Flink 的 Streaming 实时计算能力和 Lakehouse 新架构优势进一步结合,促进数据在数据湖上真正实时流动起来,并为用户提供实时离线一体化的开发体验。

Flink概述

Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此

flink-sql-connector-gaussdb-cdc.jar生成

项目地址:https://gitcode.com/weiwenman/GaussDB-For-Apache-Flink/overview
GaussDB驱动请到官网下载最新驱动
安装GaussDB驱动jar包到本地maven仓

mvn install:install-file -Dfile=gaussdbjdbc.jar \
                         -DgroupId=com.huawei.gaussdb \
                         -DartifactId=gaussdbjdbc \
                         -Dversion=5.0.0-htrunk4.csi.gaussdb_kernel.opengaussjdbc.r2 \
                         -Dpackaging=jar
  1. 编译构建jar
wmwei@DESKTOP-7490: export MAVEN_OPTS='--add-exports=java.base/jdk.internal.module=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED   --add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED   --add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED   --add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED   --add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED'
wmwei@DESKTOP-7490: cd flink-cdc-connect/flink-cdc-source-connectors/flink-connector-gaussdb-cdc
wmwei@DESKTOP-7490: mvn spotless:apply
...
wmwei@DESKTOP-7490: mvn install -DskipTests
...
wmwei@DESKTOP-7490: cd ../flink-sql-connector-opengauss-cdc/
wmwei@DESKTOP-7490: mvn clean package
  1. 得到flink-sql-connector-gaussdb-cdc-3.2.0.jar
wmwei@DESKTOP-7490: ls flink-sql-connector-opengauss-cdc/target//flink-sql-connector-opengauss-cdc-3.2.0.jar

Flink集成Paimon步骤

  1. paimon下载地址:https://repository.apache.org/content/groups/snapshots/org/apache/paimon/paimon-flink-1.20/1.0-SNAPSHOT/paimon-flink-1.20-1.0-20241110.002724-53.jar
  2. flink下载地址:https://www.apache.org/dyn/closer.lua/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
  3. flink安装过程
  1. 解压flink-1.20.0-bin-scala_2.12.tgz
tar -xzf flink-1.20.0-bin-scala_2.12.tgz
  1. 复制paimon jar包和gaussdb cdc jar包到flink-1.20.0/lib目录下
wmwei@DESKTOP-7490:~/flink-1.20.0$ pwd
/home/wmwei/flink-1.20.0
wmwei@DESKTOP-7490:~/flink-1.20.0$ ls lib/
flink-cep-1.20.0.jar                         flink-table-api-java-uber-1.20.0.jar
flink-connector-files-1.20.0.jar             flink-table-planner-loader-1.20.0.jar
flink-csv-1.20.0.jar                         flink-table-runtime-1.20.0.jar
flink-dist-1.20.0.jar                        log4j-1.2-api-2.17.1.jar
flink-json-1.20.0.jar                        log4j-api-2.17.1.jar
flink-scala_2.12-1.20.0.jar                  log4j-core-2.17.1.jar
flink-sql-connector-gaussdb-cdc-3.2.0.jar    log4j-slf4j-impl-2.17.1.jar
paimon-flink-1.20-1.0-20241108.002507-51.jar
  1. 启动flink
wmwei@DESKTOP-7490:~/flink-1.20.0$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host DESKTOP-7490.
Starting taskexecutor daemon on host DESKTOP-7490.
  1. 创建GaussDB CDC Table
wmwei@DESKTOP-7490:~/flink-1.20.0$ ./bin/sql-client.sh
Flink SQL> CREATE TABLE shipments1 (
>   shipment_id INT,
>   order_id INT,
>   origin STRING,
>   destination STRING,
>   is_arrived BOOLEAN
> ) WITH (
>   'connector' = 'gaussdb-cdc',
>   'hostname' = '127.0.01',
>   'port' = '8001',
>   'username' = 'postgres',
>   'password' = 'Postgres@123',
>   'database-name' = 'test',
>   'schema-name' = 'public',
>   'table-name' = 'actor',
>   'slot.name' = 'flink_0',
>   'decoding.plugin.name' = 'pgoutput',
>   'scan.incremental.snapshot.enabled' = 'true'
> );
[INFO] Execute statement succeeded.

Flink SQL> select * from shipments1;
+----+-------------+-------------+--------------------------------+--------------------------------+------------+
| op | shipment_id |    order_id |                         origin |                    destination | is_arrived |
+----+-------------+-------------+--------------------------------+--------------------------------+------------+
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。