spark读写hudi表流程
Spark dataSourceV1查询hudi表:
MOR表读流程:
MOR表的读包括3个分支:1)普通MOR表读;2)clustering数据读;3)compaction时读;4)metatable表读;5)hive inputFormat读
最终读接口为:\Hudi_Kernel\hudi-common\src\main\java\org\apache\hudi\common\table\log\AbstractHoodieLogRecordScanner.java -》scan,大体调用栈如下:
MOR表实时视图:
已spark读MOR表实时视图为例,总体流程为:
构造MergeOnReadSnapshotRelation(通过buildFileIndex方法构造分组,每个分组包含单个base file及相应的log files)》
HoodieMergeOnReadRDD 》
HoodieMergeOnReadRDD.compute(计算过程中,根据hoodie.datasource.merge.type决定base文件和log文件是否需要merge;存在3种读:1)读parquet,2)读log,3)读parquet和log做precombine(根据order键做precombine)) 》
以precombine为例:HoodieMergeOnReadRDD.payloadCombineFileIterator 》
HoodieMergeOnReadRDD.scanLog 》
HoodieMergedLogRecordScanner.performScan
scan
MOR表读增量视图:
总体流程类似读实时视图,区别点在于,构建分区的方式:增量视图构建分区是通过读取start commit 与end commit之间的commit构建fsview,再根据是否需要过滤分区过滤掉部分分区,在读方面实现方式一致。
Log文件读取流程在scan接口中:
LOG文件的数据块,分为COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, HFILE_DATA_BLOCK几种类型,分别表示指定操作数据块(如回滚),删除数据块,损坏数据块,正常数据块,HFile类型数据块
mor表读过程中,会关闭矢量化读(因为需要逐行做precomibne和merge)
Spark datasource 写入hudi表:
限制条件:spark.serializer必须为org.apache.spark.serializer.KryoSerializer
写操作类型:
BULK_INSERT
INSERT
UPSERT
DELETE
BOOTSTRAP
INSERT_OVERWRITE
INSERT_OVERWRITE_TABLE
设置默认参数:此处只会设置默认值,我们可以做一个扩展,支持读取默认的配置文件
df去除huqi元数据列:保证了可以从hudi表导入数据
HoodieSparkSqlWriter.write流程:
接口支持同步compaction,但是spark datasource接口写入时没有配置同步compaction
- 点赞
- 收藏
- 关注作者
评论(0)