FusionInsight MRS Hudi原理解析之Clustering

一枚核桃 发表于 2021/12/28 09:50:32 2021/12/28
【摘要】 什么是Hudi的Clustering即数据布局,该服务可重新组织数据以提高查询性能,也不会影响摄取速度。Hudi的Clustering架构Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置...

什么是Hudi的Clustering

即数据布局,该服务可重新组织数据以提高查询性能,也不会影响摄取速度。

Hudi的Clustering架构

Hudi通过其写入客户端API提供了不同的操作,如insert/upsert/bulk_insert来将数据写入Hudi表。为了能够在文件大小和入湖速度之间进行权衡,Hudi提供了一个hoodie.parquet.small.file.limit配置来设置最小文件大小。用户可以将该配置设置为“0”,以强制新数据写入新的文件组,或设置为更高的值以确保新数据被“填充”到现有小的文件组中,直到达到指定大小为止,但其会增加摄取延迟。

为能够支持快速摄取的同时不影响查询性能,我们引入了Clustering服务来重写数据以优化Hudi数据湖文件的布局。

Clustering服务可以异步或同步运行,Clustering会添加了一种新的REPLACE操作类型,该操作类型将在Hudi元数据时间轴中标记Clustering操作。

Clustering服务基于Hudi的MVCC设计,允许继续插入新数据,而Clustering操作在后台运行以重新格式化数据布局,从而确保并发读写者之间的快照隔离。

总体而言Clustering分为两个部分:

  • 调度Clustering:使用可插拔的Clustering策略创建Clustering计划。
    1. 识别符合Clustering条件的文件:根据所选的Clustering策略,调度逻辑将识别符合Clustering条件的文件。
    2. 根据特定条件对符合Clustering条件的文件进行分组。每个组的数据大小应为targetFileSize的倍数。分组是计划中定义的"策略"的一部分。此外还有一个选项可以限制组大小,以改善并行性并避免混排大量数据。
    3. 将Clustering计划以avro元数据格式保存到时间线。
  • 执行Clustering:使用执行策略处理计划以创建新文件并替换旧文件。
    1. 读取Clustering计划,并获得ClusteringGroups,其标记了需要进行Clustering的文件组。
    2. 对于每个组使用strategyParams实例化适当的策略类(例如:sortColumns),然后应用该策略重写数据。
    3. 创建一个REPLACE提交,并更新HoodieReplaceCommitMetadata中的元数据。

如何执行Clustering

  • 同步执行Clustering配置。
    df.write.format("org.apache.hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_NAME, "tableName").
    option("hoodie.parquet.small.file.limit", "0").
    option("hoodie.clustering.inline", "true").
    option("hoodie.clustering.inline.max.commits", "4").
    option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
    option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
    option("hoodie.clustering.plan.strategy.sort.columns", "column1,column2"). mode(Append).save("dfs://location");
  • 异步执行Clustering:
    spark-submit --master yarn --class org.apache.hudi.utilities.HoodieClusteringJob /opt/client/Hudi/hudi/lib/hudi-utilities*.jar --schedule --base-path <table_path> --table-name <table_name> --props /tmp/clusteringjob.properties --spark-memory 1g
    spark-submit --master yarn --driver-memory 16G --executor-memory 12G --executor-cores 4 --num-executors 4 --class org.apache.hudi.utilities.HoodieClusteringJob /opt/client/Hudi/hudi/lib/hudi-utilities*.jar --base-path <table_path> --instant-time 20210605112954 --table-name <table_name> --props /tmp/clusteringjob.properties --spark-memory 12g

    说明


    clusteringjob.properties中为用户自定义的clustering相关配置。例如:

    hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824

    hoodie.clustering.inline.max.commits=4


    注意


  • 分区表执行Clustering重组范围默认只对排序最大的2个分区,其他则依赖用户自定义策略。
  • Clustering的排序列不允许值存在null,是spark rdd的限制。
  • 当target.file.max.bytes的值较大时,启动Clustering执行需要提高--spark-memory,否则会导致executor内存溢出。
  • 当前clean不支持清理Clustering失败后的垃圾文件。
  • Clustering后可能出现新文件大小不等引起数据倾斜的情况。
  • cluster不支持和upsert并发。
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区),文章链接,文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:cloudbbs@huaweicloud.com进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。