Spark避坑指南----UnsafeRow对象的持久化
Spark推出Tungsten计划用于提升Spark的性能与资源使用,其中为了消除JVM对象模型和GC代价,提供了UnsafeRow对象类型。它由jvm提供的sun.misc.Unsafe实现,内部存储的是二进制,继承自InternalRow,是SparkSQL中的中间算子的处理和输出数据类型。
正是由于UnsafeRow的特殊性,我们发现在某些情况下可能会无法正确序列/持久化该类型,产生数据读取不一致的情况,下面我们通过几个例子说明:
例子1:默认的RDD.saveAsObjectFile无法正确处理UnsafeRow类型
首先我们准备一些用于测试的数据表:
我们希望获取到查询`select * from emp, dept where emp.emp_dept_id = dept.dept_id`的结果,该查询对两张表进行了Join操作,后续我们希望将其RDD的数据保存,再重新读取保存的数据。
为了得到RDD的结果,我们构造一个LogicalRDD的Plan,该Plan会直接Scan RDD的数据。再通过Dataset.ofrows来得到Plan的结果
得到的正确结果如下:
同样的,我们调用rdd.saveAsObjectFile将RDD[InternalRow]持久化,再利用spark.sparkContext.objectFile读取。
得到的结果发生了问题,它在总行数不变的情况下,数据被多次复制了,数据读取不一致:
我们查看saveAsObjectFile方法,发现序列化的方式是Java的默认序列化方式,该方法无法正确序列化UnsafeRow对象。
例子2:RDD的checkpoint方法
Spark提供了checkpoint的方法帮助开发者做中间结果的持久化,开发者可以利用checkpoint将计算查询中复杂的中间结果进行缓存,减少重复计算。
其中,localCheckpoint是将结果存在executor的本地磁盘中,checkpoint是将结果存在hdfs中,checkpoint相比localCheckpoint能获得容错机制,但是性能会相对较差。
在本例中,我们仍采用之前的数据和查询,首先验证一下localCheckpoint():
得到的结果也是错误的:
由于checkpoint是惰性的,并且在实际的调用过程中会将原来的计算重新执行一遍,所以一般推荐在checkpoint之前进行cache操作,这样到了真正执行时,checkpoint会直接读取cache的数据,而不用触发二次计算:
结果和localCheckpoint一样,是错误的数据。
通过以上的例子我们发现在对UnsafeRow的类型持久化时,java的序列化方法不能起到正确的作用。UnsafeRow支持的序列化方式为Externalizable和KryoSerializable,我们再对例子2进行验证,需要做两处修改:
1) 在创建sparkSession的时候设置“spark.serializer” 为 “org.apache.spark.serializer.KryoSerializer”
2)在persist的时候选择带SER的StorageLevel
得到的结果如下:
和实际的结果是一致的。
- 点赞
- 收藏
- 关注作者
评论(0)