Kafka适配GaussDB开源验证任务心得
首先先看下任务计划书 ,大概了解了下这个任务的主要工作是通过flink读取Kafka写入GaussDB并且可以写入数据的验证工作。主要是需要花时间熟悉kafka和flink的架构、环境搭建、扩展新功能和开发简单的界面展示效果。下面介绍下适配过程中的一些关键步骤。开发过程首先参考任务计划书给的参考资料: https://kafka.apache.org/和 https://flink.apache.org/ 。 把官网看了下。 官网上是支持很多...
A、环境部署:
1、linux上基本环境java
2、zookeeper、kafa 和flink的架构环境,通过apache官网下载zookeepe、kafka 和flink软件包,然后上传到ecs上部署安装
- apache-zookeeper-3.7.2-bin.tar.gz
- kafka_2.12-3.5.1.tgz
- flink-1.18.1-bin-scala_2.12.tgz
zookeeper和kafka部署
建议:在配置kafka的server.properties文件时,建议zookeeper.connect=hostname:2181/kafka像这种加一个指定kafka目录路径,kafka注册在zookeeper中文件会很好归类;搭建其他版本的kafka也能清晰查找,类似以下
遇到的问题:我配置这个版本的kafa需要开启listeners=PLAINTEXT://hostname:9092(当前服务器hostname),否则kafka生产者和消费者连接报错
flink部署
小结:flink部署相对简单,注意flink-conf.yaml绑定的ip
B、Flink程序开发
- 配置flink的专属依赖flink1.18.0版本
- GaussDB(For Mysql)的flink-connector-mysql-cdc依赖
- GaussDB可以使用opengauss-jdbc依赖
这里用的是FlinkCDC读取GaussDB(For Mysql),写入到kafka的topic中;从topic读取写入GaussDB中,flink web上job如下
步骤一:flinkCDC读取GaussDB(For Mysql)
步骤二:查看GaussDB的t1_dest表
开发问题:
- idea上开发flink stream时会碰到依赖不兼容的,找对应版本
- 在flink 写入GaussDB表时,当update为避免主键冲突,需要先select判断是否有数据再insert或者update
- 之前写入kafka使用的是json序列化,所有读取kafka的时候需要json解析调试转成t1_dest表字段格式
开发总结
本任务适配中,环境搭建不复杂;主要是flink读取和写入GaussDB数据库的时候进行map算子转换过程中代码调试以及版本依赖花费较多时间。
- 点赞
- 收藏
- 关注作者
评论(0)