上新!MRS集成Hudi

举报
数据湖爱好者 发表于 2021/07/29 11:52:28 2021/07/29
【摘要】 Hudi是数据湖的文件组织层,对Parquet格式文件进行管理提供数据湖能力,支持多种计算引擎,提供IUD接口,在 HDFS/OBS的数据集上提供了插入更新和增量拉取的流原语。图1 Hudi基本架构Hudi特性 ACID事务能力,支持实时入湖和批量入湖。 多种视图能力(读优化视图/增量视图/实时视图),支持快速数据分析。 MVCC设计,支持数据版本回溯。 自动管理文件大小和布局,以优化查询性...

Hudi是数据湖的文件组织层,对Parquet格式文件进行管理提供数据湖能力,支持多种计算引擎,提供IUD接口,在 HDFS/OBS的数据集上提供了插入更新和增量拉取的流原语。

001.png

1 Hudi基本架构

Hudi特性

  • ACID事务能力,支持实时入湖和批量入湖。
  • 多种视图能力(读优化视图/增量视图/实时视图),支持快速数据分析。
  • MVCC设计,支持数据版本回溯。
  • 自动管理文件大小和布局,以优化查询性能准实时摄取,为查询提供最新数据。
  • 支持并发读写,基于snapshot的隔离机制实现写入时可读取。
  • 支持原地转表,将存量的历史表转换为Hudi数据集。

Hudi关键技术和优势

  • 可插拔索引机制:Hudi提供多种索引机制,可以快速完成对海量数据的更新和删除操作。
  • 良好的生态支持:Hudi支持多种数据引擎接入包括HiveSparkFlink

Hudi支持两种表类型

  • Copy On Write

写时复制表也简称cow表,使用parquet文件存储数据,内部的更新操作需要通过重写原始parquet文件完成。

  • 优点 读取时,只读取对应分区的一个数据文件即可,较为高效
  • 缺点 数据写入的时候,需要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比较耗时。且由于耗时,读请求读取到的数据相对就会滞后
  • Merge On Read

读时合并表也简称mor表,使用列格式parquet和行格式Avro两种方式混合存储数据。其中parquet格式文件用于存储基础数据,Avro格式文件(也可叫做log文件)用于存储增量数据。

  • 优点 由于写入数据先写delta log,且delta log较小,所以写入成本较低
  • 缺点 需要定期合并整理compact,否则碎片文件较多。读取性能较差,因为需要将delta log 老数据文件合并

Hudi支持三种视图,针对不同场景提供相应的读能力

  • Snapshot View

实时视图:该视图提供当前hudi表最新的快照数据,即一旦有最新的数据写入hudi表,通过该视图就可以查出刚写入的新数据。

cow表和mor均支持这种视图能力。

  • Incremental View

增量视图:该视图提供增量查询的能力,可以查询指定COMMIT之后的增量数据,可用于快速拉取增量数据。

cow表支持该种视图能力, mor表也可以支持该视图,但是一旦mor表完成compact操作其增量视图能力消失。

  • Read Optimized View

读优化视图:该视图只会提供最新版本的parquet文件中存储的数据。

该视图在cow表和mor表上表现不同:

对于cow表,该视图能力和实时视图能力是一样的(cow表只用parquet文件存数据)。

对于mor表,仅访问基本文件,提供给定文件片自上次执行compact操作以来的数据, 可简单理解为该视图只会提供morparquet文件存储的数据,log文件里面的数据将被忽略。 该视图数据并不一定是最新的,但是mor表一旦完成compact操作,增量log数据被合入到了base数据里面,这个时候该视图和实时视图能力一样。

Hudi数据的实时写入

Hudi支持多种写入方式,实时写入方面MRS 3.1.0中集成了Hudi基于Spark的写入工具DeltaStreamer,不久的将来MRS将在升级Hudi 0.8.0版本,实现了对FlinkSQL写入Hudi的支持。

DeltaStreamer支持单次写入和持续写入两种方式,在持续写入的模式中,支持自动调度compactionclean任务。同时,DeltaStreamer还支持将同步创建Hive表和分区,便于使用SparkSQLHetu等引擎直接通过Hive表查询Hudi数据。

一般场景下,DeltaStreamer通过配置即可拉起任务。以读取Kafka json格式数据为例,首先编写源和目的数据schema,形如:

{

  "namespace":"hoodie",

  "type": "kafka",

  "name": "test",

  "fields": [

    {

      "name": "name",

      "type": "string"

    },

    {

      "name": "id",

      "type": "string"

    },

    {

      "name": "score",

      "type": "int"

    },

    {

      "name": "grade",

      "type": "int"

    }

  ]

}

然后,编辑DeltaStreamer运行配置:

# Kafka源数据schema

hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://hacluster/tmp/sourceschema.json

# Hudi目的数据schema

hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://hacluster/tmp/targetschema.json

# 进行precombine的字段,当两条数据有相同主键时,根据该字段选择值更大的数据写入,值大小使用Object.compareTo(..)判断

