上千个Hive UDF迁移到Spark--Facebook实践经验

举报
aloe 发表于 2020/06/15 18:40:17 2020/06/15
【摘要】 随着Spark SQL计算引擎的发展,Spark批处理的能力和性能也越来越成熟。很多大数据平台开始将自己的业务从Hive迁移到Spark,Facebook就是其中之一。在迁移过程中难免出现不兼容、性能下降等问题,本文将针对Facebook Hive UDF迁移到Spark过程中遇到的各种问题进行介绍。

    多年来,Facebook已将Hive用作主要的查询引擎,当Facebook将作业从Hive迁移到Spark SQL时,遇到了各种各样的挑战和困难,其中Hive UDF的迁移就是遇到了很多明显的问题。本文将着重介绍Facebook在将上千个Hive UDF迁移到Spark SQL时,遇到的兼容性、功能、性能方面的几个问题,以及相应的解决办法。


一、什么是Hive UDF

Hive 支持的函数分为内置函数,和用户自定义函数(UDF)。当内置函数不足以满足需求时,可以通过自定义函数逻辑注册成UDF在SQL中使用即可。

UDF分为三类:

1. 普通UDF

 输入一行数据中的一列或者多列,输出单个值。

  例如: 

SELECT is_substring(col1, col2) AS substring FROM normal_udf

  输入

col1 col2
This is a test test
Hello world hi

  输出          

substring
true
false

  上述例子中,判断col2列是否是col1的子子字符串,每一行对应一个输出结果。


2. 自定义聚合函数

 输入表中的一行或多行数据,输出单个结果。例如:max、min属于内置聚合函数。

  例如:

SELECT CONTACT_SET(id) AS all_ids FROM dim_three_row

  输入

id
1
2
3

  输出

all_ids
1,2,3

  上述例子中,COLLECT_SET方法即将id列多个值进行聚合, 用逗号分隔后输出一个值。


3.自定义表生成函数

  输入表中一行数据,输出多行多列数据,即类似于生成一个表

  例如:

SELECT a.id, b.col1, b.col2 FROM a LATERAL VIEW split_str(a.name) b AS name, age

  输入

id name
1 Jhon#21
2 Lily#18

  输出

id
name age
1 Jhon 21
2 Lily 18

  上述例子,split_str方法对name列进行切分,并结合literal view一起使用,将UDTF的执行结果转储为view b,再与a表进行合并输出最后结果。

二、Spark中如何执行HiveUDF

由于Hive中部分数据类型Spark并不支持。 在Spark中通过封装类,创建Hive的GenericUDF,SimpleGenericUDAF 和GenericUDTF这三种基本类型,例如下图第6行代码。然后调用Hive中的接口执行相应的方法体,再输出最终结果时,再进行解封装将结果转换为Spark中的数据类型(第10行)。

三、Hive UDF迁移到Spark SQL的困难

Facebook计算引擎主要时Hive,其中包含有一千多的UDF,UDF的执行时间占整个业务CPU时间的70%。所以UDF的迁移是整个迁移工作中很重要的一部分。经过最初的几线测试发现,其中只有58%的UDF能够被很好的兼容和支持,对于失败的测试用例所遇到的问题可以分为如下几类:

1.  Spark不支持部分Hive接口

        众所周知,目前Spark不支持Hive的部分接口。如下:

        以上这些方法都存在兼容性问题,虽然通过业务调整可以避免一些不兼容的接口,但是getRequiredJars,getRequiredFiles这两个方法的使用特别的广泛,它们是用于自动加载需要的文件和jar包的,比如在executor中读取文件时,则需要保证该文件已经在executor的工作目录中,是否则就会出错。需要解决该兼容性问题。

        解决方法:

         在Spark driver端进行UDF初始化时,识别出需要用到的文件和jar包,通过SparkContext.addJar 和 addFile方法注册这些资源并分布到各个executor节点。

         在executor中进行二次确认,先查看需要用到的资源是否已经在工作目录中,若已经包含则不需要处理;否则,尝试创建一个软链接到该文件的绝对路径中。这里没有直接复制文件到executor节点,是为了防止资源文件或jar包过大,带来太多额外的工作负担。

2. 线程安全

        Hive中每个任务都在单独的JVM进程中运行,因此绝大多数Hive UDF的编写都没有考虑并发性问题,但是在Spark中每个executor运行在一个单独的JVM进程中,一个executor中可以同时运行多个任务。因此Hive UDF直接运行在Spark中可能会存在线程不安全的问题。

        如下例子中,定义了一个static的全局变量mapping,当在Spark中运行时,可能会有多个实例同时执行该UDF,当两个实例都运行到17~19行时,实例1判断mapping为空,执行初始化,此时实例2也判断发现mapping为空,也进行初始化。实例1执行略快,当实例1初始化完成向mapping中写数据后,实例2初始化完成,覆盖了实例1中的数据,从而导致数据丢失。

        解决方法1:

         通过Synchronize方法加锁,对mapping的初始化进行加锁,从而避免多个进程同时初始化。

         这种方法有两个弊端:1、频繁的加锁解锁导致性能下降,因为每一行数据进来都需要进行加锁操作,降低执行效率;2、代码改动量大,上千个UDF都进行这样的修改,代码工作量大。

        解决方法2:

        将mapping对象从static改为普通对象,每个实例使用自己内部的mapping对象。

        缺点:实例之间数据和状态无法共享,消耗更多的内存,因为每个实例都需要保存一份该数据;

        优点:代码改动小,易于操作。

