使用flink SQL Client将mysql数据写入到hudi并同步到hive
测试环境
组件版本
-
mysql 5.7
-
hive 3.1.2
-
flink 1.12.2
-
hudi 0.9.0
-
hadoop 3.2.0
首先请确保以下组件正常启动:
-
mysql
-
hivemetastore
-
hiveserver2
-
hdfs
-
yarn
hudi适配hive 3.1.2源码编译
0.9.0版本的hudi在适配hive3时,其hudi/package/hudi-flink-bundle/pom.xml文件使用的flink-connector-hive版本有问题,所以需要修改pom文件。
修改点一:
143行,修改为:
<include>org.apache.flink:flink-sql-connector-hive-${hive.version}_${scala.binary.version}</include>
642行,修改为:
<artifactId>flink-sql-connector-hive-${hive.version}_${scala.binary.version}</artifactId>
编译命令:
mvn clean install -DskipTests -Pflink-bundle-shade-hive3 -Dhadoop.version=3.2.0 -Dhive.version=3.1.2 -Pinclude-flink-sql-connector-hive -U -Dscala.version=2.12.10 -Dscala.binary.version=2.12
将编译后得到的hudi/package/hudi-flink-bundle/target/hudi-flink-bundle_2.12-0.9.0.jar拷贝到flink/lib目录下,将得到的hudi/package/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.9.0.jar拷贝到hive/auxlib目录下,如果没有这个目录则新建一个即可。
关于flink操作hudi的相关方法如果有疑惑的可先看本系列的其他文章,例如使用flink插入数据到hudi数据湖初探, Flink SQL Client实战CDC数据入湖等。
生成测试数据
使用datafaker生成100000条数据,放到mysql数据库中的stu4表。
datafaker工具使用方法见datafaker — 测试数据生成工具
首先在mysql中新建表test.stu4
create database test;
use test;
create table stu4 (
id int unsigned auto_increment primary key COMMENT '自增id',
name varchar(20) not null comment '学生名字',
school varchar(20) not null comment '学校名字',
nickname varchar(20) not null comment '学生小名',
age int not null comment '学生年龄',
score decimal(4,2) not null comment '成绩',
class_num int not null comment '班级人数',
phone bigint not null comment '电话号码',
email varchar(64) comment '家庭网络邮箱',
ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;
新建meta.txt文件,文件内容为:
id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]
生成10000条数据并写入到mysql中的test.stu3表
datafaker rdb mysql+mysqldb://root:Pass-123-root@hadoop:3306/test?charset=utf8 stu4 100000 --meta meta.txt
datafaker工具有详细使用方法,请参考。
导入mysql数据
使用flink sql client进行如下操作
构建源表
create table stu4(
id bigint not null,
name string,
school string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
PRIMARY KEY (id) NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',
'username' = 'root',
'password' = 'Pass-123-root',
'table-name' = 'stu4'
);
构建目标表
create table stu4_tmp_1(
id bigint not null,
name string,
`school` string,
nickname string,
age int not null,
score decimal(4,2) not null,
class_num int not null,
phone bigint not null,
email string,
ip string,
primary key (id) not enforced
)
partitioned by (`school`)
with (
'connector' = 'hudi',
'path' = 'hdfs://hadoop:9000/tmp/stu4_tmp_1',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'school',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://hadoop:9083',
'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000',
'hive_sync.table' = 'stu_tmp_1',
'hive_sync.db' = 'test',
'hive_sync.username' = 'hive',
'hive_sync.password' = 'hive'
);
插入数据
insert into stu4_tmp_1 select * from stu4;
hive数据查询
使用hive命令进入hive cli
执行如下命令查询数据
select * from test.stu_tmp_1 limit 10;
结果:
- 点赞
- 收藏
- 关注作者
评论(0)