上千个Hive UDF迁移到Spark--Facebook实践经验
多年来,Facebook已将Hive用作主要的查询引擎,当Facebook将作业从Hive迁移到Spark SQL时,遇到了各种各样的挑战和困难,其中Hive UDF的迁移就是遇到了很多明显的问题。本文将着重介绍Facebook在将上千个Hive UDF迁移到Spark SQL时,遇到的兼容性、功能、性能方面的几个问题,以及相应的解决办法。
一、什么是Hive UDF
Hive 支持的函数分为内置函数,和用户自定义函数(UDF)。当内置函数不足以满足需求时,可以通过自定义函数逻辑注册成UDF在SQL中使用即可。
UDF分为三类:
1. 普通UDF
输入一行数据中的一列或者多列,输出单个值。
例如:
SELECT is_substring(col1, col2) AS substring FROM normal_udf
输入
col1 | col2 |
This is a test | test |
Hello world | hi |
输出
substring |
true |
false |
上述例子中,判断col2列是否是col1的子子字符串,每一行对应一个输出结果。
2. 自定义聚合函数
输入表中的一行或多行数据,输出单个结果。例如:max、min属于内置聚合函数。
例如:
SELECT CONTACT_SET(id) AS all_ids FROM dim_three_row
输入
id |
1 |
2 |
3 |
输出
all_ids |
1,2,3 |
上述例子中,COLLECT_SET方法即将id列多个值进行聚合, 用逗号分隔后输出一个值。
3.自定义表生成函数
输入表中一行数据,输出多行多列数据,即类似于生成一个表
例如:
SELECT a.id, b.col1, b.col2 FROM a LATERAL VIEW split_str(a.name) b AS name, age
输入
id | name |
1 | Jhon#21 |
2 | Lily#18 |
输出
id |
name | age |
1 | Jhon | 21 |
2 | Lily | 18 |
上述例子,split_str方法对name列进行切分,并结合literal view一起使用,将UDTF的执行结果转储为view b,再与a表进行合并输出最后结果。
二、Spark中如何执行HiveUDF
由于Hive中部分数据类型Spark并不支持。 在Spark中通过封装类,创建Hive的GenericUDF,SimpleGenericUDAF 和GenericUDTF这三种基本类型,例如下图第6行代码。然后调用Hive中的接口执行相应的方法体,再输出最终结果时,再进行解封装将结果转换为Spark中的数据类型(第10行)。
三、Hive UDF迁移到Spark SQL的困难
Facebook计算引擎主要时Hive,其中包含有一千多的UDF,UDF的执行时间占整个业务CPU时间的70%。所以UDF的迁移是整个迁移工作中很重要的一部分。经过最初的几线测试发现,其中只有58%的UDF能够被很好的兼容和支持,对于失败的测试用例所遇到的问题可以分为如下几类:
1. Spark不支持部分Hive接口
众所周知,目前Spark不支持Hive的部分接口。如下:
以上这些方法都存在兼容性问题,虽然通过业务调整可以避免一些不兼容的接口,但是getRequiredJars,getRequiredFiles这两个方法的使用特别的广泛,它们是用于自动加载需要的文件和jar包的,比如在executor中读取文件时,则需要保证该文件已经在executor的工作目录中,是否则就会出错。需要解决该兼容性问题。
解决方法:
在Spark driver端进行UDF初始化时,识别出需要用到的文件和jar包,通过SparkContext.addJar 和 addFile方法注册这些资源并分布到各个executor节点。
在executor中进行二次确认,先查看需要用到的资源是否已经在工作目录中,若已经包含则不需要处理;否则,尝试创建一个软链接到该文件的绝对路径中。这里没有直接复制文件到executor节点,是为了防止资源文件或jar包过大,带来太多额外的工作负担。
2. 线程安全
Hive中每个任务都在单独的JVM进程中运行,因此绝大多数Hive UDF的编写都没有考虑并发性问题,但是在Spark中每个executor运行在一个单独的JVM进程中,一个executor中可以同时运行多个任务。因此Hive UDF直接运行在Spark中可能会存在线程不安全的问题。
如下例子中,定义了一个static的全局变量mapping,当在Spark中运行时,可能会有多个实例同时执行该UDF,当两个实例都运行到17~19行时,实例1判断mapping为空,执行初始化,此时实例2也判断发现mapping为空,也进行初始化。实例1执行略快,当实例1初始化完成向mapping中写数据后,实例2初始化完成,覆盖了实例1中的数据,从而导致数据丢失。
解决方法1:
通过Synchronize方法加锁,对mapping的初始化进行加锁,从而避免多个进程同时初始化。
这种方法有两个弊端:1、频繁的加锁解锁导致性能下降,因为每一行数据进来都需要进行加锁操作,降低执行效率;2、代码改动量大,上千个UDF都进行这样的修改,代码工作量大。
解决方法2:
将mapping对象从static改为普通对象,每个实例使用自己内部的mapping对象。
缺点:实例之间数据和状态无法共享,消耗更多的内存,因为每个实例都需要保存一份该数据;
优点:代码改动小,易于操作。
3. 序列化和反序列化
在Spark中,UDF的对象在driver端进行初始化、序列化,然后分发给executor节点,在executor中再进行反序列化读取对象。这样的序列化和反序列化中有一些问题,一些类型可以用Kryo进行序列化但是却不能正确的反序列化,例如:guava的ImmutableSet类型。
解决方法:
1、 对于公共的常用类型,可以自定义序列化方法,或者引入已有的序列化和反序列化方法;
2、 对于无法正确序列化和反序列化的对象,加上transient修饰词。即:该对象再driver端只做初始化,不进行序列化,在executor中需要使用该对象是,重新进行初始化。
4. 性能问题
由于Spark和Hive都不支持对方的一些数据类型,在Spark中执行Hive UDF时,先将Spark的数据类型转换为Hive的inspectors或Java类型,在Hive中进行计算,执行完成后再将结果从Hive类型转换为Spark类型,如下图红框中的部分代码。
这种封装和解封的过程,比Spark原生的UDF需要多花2倍的CPU,对于复杂的数据类型,如:map, array, structure等则需要花费更多的时间。Facebook的UDF在Spark计算中占用15%的CPU时间,因此,对于耗时最长的那些Hive UDF,可以将其转换为Spark原生的UDF。
四、Partial Aggregate
Hive中max函数即为聚合函数,其执行流程如下图, Mapper端将所有数据进行shuffle,在Reducer端将相同key的数据读取到同一个reducer中进行max的计算。
SELECT id, max(value) FROM t1 GROUP BY id
这种计算方法有两个问题:(1)每一条数据都需要通过网络shuffle传播,带来的网络压力很大;(2)当存在数据倾斜时,个别的reducer处理时长将明显比其他reducer长,出现大部分reducer空闲,等待某一个reducer执行的情况,从而导致整个作业的执行时间很长。
同样的sql和UDF,Partial Aggregation计算过程如下,每一个mapper先将自己本地的数据执行一遍部分聚合,再将聚合后的结果进行shuffle,reducer再对所有的部分聚合结果进行全局的聚合。
优点:不必要所有的数据都通过网络进行shuffle传输,减轻网络压力;部分计算压力移到mapper端,减轻reducer压力,并且在一定程度上可以减轻数据倾斜带来的弊端。
Spark已经支持了Partial Aggregation,通过一定的适配Hive UDF使其可以支持Partial Aggregation后,整体的工作性能提升,CPU提升20%,shuffle的数据量减少17%。但是却存在部分UDF性能变差,最差的变慢了300%。
经过分析发现有两种情况会导致性能变慢:
(1) 查询规模:列扩展的情况下,可能会导致shuffle的数据量变多。如下例子,查询的列包含了min, max,count,avg,经过partial aggregation mapper端输出列从原来的一列,变为5列,但是mapper端合并的数据量却很少,从而导致整体的shuffle数据量增多。
(2) 数据分布:在一些分布情况下,mapper端数据不能进行合并或者合并的极少,导致数据量没有变少,但是却增加了额外的CPU时间。
解决方法:
我们可以看出,影响Partial aggregation性能的有这三个因素: UDAF的性能、列扩张、行缩减。Facebook提出一种基于代价的Optimizer,利用输入列数、输出的列数、UDAF的计算代价等等属性,计算出计算代价,从而决定是否使用Partial aggregation。
经过该方法,整体的性能有所提升,但是并不能解决所有的UDAF的执行性能变慢的问题,因为“行缩减”这个因素并没有一个好的衡量标准。因为,数据集每天的分布不一定相同、即使是相同的数据集在不同聚合函数下分布情况是不一样的,所以“行缩减”的程度较难量化。
五、未来工作
根据以上分析和介绍,其中性能的提升还有待进一步优化。将考虑一种基于历史作业的开关,决定是否使用Partial aggregation。基于历史作业的eventLog,可以得到很多的统计信息,包括各个节点的执行时间、输出的数据行数等。基于这些信息可以做出更好的预测是否使用Partial aggregation。
近年来,越来越多的论文中提到,利用历史作业的执行信息进行作业优化,比如:提取高价值SubExpression等,进行物化提升具有相同子结构作业的执行时间,相信未来eventLog的利用将越来越多样,越成熟。
引用
https://databricks.com/session/supporting-over-a-thousand-custom-hive-user-defined-functions
- 点赞
- 收藏
- 关注作者
评论(0)