flink写入数据到hudi的四种方式

举报
从大数据到人工智能 发表于 2022/03/28 00:30:39 2022/03/28
【摘要】 总览 bulk_insert用于快速导入快照数据到hudi。 基本特性bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会跳过数据去重,所以用户需要保证数据的唯一性。bulk_insert在批量写入模式中是更加有效率的。默认情况下,批量执行模式按照分区路径对输入记录进行排序,并将这些记录写入Hudi,该方式可以避免频繁切换文件句柄导致的写性能下降。bulk_in...

总览

bulk_insert

用于快速导入快照数据到hudi。

基本特性

bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会跳过数据去重,所以用户需要保证数据的唯一性。

bulk_insert在批量写入模式中是更加有效率的。默认情况下,批量执行模式按照分区路径对输入记录进行排序,并将这些记录写入Hudi,该方式可以避免频繁切换文件句柄导致的写性能下降。

bulk_insert的并行度有write.tasks参数指定,并行度会影响小文件的数量。理论上来说,bulk_insert的并行度就是bucket的数量(特别是,当每个bucket写到最大文件大小时,它将转到新的文件句柄。最后,文件的数量将大于参数write.bucket.assign.tasks指定的数量 )

可选配置参数

参数名称 是否必须 默认值 参数说明
write.operation true upsert 设置为 bulk_insert 以开启bulk_insert功能
write.tasks false 4 bulk_insert 并行度, the number of files >= write.bucket_assign.tasks
write.bulk_insert.shuffle_by_partition false true 写入前是否根据分区字段进行数据重分布。 启用此选项将减少小文件的数量,但可能存在数据倾斜的风险
write.bulk_insert.sort_by_partition false true 写入前是否根据分区字段对数据进行排序。 启用此选项将在写任务写多个分区时减少小文件的数量
write.sort.memory false 128 排序算子的可用托管内存。 默认为 128 MB

Flink SQL实践

使用datafaker生成100000条数据,放到mysql数据库中的stu4表。

数据生成方式以及Flink SQL使用方法见Flink SQL Client实战CDC数据入湖

使用bulk_insert方式写入到hudi中。

Flink SQL client 创建myql数据源

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'Pass-123-root',
  'table-name' = 'stu4'
);

创建hudi表

 create table stu4_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu4_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
  );

mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

执行时间

Index bootstrap

基本特性

该方式用于快照数据+增量数据的导入。如果快照数据已经通过bulk_insert导入到hudi,那么用户就可以近实时插入增量数据并且通过index bootstrap功能来确保数据不会重复。

温馨提示:

如果你觉得这个过程特别耗时,那么你在写快照数据的时候可以多设置计算资源,然后在插入增量数据时减少计算资源。

可选配置参数

参数名称 是否必须 默认值 参数说明
index.bootstrap.enabled true false 当启用index bootstrap功能时,会将Hudi表中的剩余记录一次性加载到Flink状态中
index.partition.regex false * 优化参数,设置正则表达式来过滤分区。 默认情况下,所有分区都被加载到flink状态

使用方法

  1. CREATE TABLE创建一条与Hudi表对应的语句。 注意这个table.type配置必须正确。

  2. 设置index.bootstrap.enabled = true来启用index bootstrap功能

  3. 在flink-conf.yaml文件中设置Flink checkpoint的容错机制,设置配置项execution.checkpointing.tolerable-failed-checkpoints = n(取决于Flink checkpoint执行时间)

  4. 等待直到第一个checkpoint成功,表明index bootstrap完成。

  5. 在index bootstrap完成后,用户可以退出并保存savepoint(或直接使用外部 checkpoint)。

  6. 重启任务,并且设置index.bootstrap.enablefalse

温馨提示:

  1. 索引引导是一个阻塞过程,因此在索引引导期间无法完成checkpoint。

  2. index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。

  3. index bootstrap是并发执行的。用户可以在日志文件中通过finish loading the index under partition以及Load record form file观察index bootstrap的进度。

  4. 第一个成功的checkpoint表明index bootstrap已完成。 从checkpoint恢复时,不需要再次加载索引。

Flink SQL实践

前提条件:

  1. 已有50w条数据已写入kafka,使用bulk_insert的方式将其导入hudi表。

  2. 再通过创建任务消费最新kafka数据,并开启index bootstrap特性。

如果未将数据导入kafka可使用Flink SQL Client实战CDC数据入湖文章提供的方法将数据导入kafka。

创建bulk_insert任务:

create table stu3_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink_test',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup_20210929_4'
  );
  
 create table stu3_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
  );
  
insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka;

创建开启index bootstrap特性、离线压缩任务。

create table stu3_binlog_source_kafka_1(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink_test',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'debezium-json',
  'scan.startup.mode' = 'latest-offset',
  'properties.group.id' = 'testGroup_20210929_4'
  );
  
   create table stu3_binlog_sink_hudi_1(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'upsert',
  'write.tasks' = '4',
  'write.precombine.field' = 'school',
  'compaction.async.enabled' = 'false', 
  'index.bootstrap.enabled' = 'true'
  );
  
insert into stu3_binlog_sink_hudi_1 select * from stu3_binlog_source_kafka_1;

提交离线压缩任务:

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.12-0.9.0.jar --path hdfs://hadoop:9000/tmp/stu3_binlog_sink_hudi_20210929_4

创建bulk_insert任务:

Changelog Mode

基本特性

Hudi可以保留消息的所有中间变化(I / -U / U / D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 Hudi MOR表以行的形式存储消息,支持保留所有更改日志(格式级集成)。 所有的更新日志记录可以使用Flink流阅读器。

可选配置参数

参数名称 是否必须 默认值 参数说明
changelog.enabled false false 它在默认情况下是关闭的,为了拥有upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。 设置为true以支持使用所有更改

温馨提示:

不管格式是否存储了中间更改日志消息,批处理(快照)读取仍然合并所有中间更改。

在设置changelog.enable为true时,更新日志记录的保留只是最大的努力: 异步压缩任务将更新日志记录合并到一条记录中,因此如果流源不及时消费,则压缩后只能读取每个key的合并记录。 解决方案是通过调整压缩策略,比如压缩选项:compress.delta_commits和compression.delta_seconds,为读取器保留一些缓冲时间。

Insert Mode

基本特性

默认情况下,Hudi对插入模式采用小文件策略:MOR将增量记录追加到日志文件中,COW合并基本parquet文件(增量数据集将被重复数据删除)。 这种策略会导致性能下降。

如果要禁止文件合并行为,可将write.insert.deduplicate设置为false,则跳过重复数据删除。 每次刷新行为直接写入一个新的 parquet文件(MOR表也直接写入parquet文件)。

可选配置参数

参数名称 是否必须 默认值 参数说明
write.insert.deduplicate false true “插入模式”默认启用重复数据删除功能。 关闭此选项后,每次刷新行为直接写入一个新的 parquet文件
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。