Hive RuntimeFilter
1 介绍
select * from store join store_sales on (store.id = store_sales.store_id) where store.s_store_name = 'My Store' |
在这个语句中,事实表store_sales与维度表store通过store_id进行jion,维度表store通过s_store_name进行过滤。Store表经过过滤后得到的记录条数可能非常少,store_id的值也非常少。所以可以将store_id的最大最小值,及bloomFilter作为store_sales表的过滤条件,过滤条件甚至可以下推至存储层,减少数据的读取IO。
Hive Runtime Filter的优化是基于动态分区剪裁优化,推荐先阅读《Hive 动态分区剪裁原理》之后再阅读本文。
2 Runtime filter使用
针对Hive3.1.0版本,有如下的参数与Runtime Filter有关。需要注意的是,只有开启动态分区剪裁才能开启runtime filter。
参数名 |
默认值 |
描述 |
hive.tez.dynamic.partition.pruning |
true |
动态分区剪裁 |
hive.tez.dynamic.semijoin.reduction |
true |
是否开启runtime filter |
hive.tez.min.bloom.filter.entries |
1000000L |
Bloom过滤器最小条数 |
hive.tez.max.bloom.filter.entries |
100000000L |
Bloom过滤器最大条数 |
hive.tez.bloom.filter.factor |
1.0 |
|
hive.tez.bigtable.minsize.semijoin.reduction |
100000000L |
大表的最小值 |
hive.tez.dynamic.semijoin.reduction.threshold |
0.5 |
计算的cost大于0.5才会执行runtime filter |
3 原理
3.1 物理优化
Runtime filter的逻辑优化与动态分区剪裁的逻辑优化相同,都是先合成动态filter。这里不再赘述。Runtime filter的物理优化发生在DynamicPartitionPruningOptimization类中,当动态过滤条件的列是分区列时,执行动态分区剪裁优化,如果只是普通的列,就会执行runtime filter优化。
如上图所示,当判断列为非分区字段时,执行runtime filter优化。针对每一个dynamic value filter,hive执行以下操作:
1. 取出当前filter的predicate表达式中的所有列,遍历所有列,如果是非分区列执行以下runtimefilter优化,否则执行动态分区剪裁优化。
2. 给动态值表达式指向的RS的父节点增加runtime filter分支。分支的内容包括:SelectOperator、GroupByOperator、ReduceSinkOperator。其中GroupByOperator会通过聚合函数计算列的最大值、最小值、和bloom filter。Reduce Operator会将最大值、最小值和bloom filter序列化发送给TableScanOperator。图中用虚线表示runtime filter分支与TableScanOperator的关系,实际上在Operator树中,这两者并没有父子关系。在tez生成task时才会将两者连接起来。
3. 替换TableScanOperator的predicate为min,max和bloomfilter的表达式。min,max和bloomfilter的值在编译阶段都是不可知的,hive使用ExprNodeDynamicValueDesc对象表示动态的值,在运行时可通过该对象得到真实的值。
4. FilterOperator的predicate设置为true,后续优化将会把predicate的filter移除。
3.2 运行时
Runtime filter在运行时包含两个部分:
1. Runtime filter分支会生成一个reducer,该reducer运行GroupByOperator,通过定义的min、max、bloomfilter三个聚合函数生成三个值。随后的ReducerSinkOperator会将这三个值序列化。
2. Mapper在运行TableScanOperator之前会执行初始化操作,初始化操作中会等待runtime filter的输入,并且将runtime filter的输入反序列化为min,max和bloomfilter对象。在构建reader时,会将min,max下推到存储层。而bloom filter作为TableScanOperator的过滤条件。
这里有两个问题需要解决:
l Mapper如何知道其需要等待哪个runtime filter的输入?
在tez生成task阶段会在Mapper task中注入runtime filter的信息,mapper初始化时拿到这个信息就可以知道需要等待的runtime filter了。
l TableScanOperator的predicate使用了ExprNodeDynamicValueDesc对象来表示动态的值,那么在运行时如何知道真实的值?
每一个runtime filter分支都由唯一key标识,runtime filter的三个输出由key_min, key_max, key_bloomfilter标识。Mapper在初始化时,会将获取的runtime filter三个值存储到cache中,而ExprNodeDynamicValueDesc对象中也保存了一个key,在计算ExprNodeDynamicValueDesc对象的值时,起始是通过key从cache中读取相应的值。
4 缺陷
目前hive runtime filter和dynamic partition pruning都只是两张表直接通过join生成的,如果多张表通过多个jion间接关联,是否也可以生成runtime filter和动态分区剪裁呢?这可能是hive优化的一个方向。
5 参考文档
1. https://issues.apache.org/jira/browse/HIVE-15269
- 点赞
- 收藏
- 关注作者
评论(0)