CDL写入Hudi全流程操作
1. 准备数据源
要求与集群在相同网段的数据库已安装,记录节点地址以及用户名密码。
本次测试数据库已准备,以Mysql为例。
本地解压Navicat Premium 15.rar并启动navicat.exe,先新建数据库连接。
若mysql中已存在数据库,则勾选自动打开
然后在左侧连接上右键,打开连接,然后可加载到所有数据库以及表。
创建一张测试表source1,并写入2行数据。
2. 配置CDL连接
环境准备
首先确认集群已添加CDL、Kafka服务,且运行正常。
然后在使用的业务用户中添加cdladmin、kafkaadmin用户组,如admintest用户。
使用该业务用户,进入CDL WebUI
上传驱动
驱动管理中添加MySQL和Oracle的驱动,Postgresql因为集群内部已集成,不需添加。
配置ENV
配置CDL启动Spark任务的相关资源配置
连接管理
CDL写Hudi表需要分别创建一个source link,一个sink link
如下是mysql连接,配置后测试连接通过即可
然后是hudi连接,需要上传业务用户的keytab文件,测试连接通过
3. 提交CDL作业
新增作业
default topic与作业名称一致即可,无实际作用
新建mysql的connector
从左侧拖动mysql图标到右侧,然后双击,配置如下,选择项见?有说明
新建hudi的connector
选择项见?有说明
提交作业
配置完成后,从mysql拖曳到hudi,连接之后保存
然后启动该作业,因为需要提交Spark任务到Yarn上,需要等待1分钟
运行后点击作业,可观察运行状态以及mysql, kafka, hudi三者之间的数据流信息
同时在hudi下可通过appTrackingUrl跳转到Spark WebUI,进入Streaming页签查看流数据处理情况
写入事务数据
点击开始事务,然后修改一行数据,再添加一行数据,提交
可在如下方式观察到数据开始写入
1)CDL作业
2)Spark App
3)HDFS
4. 引擎交互
DataSource读
source Hudi/component_env
spark-shell --master yarn
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
spark.read.format("hudi").load("/tmp/cdl_to_hudi/source1").show(false)
spark.read.format("hudi").load("/tmp/cdl_to_hudi/source1").select("id","comb","col0","col1","col2","col3","col4").show(false)
注:事务操作只涉及2条增量数据,所以id=2的数据并未读到
SparkSql读
SparkSQL、Hive、Hetu引擎读,需要同步Hive表,若CDL作业未自动创建,可通过脚本创建
bin/run_hive_sync_tool.sh --base-path /tmp/cdl_to_hudi/source1 --database default --table hudi_cow --partition-value-extractor org.apache.hudi.hive.NonPartitionedExtractor --support-timestamp
source Hudi/component_env
spark-sql --master yarn
select * from hudi_cow;
Hive读
beeline
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
select * from hudi_cow;
Hetu读
首先完成3项配置
- 业务用户添加hetuadmin用户组,然后在Ranger UI中将用户添加到HetuServer的all - catalog, schema, table, column策略中
- Hetu web ui中数据源页面,hive配置添加自定义配置,parquet.use-column-names = true,保存
- Hetu启动计算实例,创建配置选择默认,实例各1个即可
hetu-cli --catalog hive --schema default
select * from hudi_cow;
Flink流读+流写
- 业务用户,通过添加角色赋予Flink管理员权限
- 进入Flink WebUI,系统管理中创建集群连接,添加该用户,并上传集群配置和认证凭据
新建流作业,SQL编辑如下
流式读CDL写的hudi 表,然后流式写入到另一张hudi表
CREATE TABLE read_hudi(
id INT,
comb INT,
col0 BIGINT,
col1 VARCHAR(10),
col2 DECIMAL(30, 10),
col3 TIMESTAMP,
col4 DATE
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/cdl_to_hudi/source1',
'table.type' = 'COPY_ON_WRITE',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '10',
'hoodie.datasource.write.recordkey.field' = 'id',
'write.precombine.field' = 'id'
);CREATE TABLE write_hudi(
id INT,
comb INT,
col0 BIGINT,
col1 VARCHAR(10),
col2 DECIMAL(30, 10),
col3 TIMESTAMP,
col4 DATE
) WITH (
'connector' = 'hudi',
'path' = 'hdfs://hacluster/tmp/cdl_to_hudi/flinl_write_hudi',
'table.type' = 'COPY_ON_WRITE',
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '10',
'hoodie.datasource.write.recordkey.field' = 'id',
'write.precombine.field' = 'id'
);
INSERT INTO
write_hudi
SELECT
*
FROM
read_hudi;
通过spark-shell检查flink读hudi再写hudi的结果
5. 实时全流程
MySQL的源表,开启事务再增量写入数据,然后检查Flink流式实时读写的结果
source1表,修改一行,新增一行,提交
观察Flink作业,接收到2条增量数据(对比之前,received从2变为4,正确)
最后Flink实时写入到另一张Hudi表,同为2条增量数据,正确
- 点赞
- 收藏
- 关注作者
评论(0)