3. 序列化和反序列化

        在Spark中,UDF的对象在driver端进行初始化、序列化,然后分发给executor节点,在executor中再进行反序列化读取对象。这样的序列化和反序列化中有一些问题,一些类型可以用Kryo进行序列化但是却不能正确的反序列化,例如:guava的ImmutableSet类型。

        解决方法:

        1、  对于公共的常用类型,可以自定义序列化方法,或者引入已有的序列化和反序列化方法;

        2、  对于无法正确序列化和反序列化的对象,加上transient修饰词。即:该对象再driver端只做初始化,不进行序列化,在executor中需要使用该对象是,重新进行初始化。

4.  性能问题

        由于Spark和Hive都不支持对方的一些数据类型,在Spark中执行Hive UDF时,先将Spark的数据类型转换为Hive的inspectors或Java类型,在Hive中进行计算,执行完成后再将结果从Hive类型转换为Spark类型,如下图红框中的部分代码。

        这种封装和解封的过程,比Spark原生的UDF需要多花2倍的CPU,对于复杂的数据类型,如:map, array, structure等则需要花费更多的时间。Facebook的UDF在Spark计算中占用15%的CPU时间,因此,对于耗时最长的那些Hive UDF,可以将其转换为Spark原生的UDF。

四、Partial Aggregate

    Hive中max函数即为聚合函数,其执行流程如下图, Mapper端将所有数据进行shuffle,在Reducer端将相同key的数据读取到同一个reducer中进行max的计算。

SELECT id, max(value) FROM t1 GROUP BY id


    这种计算方法有两个问题:(1)每一条数据都需要通过网络shuffle传播,带来的网络压力很大;(2)当存在数据倾斜时,个别的reducer处理时长将明显比其他reducer长,出现大部分reducer空闲,等待某一个reducer执行的情况,从而导致整个作业的执行时间很长。

    同样的sql和UDF,Partial Aggregation计算过程如下,每一个mapper先将自己本地的数据执行一遍部分聚合,再将聚合后的结果进行shuffle,reducer再对所有的部分聚合结果进行全局的聚合。

    优点:不必要所有的数据都通过网络进行shuffle传输,减轻网络压力;部分计算压力移到mapper端,减轻reducer压力,并且在一定程度上可以减轻数据倾斜带来的弊端。


    Spark已经支持了Partial Aggregation,通过一定的适配Hive UDF使其可以支持Partial Aggregation后,整体的工作性能提升,CPU提升20%,shuffle的数据量减少17%。但是却存在部分UDF性能变差,最差的变慢了300%。

        

    经过分析发现有两种情况会导致性能变慢:

    (1) 查询规模:列扩展的情况下,可能会导致shuffle的数据量变多。如下例子,查询的列包含了min, max,count,avg,经过partial aggregation mapper端输出列从原来的一列,变为5列,但是mapper端合并的数据量却很少,从而导致整体的shuffle数据量增多。

    (2) 数据分布:在一些分布情况下,mapper端数据不能进行合并或者合并的极少,导致数据量没有变少,但是却增加了额外的CPU时间。


    解决方法:

         我们可以看出,影响Partial aggregation性能的有这三个因素: UDAF的性能、列扩张、行缩减。Facebook提出一种基于代价的Optimizer,利用输入列数、输出的列数、UDAF的计算代价等等属性,计算出计算代价,从而决定是否使用Partial aggregation。

         经过该方法,整体的性能有所提升,但是并不能解决所有的UDAF的执行性能变慢的问题,因为“行缩减”这个因素并没有一个好的衡量标准。因为,数据集每天的分布不一定相同、即使是相同的数据集在不同聚合函数下分布情况是不一样的,所以“行缩减”的程度较难量化。


五、未来工作


根据以上分析和介绍,其中性能的提升还有待进一步优化。将考虑一种基于历史作业的开关,决定是否使用Partial aggregation。基于历史作业的eventLog,可以得到很多的统计信息,包括各个节点的执行时间、输出的数据行数等。基于这些信息可以做出更好的预测是否使用Partial aggregation。

近年来,越来越多的论文中提到,利用历史作业的执行信息进行作业优化,比如:提取高价值SubExpression等,进行物化提升具有相同子结构作业的执行时间,相信未来eventLog的利用将越来越多样,越成熟。


引用

https://databricks.com/session/supporting-over-a-thousand-custom-hive-user-defined-functions

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200