Impala - Runtime Filter的原理及实现

举报
weizisheng 发表于 2020/06/10 22:21:14 2020/06/10
【摘要】 RuntimeFilter 是Impala 2.5及更高版本中可用的优化特性 。当针对分区表进行查询,或join条件仅需要表中的一小部分数据时,Impala会在查询运行时确定适当的条件,并将该信息广播到所有正在读取数据的impalad节点,以便它们可以避免不必要的IO,并仅输出与之匹配的数据子集来避免不必要的网络传输。

上一篇介绍了Impala中Bloom Filter的实现原理,及其应用(即其在RuntimeFilter中的应用),有兴趣的童鞋可以点这回顾下。那么RuntimeFilter在Impala当中又是如何应用的呢?它在何时产生,又在何时应用以过滤数据,本文接着来学习介绍。

什么是Runtime Filter

一条SQL往往包括如下几个部分:select/join/where/group by/order by,这几个常用的算子分别对应着SQL执行计划中的project、join、filter、aggregation和sort。Runtime Filter主要关注join和filter,先看这个例子:

select A.name, B.class from PEOPLE A join CLASS B on A.classid = B.id where A.age > 10 and B.grade = 4;

A表中保存着100000条学生记录,通过'age > 10'可以过滤掉其中的40000条;B表中保存1000个班级信息,通过'grade = 4' 可以过滤掉900条。join之后产生30000条记录。从这个SQL的表名和字段名中不难分析出我们想得到什么结果。最简单的执行引擎会首先将PEOPLE表和CLASS表从存储引擎中读出来,然后将它们按照classid和id进行比较然后join,之后根据join的结果中age和grade列进行过滤,最后读取name列和classname列返回给用户。 这种执行计划最简单直接,完全按照SQL语义顺序执行,显而易见性能是最差的。

聪明点的执行引擎会把filter和project下推到数据扫描节点(Impala中此处实现HdfsScanNode),首先从HDFS读取数据,如果存储引擎有提供传入谓词的API(例如Kudu)那就再好不过了,如果没有则需要根据度上来的数据进行过滤,选择需要的列交给joinNode。上面例子中要从PEOPLE表选择classid、name和age列,从CLASS表中选择id、classname和grade列,join节点(Impala中主要实现PartitionedHashJoinNode)根据classid和id进行inner join,然后选择name和classname列输出。这类映射和谓词下推是SQL优化的基本手段了,效果如下:

那么Impala是采用了这种实现吗?并不是,Impala使用Runtime Filter技术在上述基础上进一步优化,它并不是使用SQL中写明的条件,而是在运行时生成的、动态的过滤条件。在IMpala中使用Runtime Filter的一个前提是:通常假设join的两个表一个是大表而另一个是小表,例如通常进行join的是一张事实表和一张维表。显而易见对小表的扫描(HdfsScanNode)速度要远远快于大表,这样的话就可以先对小表执行扫描操作,将输出的记录交由JoinNode,而大表则会主动等待一段时间(默认等待1000ms),JoinNode会根据小表输出的记录计算出一个过滤条件(PartitionedHashJoinNode,借助PhjBuilder来完成),这个条件就是本文的主角-Runtime Filter。接着JoinNode会将这个RF发送给执行大表扫描的HdfsScanNode,后者基于这个RF进行再次过滤,将过滤的记录输出给JoinNode,效果如下:

Runtime Filter在Impala中的作用

本质上它还是表扫描过程的一个Filter,与谓词下推不同,后者在实现读取更少的数据的同时意味着更少的记录输出,而RF仍然需要从存储引擎中先读取数据再进行过滤,无法将Filter应用到存储引擎,因此它只能减少输出的记录数。

Runtime Filter在Impala中的实现

上面已经说明了什么是Runtime Filter,那么它具体是什么样的一种Filter呢,没错就是上一篇提到的Bloom Filter(点这回顾)。JoinNode会依赖小表扫描后输出的记录生成一个BF,BF中包含了小表中等值on条件中所有的id列值,大表进行scan时依据此BF进行数据过滤。

Impala中的RF主要有两类:LocalGlobal,这主要基于不同join算法实现。常用的join算法有很多,Impala只实现了其中的两种:hash join(Impala中PartitionedHashJoinNode)和 nested loop join(Impala中NestedLoopJoinNode),后者针对一些比较特殊的场景,例如复杂数据结构的查询和非等值join,在此不做更多讨论;而hash join又分为shuffle join和broadcast join两种,前者主要针对大表之间的join,后者针对大表和小表之间的join。不论采用哪种join,最终都是基于BF实现,join节点首先根据小表(通常是右表)的输入构建BF中的hashtable,这个过程称为build阶段。然后对于大表的每一条记录进行匹配过滤以生成一条或多条记录,这个过程称为probe阶段。

  • shuffle join是通过将两个输入表分别进行shuffle,保证整个join被分割成多个完全不重叠的任务并行执行,join计算之后输出到父节点汇总

  • broadcast join是通过将小表广播,大表的数据扫描不需要网络传输而直接输出到join节点,join节点处理的是整个小表和部分大表的数据。

