Hive RuntimeFilter

举报
wangzhen 发表于 2020/07/13 19:28:46 2020/07/13
【摘要】 1 介绍select *from store join store_sales on (store.id = store_sales.store_id)where store.s_store_name = 'My Store'在这个语句中,事实表store_sales与维度表store通过store_id进行jion,维度表store通过s_store_name进行过滤。Sto...

1      介绍

select *

from store join store_sales on (store.id   = store_sales.store_id)

where store.s_store_name = 'My Store'

在这个语句中,事实表store_sales与维度表store通过store_id进行jion,维度表store通过s_store_name进行过滤。Store表经过过滤后得到的记录条数可能非常少,store_id的值也非常少。所以可以将store_id的最大最小值,及bloomFilter作为store_sales表的过滤条件,过滤条件甚至可以下推至存储层,减少数据的读取IO

Hive Runtime Filter的优化是基于动态分区剪裁优化,推荐先阅读《Hive 动态分区剪裁原理》之后再阅读本文。

2      Runtime filter使用

针对Hive3.1.0版本,有如下的参数与Runtime Filter有关。需要注意的是,只有开启动态分区剪裁才能开启runtime filter

参数名

默认值

描述

hive.tez.dynamic.partition.pruning

true

动态分区剪裁

hive.tez.dynamic.semijoin.reduction

true

是否开启runtime   filter

hive.tez.min.bloom.filter.entries

1000000L

Bloom过滤器最小条数

hive.tez.max.bloom.filter.entries

100000000L

Bloom过滤器最大条数

hive.tez.bloom.filter.factor

1.0


hive.tez.bigtable.minsize.semijoin.reduction

100000000L

大表的最小值

hive.tez.dynamic.semijoin.reduction.threshold

0.5

计算的cost大于0.5才会执行runtime filter

 

3      原理

3.1      物理优化

       Runtime filter的逻辑优化与动态分区剪裁的逻辑优化相同,都是先合成动态filter。这里不再赘述。Runtime filter的物理优化发生在DynamicPartitionPruningOptimization类中,当动态过滤条件的列是分区列时,执行动态分区剪裁优化,如果只是普通的列,就会执行runtime filter优化。

jjj.png

       如上图所示,当判断列为非分区字段时,执行runtime filter优化。针对每一个dynamic value filterhive执行以下操作:

1.         取出当前filterpredicate表达式中的所有列,遍历所有列,如果是非分区列执行以下runtimefilter优化,否则执行动态分区剪裁优化。

2.         给动态值表达式指向的RS的父节点增加runtime filter分支。分支的内容包括:SelectOperatorGroupByOperatorReduceSinkOperator。其中GroupByOperator会通过聚合函数计算列的最大值、最小值、和bloom filterReduce Operator会将最大值、最小值和bloom filter序列化发送给TableScanOperator。图中用虚线表示runtime filter分支与TableScanOperator的关系,实际上在Operator树中,这两者并没有父子关系。在tez生成task时才会将两者连接起来。

3.         替换TableScanOperatorpredicatemin,maxbloomfilter的表达式。min,maxbloomfilter的值在编译阶段都是不可知的,hive使用ExprNodeDynamicValueDesc对象表示动态的值,在运行时可通过该对象得到真实的值。

4.         FilterOperatorpredicate设置为true,后续优化将会把predicatefilter移除。

3.2      运行时

Runtime filter在运行时包含两个部分:

1.         Runtime filter分支会生成一个reducer,该reducer运行GroupByOperator,通过定义的minmaxbloomfilter三个聚合函数生成三个值。随后的ReducerSinkOperator会将这三个值序列化。

2.         Mapper在运行TableScanOperator之前会执行初始化操作,初始化操作中会等待runtime filter的输入,并且将runtime filter的输入反序列化为min,maxbloomfilter对象。在构建reader时,会将min,max下推到存储层。而bloom filter作为TableScanOperator的过滤条件。

这里有两个问题需要解决:

l  Mapper如何知道其需要等待哪个runtime filter的输入?

tez生成task阶段会在Mapper task中注入runtime filter的信息,mapper初始化时拿到这个信息就可以知道需要等待的runtime filter了。

l  TableScanOperatorpredicate使用了ExprNodeDynamicValueDesc对象来表示动态的值,那么在运行时如何知道真实的值?

每一个runtime filter分支都由唯一key标识,runtime filter的三个输出由key_min, key_max, key_bloomfilter标识。Mapper在初始化时,会将获取的runtime filter三个值存储到cache中,而ExprNodeDynamicValueDesc对象中也保存了一个key,在计算ExprNodeDynamicValueDesc对象的值时,起始是通过keycache中读取相应的值。

4      缺陷

目前hive runtime filterdynamic partition pruning都只是两张表直接通过join生成的,如果多张表通过多个jion间接关联,是否也可以生成runtime filter和动态分区剪裁呢?这可能是hive优化的一个方向。

5      参考文档

1.     https://issues.apache.org/jira/browse/HIVE-15269


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。