上新!MRS集成Hudi
Hudi是数据湖的文件组织层,对Parquet格式文件进行管理提供数据湖能力,支持多种计算引擎,提供IUD接口,在 HDFS/OBS的数据集上提供了插入更新和增量拉取的流原语。
图1 Hudi基本架构
Hudi特性
- ACID事务能力,支持实时入湖和批量入湖。
- 多种视图能力(读优化视图/增量视图/实时视图),支持快速数据分析。
- MVCC设计,支持数据版本回溯。
- 自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。
- 支持并发读写,基于snapshot的隔离机制实现写入时可读取。
- 支持原地转表,将存量的历史表转换为Hudi数据集。
Hudi关键技术和优势
- 可插拔索引机制:Hudi提供多种索引机制,可以快速完成对海量数据的更新和删除操作。
- 良好的生态支持:Hudi支持多种数据引擎接入包括Hive、Spark、Flink。
Hudi支持两种表类型
- Copy On Write
写时复制表也简称cow表,使用parquet文件存储数据,内部的更新操作需要通过重写原始parquet文件完成。
- 优点 读取时,只读取对应分区的一个数据文件即可,较为高效
- 缺点 数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。且由于耗时,读请求读取到的数据相对就会滞后
- Merge On Read
读时合并表也简称mor表,使用列格式parquet和行格式Avro两种方式混合存储数据。其中parquet格式文件用于存储基础数据,Avro格式文件(也可叫做log文件)用于存储增量数据。
- 优点 由于写入数据先写delta log,且delta log较小,所以写入成本较低
- 缺点 需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log 和 老数据文件合并
Hudi支持三种视图,针对不同场景提供相应的读能力
- Snapshot View
实时视图:该视图提供当前hudi表最新的快照数据,即一旦有最新的数据写入hudi表,通过该视图就可以查出刚写入的新数据。
cow表和mor均支持这种视图能力。
- Incremental View
增量视图:该视图提供增量查询的能力,可以查询指定COMMIT之后的增量数据,可用于快速拉取增量数据。
cow表支持该种视图能力, mor表也可以支持该视图,但是一旦mor表完成compact操作其增量视图能力消失。
- Read Optimized View
读优化视图:该视图只会提供最新版本的parquet文件中存储的数据。
该视图在cow表和mor表上表现不同:
对于cow表,该视图能力和实时视图能力是一样的(cow表只用parquet文件存数据)。
对于mor表,仅访问基本文件,提供给定文件片自上次执行compact操作以来的数据, 可简单理解为该视图只会提供mor表parquet文件存储的数据,log文件里面的数据将被忽略。 该视图数据并不一定是最新的,但是mor表一旦完成compact操作,增量log数据被合入到了base数据里面,这个时候该视图和实时视图能力一样。
Hudi数据的实时写入
Hudi支持多种写入方式,实时写入方面MRS 3.1.0中集成了Hudi基于Spark的写入工具DeltaStreamer,不久的将来MRS将在升级Hudi 0.8.0版本,实现了对FlinkSQL写入Hudi的支持。
DeltaStreamer支持单次写入和持续写入两种方式,在持续写入的模式中,支持自动调度compaction和clean任务。同时,DeltaStreamer还支持将同步创建Hive表和分区,便于使用SparkSQL,Hetu等引擎直接通过Hive表查询Hudi数据。
一般场景下,DeltaStreamer通过配置即可拉起任务。以读取Kafka json格式数据为例,首先编写源和目的数据schema,形如:
{
"namespace":"hoodie",
"type": "kafka",
"name": "test",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "id",
"type": "string"
},
{
"name": "score",
"type": "int"
},
{
"name": "grade",
"type": "int"
}
]
}
然后,编辑DeltaStreamer运行配置:
# Kafka源数据schema
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://hacluster/tmp/sourceschema.json
# Hudi目的数据schema
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://hacluster/tmp/targetschema.json
# 进行precombine的字段,当两条数据有相同主键时,根据该字段选择值更大的数据写入,值大小使用Object.compareTo(..)判断
hoodie.datasource.write.precombine.field=score
# 数据主键
hoodie.datasource.write.recordkey.field=id
# 分区字段
hoodie.datasource.write.partitionpath.field=grade
# 是否启动通过Hive表
hoodie.datasource.hive_sync.enable=true
# 是否使用元数据方式同步Hive表
hoodie.datasource.meta.sync.enable=true
# Hive表同步使用的database
hoodie.datasource.hive_sync.database=default
# 同步到Hive的表名
hoodie.datasource.hive_sync.table=test
# 同步Hive表使用的工具类
hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool
# 同步Hive表的数据格式
hoodie.datasource.hive_sync.base_file_format=PARQUET
# 同步Hive表的分区字段
hoodie.datasource.hive_sync.partition_fields=grade
# Hive的JDBC连接地址
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://zk_ip_1:2181,zk_ip_2:2181,zk_ip_3:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2
# Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=zhongyushuo
# Kafka props
# Kafka集群Bootstrap Server地址
bootstrap.servers= 192.168.0.231:9092,192.168.0.83:9092,192.168.0.240:9092
配置文件编辑完成后,使用spark-submit命令提交任务,任务提交命令中可以指定表类型,操作类型(insert/bulk_insert/upsert)。
UPSERT(插入更新)是默认行为,写入是将根据索引,如果当前没有与要写入数据主键相同的数据则直接插入,如果存在与要写入数据主键相同的数据,则进行更新操作。
INSERT(插入),与upsert类似,但是不会进行主键的索引查找,允许数据重复。
BULK_INSERT(批插入)语义与INSERT相同,但是对于批量写入场景效率更高。
DeltaStreamer任务提交命令如下:
source /opt/Bigdata/client/bigdata_env
source /opt/Bigdata/client/Hudi/component_env
spark-submit \
--name kafka2hudi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 6g \
--executor-cores 4 \
--num-executors 4 \
--conf spark.kryoserializer.buffer.max=128m \
--conf spark.yarn.executor.memoryOverhead=4g \
--driver-class-path /opt/Bigdata/client/Hudi/hudi/conf:/opt/Bigdata/client/Hudi/hudi/lib/*:/opt/Bigdata/client/Spark2x/spark/jars/*: \
--jars /opt/Bigdata/client/Spark2x/spark/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.4.5-hw-ei-310012.jar,/opt/Bigdata/client/Spark2x/spark/jars/streamingClient010/kafka-clients-2.4.0-hw-ei-310012.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /opt/Bigdata/client/Hudi/hudi/lib/hudi-utilities_2.11-0.7.0-hw-ei-310012.jar` \
--props hdfs://hacluster/tmp/config.properties \
--target-base-path hdfs://haclustet/user/hudi/test/ \
--source-ordering-field score \
--table-type MERGE_ON_READ \
--target-table test \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--enable-sync \
--op UPSERT \
--continuous
Hudi的使用场景举例
传统数仓如Hive等,没有更新和删除语义,和传统数据库等有较大差别。当需要将数据库更新数据写入Hive时,往往需要将整个分区的数据读取出来,使用Spark等离线引擎进行数据合并后,再将数据进行覆写,数据延迟达到小时级甚至天级。
而在使用Hudi的数据湖中,只需要利用Hudi的upsert语义即可将数据库中的更新实时同步待数据湖数据当中,实现数据延迟T+1到T+0的优化。
在该场景中,首先使用CDC工具将数据库的binlog解析出来,写入到消息队列中,再由DeltaStreamer拉取消息队列中的数据写入Hudi,如图:
数据实时写入Hudi时,利用DeltaStreamer同步Hive表的功能,新版本的SparkSQL、Hive、Hute等查询引擎即可保持SQL不变,进行实时视图的查询。
总结
华为云提供了大数据MapReduce服务(MRS),MRS是一个在华为云上部署和管理Hadoop系统的服务,一键即可部署Hadoop集群。MRS提供租户完全可控的一站式企业级大数据集群云服务,完全兼容开源接口,结合华为云计算、存储优势及大数据行业经验,为客户提供高性能、低成本、灵活易用的全栈大数据平台,轻松运行Hadoop、Spark、Flink、Hudi等大数据组件,并具备在后续根据业务需要进行定制开发的能力,帮助企业快速构建海量数据信息处理系统,并通过对海量信息数据实时与非实时的分析挖掘,发现全新价值点和企业商机。MRS最新版本中已集成Hudi,客户可以在一键部署集群基于Hudi构建数据湖,更好地发掘数据价值。
- 点赞
- 收藏
- 关注作者
评论(0)