数据湖(四):Hudi与Spark整合
Hudi与Spark整合
一、向Hudi插入数据
默认Spark操作Hudi使用表类型为Copy On Write模式。Hudi与Spark整合时有很多参数配置,可以参照https://hudi.apache.org/docs/configurations.html配置项来查询,此外,整合时有几个需要注意的点,如下:
- Hudi这里使用的是0.8.0版本,其对应使用的Spark版本是2.4.3+版本
- Spark2.4.8使用的Scala版本是2.12版本,虽然2.11也是支持的,建议使用2.12。
- maven导入包中需要保证httpclient、httpcore版本与集群中的Hadoop使用的版本一致,不然会导致通信有问题。检查Hadoop使用以上两个包的版本路径为:$HADOOP_HOME/share/hadoop/common/lib。
- 在编写代码过程中,指定数据写入到HDFS路径时直接写“/xxdir”不要写“hdfs://mycluster/xxdir”,后期会报错“java.lang.IllegalArgumentException: Not in marker dir. Marker Path=hdfs://mycluster/hudi_data/.hoodie\.temp/2022xxxxxxxxxx/default/c4b854e7-51d3-4a14-9b7e-54e2e88a9701-0_0-22-22_20220509164730.parquet.marker.CREATE, Expected Marker Root=/hudi_data/.hoodie/.temp/2022xxxxxxxxxx”,可以将对应的hdfs-site.xml、core-site.xml放在resources目录下,直接会找HDFS路径。
1、创建项目,修改pom.xml为如下内容
2、编写向Hudi插入数据代码
二、指定分区向hudi中插入数据
向Hudi中存储数据时,如果没有指定分区列,那么默认只有一个default分区,我们可以保存数据时指定分区列,可以在写出时指定“DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY”选项来指定分区列,如果涉及到多个分区列,那么需要将多个分区列进行拼接生成新的字段,使用以上参数指定新的字段即可。
1、指定一个分区列
2、指定分区为多个列时,可以先拼接,后指定拼接字段当做分区列:
指定两个分区,需要拼接
三、 读取Hudi数据
使用SparkSQL读取Hudi中的数据,无法使用读取表方式来读取,需要指定HDFS对应的路径来加载,指定的路径只需要指定到*.parquet当前路径或者上一层路径即可,路径中可以使用“*”来替代任意目录和数据。
读取数据返回的结果中除了原有的数据之外,还会携带Hudi对应的列数据,例如:hudi的主键、分区、提交时间、对应的parquet名称。
Spark读取Hudi表数据代码如下:
四、更新Hudi数据
向Hudi中更新数据有如下几个特点
- 同一个分区内,向Hudi中更新数据是用主键来判断数据是否需要更新的,这里判断的是相同分区内是否有相同主键,不同分区内允许有相同主键。
- 更新数据时,如果原来数据有分区,一定要指定分区,不然就相当于是向相同表目录下插入数据,会生成对应的“default”分区。
- 向Hudi中更新数据时,与向Hudi中插入数据一样,但是写入的模式需要指定成“Append”,如果指定成“overwrite”,那么就是全覆盖了。建议使用时一直使用“Append”模式即可。
- 当更新完成之后,再一次从Hudi中查询数据时,会看到Hudi提交的时间字段为最新的时间。
这里将原有的三条数据改成如下三条数据:
更新Hudi数据代码如下:
五、 增量查询Hudi数据
Hudi可以根据我们传入的时间戳查询此时间戳之后的数据,这就是增量查询,需要注意的是增量查询必须通过以下方式在Spark中指定一个时间戳才能正常查询:
option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,timestamp)
例如:原始数据如下:
我们可以查询“20210709220335”之后的数据,查询结果如下:
代码如下:
六、指定时间范围查询Hudi数据
Hudi还可以通过指定开始时间和结束时间来查询时间范围内的数据。如果想要查询最早的时间点到某个结束时刻的数据,开始时间可以指定成“000”。
1、向原有Hudi表“person_infos”中插入两次数据
目前hudi表中的数据如下:
先执行两次新的数据插入,两次插入数据之间的间隔时间至少为1分钟,两次插入数据代码如下:
此时,数据如下:
2、指定时间段查询Hudi中的数据
代码如下:
开始时间为“000”,相当于是从头开始查询到endTime的数据:
开始时间为“20210710002148”:
七、删除Hudi数据
我们准备对应的主键及分区的数据,将Hudi中对应的主键及分区的数据进行删除,在删除Hudi中的数据时,需要指定option(OPERATION_OPT_KEY,"delete")配置项,并且写入模式只能是Append,不支持其他写入模式,另外,设置下删除执行的并行度,默认为1500个,这里可以设置成2个。
原始数据如下:
准备要删除的数据如下:
编写代码如下:
结果如下:
八、更新Hudi某个分区数据
如果我们想要更新Hudi某个分区的数据,其他分区数据正常使用,那么可以通过配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite")选项,该选项“insert_overwrite”可以直接在元数据层面上操作,直接将写入某分区的新数据替换到该分区内,原有数据会在一定时间内删除,相比upsert更新Hudi速度要快。
1、删除person_infos对应的目录,重新插入数据,代码如下
2、读取更新分区数据,插入到Hudi preson_infos表中
读取数据如下:
代码如下:
九、覆盖Hudi整个表数据
如果我们想要替换Hudi整个表数据,可以在向Hudi表写入数据时指定配置option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert_overwrite_table")选项,该选项“insert_overwrite_table”可以直接在元数据层面上操作,直接将数据写入表,原有数据会在一定时间内删除,相比删除原有数据再插入更方便。
1、删除Hudi表person_infos对应的HDFS路径,重新插入数据
2、读取新数据,覆盖原有Hudi表数据
覆盖更新的数据如下:
代码如下:
十、Spark操作Hudi Merge On Read 模式
默认Spark操作Hudi使用Copy On Write模式,也可以使用Merge On Read 模式,通过代码中国配置如下配置来指定:
option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY,DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
代码操作如下:
- 删除原有person_infos对应的HDFS路径
- 读取数据向Hudi表person_info中插入数据
读取的数据如下:
代码如下:
- 更新Hudi表person_info数据
这里更新“beijing”、“shanghai”、“ttt”分区数据,更新数据如下:
代码如下:
- 增量查询Hudi表中的数据
Snapshot 模式查询,这种模式对于COW或者MOR模式都是查询到当前时刻全量的数据,如果有更新,那么就是更新之后全量的数据:
incremental 模式查询,这种模式需要指定一个时间戳,查询指定时间戳之后的新增数据:
Read Optimized 模式查询,这种模式只查询Base中的数据,不会查询MOR中Log文件中的数据,代码如下:
十一、测试COW模式parquet文件删除与MOR模式Parquet文件与log文件Compact
COW默认情况下,每次更新数据Commit都会基于之前parquet文件生成一个新的Parquet Base文件数据,默认历史parquet文件数为10,当超过10个后会自动删除旧的版本,可以通过参数“hoodie.cleaner.commits.retained”来控制保留的FileID版本文件数,默认是10。测试代码如下:
测试注意:每次运行代码,读取新的一个数据文件,并查看Hudi表对应的HDFS路径,每次读取都会生成一个新的Parquet文件,当达到指定的3个历史版本时(不包含最新Parquet文件),再插入数据生成新的Parquet文件时,一致会将之前的旧版本删除,保存4个文件。
MOR模式下,如果有新增数据会直接写入Base Parquet文件,这个Parquet文件个数的控制也是由“hoodie.cleaner.commits.retained”控制,默认为10。当对应的每个FlieSlice(Base Parquet文件+log Avro文件)中有数据更新时,会写入对应的log Avro文件,那么这个文件何时与Base Parquet文件进行合并,这个是由参数“hoodie.compact.inline.max.delta.commits”决定的,这个参数意思是在提交多少次commit后触发压缩策略,默认是5,也就是当前FlieSlice中如果有5次数据更新就会两者合并生成全量的数据,当前FlieSlice还是这个FileSlice名称,只不过对应的parquet文件中是全量数据,再有更新数据还是会写入当前FileSlice对应的log日志文件中。使“hoodie.compact.inline.max.delta.commits”参数起作用,默认必须开启“hoodie.compact.inline”,此值代表是否完成提交数据后进行压缩,默认是false。
测试代码如下:
第一次运行插入数据,commit,路径对应数据目录如下:
第一次运行更新数据,commit,路径对应数据目录如下:
第二次运行更新数据,commit,路径对应的数据目录如下:
第三次运行更新数据,commit,路径对应的数据目录如下:
第四次运行更新数据,commit,路径对应的数据目录如下:
第五次运行更新数据,commit,路径对应的目录数据如下:
- 点赞
- 收藏
- 关注作者
评论(0)