高并发下Spark任务driver内存溢出调优
一、问题背景
客户生产环境中,Spark任务读写hdfs数据,每天的数据量大约2千亿,期间做大量的shuffle操作,并发启动30个jobs运行,分配给driver的堆内内存为8G,运行4小时即无法正常工作,老年代堆内存不断Full GC,内存使用率几乎达到100%,任务失败。
二、分析步骤
分析driver的内存dump文件,内存使用异常主要集中在以下4个类。
(1) org.apache.spark.storage.memory.MemoryStore,占用内存2.4GB,占比约35%
(2) org.apache.spark.status.TaskDataWrapper,占用内存1.4GB,占比约20%
(3) org.apache.spark.MapOutputTrackerMaster,占用内存1GB,占比14%
(4) org.apache.spark.status.ElementTrackingStore,占用内存1GB,占比14%
主要有以下两个原因导致driver内存不能及时释放,累积到一定程度之后内存溢出:
1、单线程Spark Context Cleaner清理Mapstatus和Broadcast等对象存在瓶颈
类org.apache.spark.storage.memorMemoryStore存放了超过2个万个java.util.LinkedHashMap
这些LinkedHashMap用于存放Broadcast对象
据了解,业务侧每天有两千亿的数据做shuffle操作,从而在shuffle过程中生成了大量的Broadcast对象储存在driver中。
类org.apache.spark.MapOutputTrackerMaster存放了6287个ConcurrentHashMap
这些LinkedHashMap用于存放org.apache.spark.ShuffleStatus对象
这些对象同样来自于shuffle期间产生的,由于executor数量两千多个,shuffle的数据量很高,需要产生大量的ShuffleStatus,用于存储blockid的位置和状态。
于是,我们自然联想到,如此大的数据量之下,如果这些对象没有清理机制,内存一下子就爆掉。在ContextCleaner中,实现了上述对象清理机制,通过start()拉起了两个线程,一个是gc线程,负责定时对该对象作垃圾回收工作,并把回收到的结果放到队列referenceQueue中,另外一个线程是Spark Context Cleaner,当检查到队列中有被回收的对象时,针对不同的对象,调用不同的方法进行清理:
/** Start the cleaner. */
def start(): Unit = {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
periodicGCService.scheduleAtFixedRate(new Runnable {
override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
之前我们分析到占用的内存的对象是ShuffleStatus和broadcast,这些对象的清理通过调用keepCleaning()中doCleanupBroadcast方法做清理工作,而单线程Spark Context Cleaner对高并发、大数据下,会产生大量的上述对象,不能够及时处理。刚好doCleanupBroadcast方法中的清理机制为默认为阻塞状态,由参数
spark.cleaner.referenceTracking.blocking控制,当为非阻塞模式时,不需要等待返回结果就可以做下一步的清理对象,因此,可以通过设置spark.cleaner.referenceTracking.blocking = false加快清理Broadcast对象。
/** Remove all blocks belonging to the given broadcast. */
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.failed.foreach(e =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}
2、Application存储的中间状态数据清理缓慢
而org.apache.spark.status.TaskDataWrapper和org.apache.spark.status.ElementTrackingStore存储了大量的application中间状态数据。先讨论org.apache.spark.status.TaskDataWrapper:
该类中存放的对象,一共3432672个,每个488byte,共占内存大概1.5G
从单个对象来看,org.apache.spark.status.TaskDataWrapper包括以下成员:
可以看到占用内存较大部分主要是accumulatorUpdates,包含2509486个AccumulableInfo,每个160byte,共计380M的内存,经确认,这是业务侧代码频繁调用累计器计算所产生的对象。
而org.apache.spark.status.ElementTrackingStore主要包括以下几个对象:
(1)org.apache.spark.status.JobDataWrapper
(2)org.apache.spark.status.StageDataWrapper
(3)org.apache.spark.status.RDDOperationGraphWrapper
(4)org.apache.spark.status.ExecutorStageSummaryWrapper
分析方法类似,这些对象都是Application存储的中间状态数据,包括了job、stage、task等信息,缓存到driver的InMemoryStore中,最终显示到Spark web ui界面上。
针对以上对象,具体的实现在类AppStatusListener中,负责将application过程的的相关信息,job、stage、task等中间数据写入到driver中,并做了相应的清理机制。 我们所关心的清理机制如下:
kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }
kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
cleanupJobs(count)
}
kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
cleanupStages(count)
}
这里调用了触发器,当给定的数据类型的数量达到一定的阈值之后,就会执行以下的动作,分别是清理上面所提到的几个对象:
{ count => cleanupExecutors(count) }
cleanupJobs(count)
cleanupStages(count)
以cleanupJobs(count)为例分析:
当对象JobDataWrapper的数据累计达到MAX_RETAINED_JOBS时候(默认为1000),会触发对JobDataWrapper对象遍历做清理操作:
val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
.intConf
.createWithDefault(1000)
private def cleanupJobs(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
if (countToDelete <= 0L) {
return
}
val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
}
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
}
其他的对象的清理机制类似,考虑到现网任务一个小时内的task数据达到百万级别,因此期间产生的stage、job、task信息巨大,缓存到driver端,共计几个G内存。因此,我们可以加快清理这些对象的频率,使得缓存到driver端的内存及时释放,需要在客户端或者提交命令中增加调整以下参数值:
spark.ui.retainedJobs = 10
spark.ui.retainedStages = 100
spark.ui.retainedTasks = 100
这样可以使得application中间状态的job、stage、task等相关临时数据得到释放, 内存及时回收,产生的影响是WEB UI界面保留的历史的任务的数据变少。
三、总结
高并发、大数据量任务下的,spark中的清理机制弊端,导致了任务的性能上不足,需要我们深入分析去调优,同时也要从内核上做优化,更好的让人们方便高效使用。
- 点赞
- 收藏
- 关注作者
评论(0)