hoodie.datasource.write.precombine.field=score

# 数据主键

hoodie.datasource.write.recordkey.field=id

# 分区字段

hoodie.datasource.write.partitionpath.field=grade

# 是否启动通过Hive表

hoodie.datasource.hive_sync.enable=true

# 是否使用元数据方式同步Hive表

hoodie.datasource.meta.sync.enable=true

# Hive表同步使用的database

hoodie.datasource.hive_sync.database=default

# 同步到Hive的表名

hoodie.datasource.hive_sync.table=test

# 同步Hive表使用的工具类

hoodie.meta.sync.client.tool.class=org.apache.hudi.hive.HiveSyncTool

# 同步Hive表的数据格式

hoodie.datasource.hive_sync.base_file_format=PARQUET

# 同步Hive表的分区字段

hoodie.datasource.hive_sync.partition_fields=grade

# Hive的JDBC连接地址

hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://zk_ip_1:2181,zk_ip_2:2181,zk_ip_3:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2

 

# Kafka Source topic

hoodie.deltastreamer.source.kafka.topic=zhongyushuo

 

# Kafka props

# Kafka集群Bootstrap Server地址

bootstrap.servers= 192.168.0.231:9092,192.168.0.83:9092,192.168.0.240:9092

配置文件编辑完成后,使用spark-submit命令提交任务,任务提交命令中可以指定表类型,操作类型(insert/bulk_insert/upsert)。

UPSERT(插入更新)是默认行为,写入是将根据索引,如果当前没有与要写入数据主键相同的数据则直接插入,如果存在与要写入数据主键相同的数据,则进行更新操作。

INSERT(插入),与upsert类似,但是不会进行主键的索引查找,允许数据重复。

BULK_INSERT(批插入)语义与INSERT相同,但是对于批量写入场景效率更高。

DeltaStreamer任务提交命令如下:

source /opt/Bigdata/client/bigdata_env

source /opt/Bigdata/client/Hudi/component_env

spark-submit \

--name kafka2hudi \

--master yarn \

--deploy-mode cluster \

--driver-memory 4g \

--executor-memory 6g \

--executor-cores 4 \

--num-executors 4  \

--conf spark.kryoserializer.buffer.max=128m \

--conf spark.yarn.executor.memoryOverhead=4g \

--driver-class-path /opt/Bigdata/client/Hudi/hudi/conf:/opt/Bigdata/client/Hudi/hudi/lib/*:/opt/Bigdata/client/Spark2x/spark/jars/*: \

--jars /opt/Bigdata/client/Spark2x/spark/jars/streamingClient010/spark-streaming-kafka-0-10_2.11-2.4.5-hw-ei-310012.jar,/opt/Bigdata/client/Spark2x/spark/jars/streamingClient010/kafka-clients-2.4.0-hw-ei-310012.jar \

--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /opt/Bigdata/client/Hudi/hudi/lib/hudi-utilities_2.11-0.7.0-hw-ei-310012.jar` \

--props hdfs://hacluster/tmp/config.properties \

--target-base-path hdfs://haclustet/user/hudi/test/ \

--source-ordering-field score \

--table-type MERGE_ON_READ \

--target-table test \

--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \

--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \

--enable-sync \

--op UPSERT \

--continuous

Hudi的使用场景举例

传统数仓如Hive等,没有更新和删除语义,和传统数据库等有较大差别。当需要将数据库更新数据写入Hive时,往往需要将整个分区的数据读取出来,使用Spark等离线引擎进行数据合并后,再将数据进行覆写,数据延迟达到小时级甚至天级。

而在使用Hudi的数据湖中,只需要利用Hudi的upsert语义即可将数据库中的更新实时同步待数据湖数据当中,实现数据延迟T+1到T+0的优化。

在该场景中,首先使用CDC工具将数据库的binlog解析出来,写入到消息队列中,再由DeltaStreamer拉取消息队列中的数据写入Hudi,如图:

002.png

数据实时写入Hudi时,利用DeltaStreamer同步Hive表的功能,新版本的SparkSQL、Hive、Hute等查询引擎即可保持SQL不变,进行实时视图的查询。

总结

华为云提供了大数据MapReduce服务(MRS),MRS是一个在华为云上部署和管理Hadoop系统的服务,一键即可部署Hadoop集群。MRS提供租户完全可控的一站式企业级大数据集群云服务,完全兼容开源接口,结合华为云计算、存储优势及大数据行业经验,为客户提供高性能、低成本、灵活易用的全栈大数据平台,轻松运行Hadoop、Spark、Flink、Hudi等大数据组件,并具备在后续根据业务需要进行定制开发的能力,帮助企业快速构建海量数据信息处理系统,并通过对海量信息数据实时与非实时的分析挖掘,发现全新价值点和企业商机。MRS最新版本中已集成Hudi,客户可以在一键部署集群基于Hudi构建数据湖,更好地发掘数据价值。

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。