高并发下Spark任务driver内存溢出调优

举报
Yanchel 发表于 2020/07/13 00:01:33 2020/07/13
【摘要】 本文对生产环境中的Spark任务读写hdfs数据任务,其高并发、大数据量下的shuffle导致的driver内存溢出,做分析优化。

一、问题背景

客户生产环境中,Spark任务读写hdfs数据,每天的数据量大约2千亿,期间做大量的shuffle操作,并发启动30jobs运行,分配给driver的堆内内存为8G,运行4小时即无法正常工作,老年代堆内存不断Full GC,内存使用率几乎达到100%,任务失败。

二、分析步骤

分析driver的内存dump文件,内存使用异常主要集中在以下4个类。

(1)       org.apache.spark.storage.memory.MemoryStore,占用内存2.4GB,占比约35%

(2)       org.apache.spark.status.TaskDataWrapper,占用内存1.4GB,占比约20%

(3)       org.apache.spark.MapOutputTrackerMaster,占用内存1GB,占比14%

(4)       org.apache.spark.status.ElementTrackingStore,占用内存1GB,占比14%



image.png

image.png


主要有以下两个原因导致driver内存不能及时释放,累积到一定程度之后内存溢出:

1、单线程Spark Context Cleaner清理MapstatusBroadcast等对象存在瓶颈


org.apache.spark.storage.memorMemoryStore存放了超过2个万个java.util.LinkedHashMap

image.png

这些LinkedHashMap用于存放Broadcast对象

image.png

据了解,业务侧每天有两千亿的数据做shuffle操作,从而在shuffle过程中生成了大量的Broadcast对象储存在driver中。

image.png

org.apache.spark.MapOutputTrackerMaster存放了6287ConcurrentHashMap

image.png

这些LinkedHashMap用于存放org.apache.spark.ShuffleStatus对象

image.png

这些对象同样来自于shuffle期间产生的,由于executor数量两千多个,shuffle的数据量很高,需要产生大量的ShuffleStatus,用于存储blockid的位置和状态。

于是,我们自然联想到,如此大的数据量之下,如果这些对象没有清理机制,内存一下子就爆掉。在ContextCleaner中,实现了上述对象清理机制,通过start()拉起了两个线程,一个是gc线程,负责定时对该对象作垃圾回收工作,并把回收到的结果放到队列referenceQueue中,另外一个线程是Spark Context Cleaner,当检查到队列中有被回收的对象时,针对不同的对象,调用不同的方法进行清理:

/** Start the cleaner. */
def start(): Unit = {
 
cleaningThread.setDaemon(true)
 
cleaningThread.setName("Spark Context Cleaner")
 
cleaningThread.start()
 
periodicGCService.scheduleAtFixedRate(new Runnable {
   
override def run(): Unit = System.gc()
  },
periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}


private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
 
while (!stopped) {
   
try {
     
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])
     
// Synchronize here to avoid being interrupted on stop()
     
synchronized {
        reference.foreach { ref =>
          logDebug(
"Got cleaning task " + ref.task)
         
referenceBuffer.remove(ref)
          ref.task
match {
           
case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking =
blockOnCleanupTasks)
           
case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking =
blockOnShuffleCleanupTasks)
           
case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking =
blockOnCleanupTasks)
           
case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking =
blockOnCleanupTasks)
           
case CleanCheckpoint(rddId) =>
              doCleanCheckpoint(rddId)
          }
        }
      }
    }
catch {
     
case ie: InterruptedException if stopped => // ignore
     
case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}
之前我们分析到占用的内存的对象是ShuffleStatus和broadcast,这些对象的清理通过调用keepCleaning()中doCleanupBroadcast方法做清理工作,而单线程Spark Context Cleaner对高并发、大数据下,会产生大量的上述对象,不能够及时处理。刚好doCleanupBroadcast方法中的清理机制为默认为阻塞状态,由参数
spark.cleaner.referenceTracking.blocking控制,当为非阻塞模式时,不需要等待返回结果就可以做下一步的清理对象,因此,可以通过设置spark.cleaner.referenceTracking.blocking = false加快清理Broadcast对象。
/** Remove all blocks belonging to the given broadcast. */
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
 
val future = driverEndpoint.askSync[Future[Seq[Int]]](
    RemoveBroadcast(broadcastId, removeFromMaster))
  future.failed.foreach(e =>
    logWarning(
s"Failed to remove broadcast $broadcastId" +
     
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
  )(ThreadUtils.sameThread)
 
if (blocking) {
   
timeout.awaitResult(future)
  }
}

2、Application存储的中间状态数据清理缓慢

org.apache.spark.status.TaskDataWrapperorg.apache.spark.status.ElementTrackingStore存储了大量的application中间状态数据。先讨论org.apache.spark.status.TaskDataWrapper

该类中存放的对象,一共3432672个,每个488byte,共占内存大概1.5G

image.png

从单个对象来看,org.apache.spark.status.TaskDataWrapper包括以下成员:

image.png

可以看到占用内存较大部分主要是accumulatorUpdates,包含2509486AccumulableInfo,每个160byte,共计380M的内存,经确认,这是业务侧代码频繁调用累计器计算所产生的对象。

image.png

org.apache.spark.status.ElementTrackingStore主要包括以下几个对象:

1org.apache.spark.status.JobDataWrapper

2org.apache.spark.status.StageDataWrapper

3org.apache.spark.status.RDDOperationGraphWrapper

4org.apache.spark.status.ExecutorStageSummaryWrapper

分析方法类似,这些对象都是Application存储的中间状态数据,包括了jobstagetask等信息,缓存到driverInMemoryStore中,最终显示到Spark web ui界面上。

针对以上对象,具体的实现在类AppStatusListener中,负责将application过程的的相关信息,jobstagetask等中间数据写入到driver中,并做了相应的清理机制。 我们所关心的清理机制如下:

kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
  { count => cleanupExecutors(count) }
kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
  cleanupJobs(count)
}
kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
  cleanupStages(count)
}

这里调用了触发器,当给定的数据类型的数量达到一定的阈值之后,就会执行以下的动作,分别是清理上面所提到的几个对象:

{ count => cleanupExecutors(count) }

cleanupJobs(count)

cleanupStages(count)

以cleanupJobs(count)为例分析:
当对象JobDataWrapper的数据累计达到MAX_RETAINED_JOBS时候(默认为1000),会触发对JobDataWrapper对象遍历做清理操作:
val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
  .intConf
  .createWithDefault(
1000)
 
private def cleanupJobs(count: Long): Unit = {
 
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
 
if (countToDelete <= 0L) {
   
return
 
}

 
val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
 
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
    j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
  }
  toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
}


其他的对象的清理机制类似,考虑到现网任务一个小时内的task数据达到百万级别,因此期间产生的stagejobtask信息巨大,缓存到driver端,共计几个G内存。因此,我们可以加快清理这些对象的频率,使得缓存到driver端的内存及时释放,需要在客户端或者提交命令中增加调整以下参数值:

spark.ui.retainedJobs = 10
spark.ui.retainedStages = 100
spark.ui.retainedTasks = 100


这样可以使得application中间状态的jobstagetask等相关临时数据得到释放,  内存及时回收,产生的影响是WEB UI界面保留的历史的任务的数据变少。

三、总结

高并发、大数据量任务下的,spark中的清理机制弊端,导致了任务的性能上不足,需要我们深入分析去调优,同时也要从内核上做优化,更好的让人们方便高效使用。


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

举报
请填写举报理由
0/200