回到RF的种类,Local表示生成的RF不需要传输到远端的HdfsScanNode就可以直接应用(即本地的HdfsScanNode)。典型的情况时broadcast join的时候,JoinNode和左表的ScanNode是在一个Fragment中实现的(在一个线程中),由于每一个节点上运行的join都会获取到所有的右表数据,因此都能够build出完整的基于右表数据的RF信息,然后直接将这个信息交给左表的ScanNode,不需要经过任何的网络传输。Global是指全局的,例如在执行shuffle join的时候,每一个分区都只读取部分数据交给JoinNode聚合,而每一个JoinNode都只处理全局数据的一部分,因此也只能生成部分RF,它需要将这个局部的RF交给Coordinator节点进行合并,然后再由Coordinator推送到每一个大表ScanNode上,完成RF的分发。

一个RF(更准确的说是RF中BF,因为RF信息在JoinNode和ScanNode生成的时候就已经确定,但其中的BF是空的,需要等待JoinNode基于右表数据生成后发送给ScanNode)从诞生到应用的流程如下:

执行顺序:

  • 1、同时下发两个表的Scan操作,左边是大表右边是小表(相对而言),但是左表会等待一段时间,因此右表的Scan会先执行。

  • 2、右表的扫描的结果根据join键哈希传递到不同的Join节点,由Join节点执行BF的构建和RF的构建。

  • 3、Join节点完成RF的构建后将RF交给Coordinator节点(如果是Broadcast Join则会直接交给左表的Scan节点)。

  • 4、Coordinator节点将不同的RF进行merge,也就是把BF进行merge,merge之后它将这个RF分发给每一个左表Scan。

  • 5、左表会等待一段时间(默认1000ms)再开启数据扫描以尽可能的等待RF的到达,但是无论RF是否到达,左边在等待超时后都会开始扫描。迟到的RF会在到达那一刻之后被应用。(这里多说一句,Impala的实现是对整个RF列表进行遍历,依次检查是否到达<Local的RF在build之后立即同步到本地,Global的则需要等待网络传输>。个人觉得这种实现不好,因为在最坏情况下,假设有10个RF则需要等待10s。换成1s内遍历所有10个RF,可能更好?

  • 6、左表使用RF完成扫描之后将数据交给Join节点,以完成整个Join过程。


过程5的等待,是在ScanNode初始化后进行,更准确的是在上游ExecNode首次调用GetNext()获取数据时进行,以确保当前ScanNode的创建及初始化能够立即完成不受RF的影响。具体细节本文暂不介绍。

过程5的默认等待时间,可通过启动参数runtime_filter_wait_time_ms修改为其他值,比如在左右表数据量都较大的情况下,需要更多的时间以完成RF的生成,可以设置为2000或更大。也可以在impala-shell中通过set命令设置,这仅对当前会话生效。更多的配置参数这里暂不多说,可以查阅官网。

Runtime Filter在Impala中的实现细节

更多实现细节此处不做更多说明,仅抛出几个关键类,有兴趣的同学可以自行阅读源码

RuntimeFilter

实现类
RuntimeFilterBank 管理RF生产和消费
ScanNode 所有RF创建的地方
HdfsScanNode 所有RF得到应用的地方
HdfsScanNodeBase 等待所有RF到达的地方
...
其他文件格式的处理类

总结

目前RF被应用到Parquet、Orc、TEXT,Avro,RCFile和SequenceFile等多种文件格式,对查询性能的提升是非常可观。但其实RF并不总是有效的,在某种极端情况下比如join列的数据差集非常小,join两边的表并不能过滤掉很多数据,这时候进行大量的无谓的RF过滤反而会浪费资源(从BF的角度来说,每查必中)。好在Impala考虑到了这一点,会根据误检率这个指标更新RF为always true,从而减少更多的hash运算。如果生成RF的开销超过了其带来的性能提升,甚至可以完全关闭该功能,具体方法是设置RUNTIME_FILTER_MODE为off。


注:本文的图片来源于网络,因其通俗易懂因此借用。

————————————————

参考文档

https://impala.apache.org/docs/build/html/topics/impala_runtime_filtering.html

https://blog.csdn.net/yu616568/java/article/details/77073166


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。