CDL写入Hudi全流程操作

举报
小兔子615 发表于 2021/12/31 17:03:11 2021/12/31
【摘要】 1.    准备数据源要求与集群在相同网段的数据库已安装,记录节点地址以及用户名密码。本次测试数据库已准备,以Mysql为例。本地解压Navicat Premium 15.rar并启动navicat.exe,先新建数据库连接。若mysql中已存在数据库,则勾选自动打开然后在左侧连接上右键,打开连接,然后可加载到所有数据库以及表。创建一张测试表source1,并写入2行数据。 2.    配置...

1.    准备数据源

要求与集群在相同网段的数据库已安装,记录节点地址以及用户名密码。

本次测试数据库已准备,以Mysql为例。

本地解压Navicat Premium 15.rar并启动navicat.exe,先新建数据库连接。


mysql中已存在数据库,则勾选自动打开


然后在左侧连接上右键,打开连接,然后可加载到所有数据库以及表。


创建一张测试表source1,并写入2行数据。



 

2.    配置CDL连接

环境准备

首先确认集群已添加CDLKafka服务,且运行正常。

然后在使用的业务用户中添加cdladminkafkaadmin用户组,如admintest用户。


使用该业务用户,进入CDL WebUI


上传驱动

驱动管理中添加MySQLOracle的驱动,Postgresql因为集群内部已集成,不需添加。


配置ENV

配置CDL启动Spark任务的相关资源配置


连接管理

CDLHudi表需要分别创建一个source link,一个sink link

如下是mysql连接,配置后测试连接通过即可


然后是hudi连接,需要上传业务用户的keytab文件,测试连接通过


3.    提交CDL作业

新增作业

default topic与作业名称一致即可,无实际作用

新建mysqlconnector

从左侧拖动mysql图标到右侧,然后双击,配置如下,选择项见?有说明



新建hudiconnector

选择项见?有说明


提交作业

配置完成后,从mysql拖曳到hudi,连接之后保存


然后启动该作业,因为需要提交Spark任务到Yarn上,需要等待1分钟


运行后点击作业,可观察运行状态以及mysql, kafka, hudi三者之间的数据流信息

同时在hudi下可通过appTrackingUrl跳转到Spark WebUI,进入Streaming页签查看流数据处理情况


写入事务数据

点击开始事务,然后修改一行数据,再添加一行数据,提交



可在如下方式观察到数据开始写入

1CDL作业


2)Spark App


3HDFS


4.    引擎交互

DataSource

source Hudi/component_env

spark-shell --master yarn

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._

spark.read.format("hudi").load("/tmp/cdl_to_hudi/source1").show(false)

spark.read.format("hudi").load("/tmp/cdl_to_hudi/source1").select("id","comb","col0","col1","col2","col3","col4").show(false)

注:事务操作只涉及2条增量数据,所以id=2的数据并未读到


SparkSql

SparkSQLHiveHetu引擎读,需要同步Hive表,若CDL作业未自动创建,可通过脚本创建

bin/run_hive_sync_tool.sh --base-path /tmp/cdl_to_hudi/source1 --database default --table hudi_cow --partition-value-extractor org.apache.hudi.hive.NonPartitionedExtractor --support-timestamp

 

source Hudi/component_env

spark-sql --master yarn

select * from hudi_cow;


Hive

beeline

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

select * from hudi_cow;


Hetu

首先完成3项配置

  1. 业务用户添加hetuadmin用户组,然后在Ranger UI中将用户添加到HetuServerall - catalog, schema, table, column策略中



  1. Hetu web ui中数据源页面,hive配置添加自定义配置,parquet.use-column-names = true,保存


  1. Hetu启动计算实例,创建配置选择默认,实例各1个即可


 

hetu-cli --catalog hive --schema default

select * from hudi_cow;


Flink流读+流写

  1. 业务用户,通过添加角色赋予Flink管理员权限
  2. 进入Flink WebUI,系统管理中创建集群连接,添加该用户,并上传集群配置和认证凭据

 

新建流作业,SQL编辑如下

流式读CDL写的hudi 表,然后流式写入到另一张hudi


CREATE TABLE read_hudi(

  id INT,

  comb INT,

  col0 BIGINT,

  col1 VARCHAR(10),

  col2 DECIMAL(30, 10),

  col3 TIMESTAMP,

  col4 DATE

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hacluster/tmp/cdl_to_hudi/source1',

  'table.type' = 'COPY_ON_WRITE',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '10',

  'hoodie.datasource.write.recordkey.field' = 'id',

  'write.precombine.field' = 'id'

);CREATE TABLE write_hudi(

  id INT,

  comb INT,

  col0 BIGINT,

  col1 VARCHAR(10),

  col2 DECIMAL(30, 10),

  col3 TIMESTAMP,

  col4 DATE

) WITH (

  'connector' = 'hudi',

  'path' = 'hdfs://hacluster/tmp/cdl_to_hudi/flinl_write_hudi',

  'table.type' = 'COPY_ON_WRITE',

  'read.streaming.enabled' = 'true',

  'read.streaming.check-interval' = '10',

  'hoodie.datasource.write.recordkey.field' = 'id',

  'write.precombine.field' = 'id'

);

INSERT INTO

  write_hudi

SELECT

  *

FROM

  read_hudi;



通过spark-shell检查flinkhudi再写hudi的结果


 

5.    实时全流程

MySQL的源表,开启事务再增量写入数据,然后检查Flink流式实时读写的结果

source1表,修改一行,新增一行,提交


观察Flink作业,接收到2条增量数据(对比之前,received2变为4,正确)


最后Flink实时写入到另一张Hudi表,同为2条增量数据,正确


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。