大数据技术学习——Hudi
1 一句话介绍
Hudi(Hadoop Upserts Deletes and Incrementals)是数据湖的数据组织中间层,可以简单理解为基于Parquet进一步封装的数据格式,能提供表格式、事务能力(插入更新)、增量拉取能力。
2 基本概念
- 表格式:负责表的文件布局、表的 schema 和对表更改的元数据跟踪。
- 增量拉取(incremental pull):在大数据集上,拉取特定时间段的小批量数据。
- 更新插入(upsert):基于事务实现的更新插入数据能力。
- 视图:hudi中的视图就是查询类型(Query Types),包括实时视图(Snapshot Queries)、增量视图(Incremental Queries)和读优化视图(Read Optimized Queries)。
- record key:Hudi表的每一个记录都被唯一 primary key(主键)标识,primary key由record key以及partition path组成。record key可以是Spark DataFrame中的一个字段,或字段组合,也可以是其他的键生成器。
3 Hudi技术栈
数据湖技术栈如下图所示,Hudi实现了其中的kernel以及API层:
- 数据湖数据组织分为三层
湖存储(Lake Storage):HDFS或对象存储实现。
文件格式(File Format):通用的Parquet(面向列)、Avro(面向行)文件格式。
表格式(Table Format):确定文件布局、索引以及表元数据的存储方式。
- 索引(Indexs)
索引可帮助数据库规划更好的查询,从而减少 I/O 总量并提供更快的响应时间。
- 时间线服务器(Timeline Server)
时间线服务器内嵌在Hudi写入器进程中运行,是实现增量拉取能力的基础。
- 并发控制(Concurrency Control)
并发控制定义了不同的写入器 / 读取器如何协调对表的并发访问。
- 表服务(Table Service)
Hudi内置了很多表服务用于确保合理的表存储布局以及元数据管理,在每次写入操作后同步自动调用,或者作为单独的后台作业异步调用,功能类似于数据库的优化器。
- 写入器(Writers)
Hudi表可作为Flink与Spark管道的接收器,提供了更加丰富的写入功能,例如Hudi将写入操作细分为增量(插入、更新插入、删除)和分批 / 批量操作(插入覆盖、插入覆盖表、删除分区、批量插入),每个操作都是高性能和高内聚的。
- 读取器(Readers)
Hudi在写入器和读取器之间提供了快照隔离,并允许主流的湖查询引擎(Spark、Hive、Flink、Presto、Trino、Impala)甚至云数仓在任何表快照上进行一致的查询。
4 Hudi文件组织
4.1 逻辑结构
Hudi表根据分区键字段将表分成若干分区,每个分区内含若干FileGroup。每个FileGroup拥有唯一的FileId,当一个FileGroup大小超过阈值时,会新增FileGroup。每个FileGroup内文件有多个版本,称之为FileSlice。
4.2 物理存储
Hudi表存储文件分为三个部分:
- Metadata:以timeline的形式维护对hudi表的各项操作(commits、cleans、compactions等)。注意不同于表字段等元数据,这些元数据存储在parquet或orc文件中,spark可直接读取parquet文件并执行spark sql查询。
- Data Files:使用两种存储格式存储数据。Data Files包括BaseFile文件(默认Parquet格式)以及LogFile文件(默认Avro格式,存储增量数据,仅MOR表有,详见下文)
- Index:用于高效查找key是否存在。
Bloom Filter:为默认的索引,存储在parquet文件的footer中
Hbase Filter:拥有更快的索引检索速度
5 Hudi两种表类型
5.1 两种表对比
- 写时复制表(Copy On Write,COW表)
目前仅适用Parquet列式存储文件,写入过程中同步执行合并版本,重写被更新文件。在写文件的时候同步更新索引信息和时间序信息。
- 读时合并表(Merge On Read,MOR表)
采用Parquet列式存储+Avro行式存储组合来存储数据,更新数据记录到增量文件中(Avro格式),然后进行同步或压缩生成新的版本文件。相比COW表还增加了LogFile,记录新增记录。
特性 |
COW类型 |
MOR类型 |
数据延迟 |
更高 |
更低 |
更新代价(I/O) |
更高(同步进行重写) |
更低(异步重写) |
Parquet文件大小 |
更小(同步合并压缩) |
更大(异步合并或手动合并) |
写放大 |
更高(多版本) |
更低 |
适合场景 |
批量入湖 |
实时入湖 |
5.2 COW表读写操作原理
- COW表写入流程
数据预处理→索引加载→定位更新文件→重写需要更新的文件→合并小文件
- COW表读取流程
加载commit信息→根据commit查找File slices→读取对应的数据文件
样例说明:
- T1时写入数据[key1,key2,key3,key4];假设分布为两个文件,此时hudi表数据目录中会直接保存为Parquet File 1和Parquet File 2两个文件;Time line中会生成commit1
- Commit1保存成功后,使用读优化视图读取Hudi表,此时读入数据为Parquet File1 + Parquet File2
- T2时更新数据[key1,key2],此时根据索引信息发现需要更新的数据文件为Parquet File1,则根据Parquet File 1和更新后的数据的合并结果,生成Parquet File 1’;Time line中会生成commit2
- Commit2保存完成后,使用读优化视图查询,读取的数据文件为Parquet File 1’+ Parquet File 2;查询commit2与commit1之间的增量查询数据时,直接读取Parquet File 1’
5.3 MOR表读写操作原理
- MOR表写入流程
数据预处理→追加写入LogFile
- MOR表读取流程
加载commit信息→根据commit查找File slices以及log文件的offset range→合并读取parquet文件和log文件
- T1时写入数据[key1,key2,key3,key4];假设分布为两个文件,此时hudi表数据目录中会直接保存为Parquet File 1 和Parquet File 2两个文件;Time line中会生成commit1
- Commit1保存成功后,使用读优化视图读取hudi表,此时读入数据为Parquet File1 + Parquet File2
- T2时更新数据[key1,key2],此时直接追加写入log File1,Time line中会生成commit2
- Commit2保存完成后,使用读优化视图查询,读取的数据文件为Parquet File 1 + Parquet File 2;使用实时视图查询,将查询(Parquet File1 join log File1) + Parquet File2;查询commit2与commit1之间的增量查询数据时,直接读取log File 1
6 Hudi三种查询视图
- 实时视图(Real-time / Snapshot View)
该视图提供当前hudi表最新的快照数据,即一旦有最新的数据写入hudi表,通过该视图就可以查出刚写入的新数据。COW表和MOR均支持这种视图能力。
- 增量视图(Incremental View)
该视图提供增量查询的能力,可以查询指定COMMIT之后的增量数据,可用于快速拉取增量数据。COW表支持该种视图能力, MOR表也可以支持该视图,但是一旦MOR表完成compact操作其增量视图能力消失。
- 读优化视图(Read Optimized View)
该视图只会提供最新版本的parquet文件中存储的数据。该视图在COW表和MOR表上表现不同:
对于COW表,该视图能力和实时视图能力是一样的(COW表只用Parquet文件存数据)。
对于MOR表,仅访问基本文件,提供给定文件片自上次执行compact操作以来的数据, 可简单理解为该视图只会提供MOR表parquet文件存储的数据,log文件里面的数据将被忽略。 该视图数据并不一定是最新的,但是MOR表一旦完成compact操作,增量log数据被合入到了base数据里面,这个时候该视图和实时视图能力一样。
7 表服务
Hudi作为数据组织中间层,提供了多种表服务去权衡数据插入与查询效率。一般表服务支持同步以及异步两种方式,异步方式分为两个阶段:1. 生成调度计划 2. 执行调度计划,从而实现在不影响数据插入的情况下,提高数据查询性能。
- Compaction:用于合并MOR表中的Base以及Log文件。推荐数据入湖时同步生成调度计划,异步执行调度计划。
- Cleanning:清除FileGroup内不再需求的老旧版本数据(FileSlice)。
- Clustering:数据布局,重新组织FileGroup内数据,例如排序等,提高查询效率。后台数据布局执行中的FileGroup不允许Update操作。
- Archival:后台进程清除metadata中老旧版本的commit文件。
- Savepoint:将特定commit文件保存起来,便于后续使用rollback恢复。
8 Payload
Payload是Hudi提供的可插拔的数据插入机制。传统上整行插入覆盖无法满足所有场景需求,此外插入也有定制化需求,例如部分字段更新、数据去重等。Hudi会将每条记录包装成一个Payload,将数据之间的比较转换成Payload之间的比较。Payload则是基于数据插入时指定的hoodie.datasource.write.precombine.field(Precombine Key)来构建。Payload举例如下,可自定义:
Payload |
更新逻辑 |
OverwriteWithLatestAvroPayload |
更新全部字段 |
OverwriteNonDefaultsWithLatestAvroPayload |
将新数据中的非空字段更新到老数据中 |
DefaultHoodieRecordPayload |
根据precombine key顺序比较是否要更新数据,适合实时入湖且入湖顺序乱序 |
9 参考
- https://cloud.google.com/blog/products/data-analytics/getting-started-with-new-table-formats-on-dataproc
- https://www.infoq.cn/article/08t12zv6ev9spxalpklq
- https://eng.uber.com/apache-hudi-graduation/
- https://www.cnblogs.com/leesf456/p/12710118.html
- https://hudi.apache.org/blog/2020/11/11/hudi-indexing-mechanisms/
- Parquet:https://cloud.tencent.com/developer/article/1631008
- Orc:https://cloud.tencent.com/developer/article/1757862
- 使用案例:https://hudi.apache.org/cn/docs/0.6.0/use_cases/
- 表服务:https://hudi.apache.org/docs/next/compaction
- https://doc.hcs.huawei.com/zh-cn/usermanual/mrs/mrs_01_24038.html
- payload:https://bbs.huaweicloud.com/blogs/302579
- 点赞
- 收藏
- 关注作者
评论(0)