[Experience Share] Spark DAGScheduler, TaskScheduler, and Executor: a source-code analysis of task execution

Abstract
  Spark's scheduling is something I had long wanted to understand properly: how the directed acyclic graph (DAG) of RDDs is generated, how tasks are scheduled, how the lazy execution of RDDs actually happens and completes, and at which point on the executor RDD.compute ends up invoking the functions we define. These questions are fundamental but not easy. After spending some time in the source code I finally worked out the details, and this post is a summary so it can be refined later. Spark schedules work at two levels: the DAGScheduler and the TaskScheduler. The DAGScheduler turns a job into a set of interdependent stages and hands each stage to the TaskScheduler as a TaskSet for dispatching. The details are walked through step by step below; it is a long read.

Contents
  1. The logical execution chain of RDDs
2. How jobs and stages are divided
3. DAGScheduler scheduling
4. TaskScheduler scheduling
5. How the executor runs tasks and our user-defined functions

The logical execution chain of RDDs
  Spark is said to execute lazily, using the DAG of RDDs to build the stages, and that DAG is formed through dependencies. Every time a transformation operator is applied, one or more new child RDDs are created, and at the moment the new RDD is constructed the dependency between it and its parent(s) is created as well. RDDs are therefore linked together by Dependency objects. RDD dependencies are not the focus of this post (google them for details); they come in three kinds: the 1:1 OneToOneDependency, the m:1 RangeDependency, and the m:n ShuffleDependency. OneToOneDependency and RangeDependency are collectively called NarrowDependency. The 1:1, m:1 and m:n granularity here refers to RDD partitions.

  At the heart of a dependency is a reference to the parent RDD; its rdd method simply returns that parent. This gives the lineage a linked-list structure: starting from the last RDD we can walk the dependencies backwards to every ancestor RDD.
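  You can see this chain directly from user code by inspecting dependencies and toDebugString on an RDD; a minimal sketch (data and names are made up):

----------------------------------------------
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object LineageDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lineage-demo").setMaster("local[*]"))
    val words: RDD[String] = sc.parallelize(Seq("a", "b", "a"))
    val pairs = words.map(w => (w, 1))      // narrow: OneToOneDependency
    val counts = pairs.reduceByKey(_ + _)   // wide: ShuffleDependency

    // Each dependency keeps a reference to its parent RDD, so we can walk backwards.
    println(pairs.dependencies.map(_.getClass.getSimpleName))   // e.g. List(OneToOneDependency)
    println(counts.dependencies.map(_.getClass.getSimpleName))  // e.g. List(ShuffleDependency)
    println(counts.toDebugString)                               // prints the whole lineage chain
    sc.stop()
  }
}

----------------------------------------------
  The map step shows up as a OneToOneDependency and the reduceByKey step as a ShuffleDependency, matching the classification above.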
Let's take map as an example and see how the dependency is created.

  map actually creates a MapPartitionsRDD. The primary constructor of MapPartitionsRDD passes the current RDD (this, i.e. the parent) up to the RDD superclass constructor, which in turn delegates to RDD's primary constructor, and that is where the dependency on the parent is built. In other words, the dependency is established inside the RDD constructor itself. Chaining transformations like this is what produces the RDD DAG (a simplified sketch of this wiring follows below).
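  The following is a deliberately tiny, self-contained model of that wiring, not Spark's real classes (all names are made up): it only shows how a one-parent constructor turns the parent reference into a OneToOneDependency, which is exactly the linked-list structure described above.

----------------------------------------------
object DependencyModel {

  class Dependency[T](val rdd: MiniRDD[T])                        // keeps a reference to the parent RDD
  class OneToOneDependency[T](rdd: MiniRDD[T]) extends Dependency[T](rdd)

  abstract class MiniRDD[T](val dependencies: Seq[Dependency[_]]) {
    // mirrors RDD's one-parent auxiliary constructor: parent => OneToOneDependency(parent)
    def this(oneParent: MiniRDD[_]) = this(Seq(new OneToOneDependency(oneParent)))
    def compute(): Iterator[T]
    def map[U](f: T => U): MiniRDD[U] = new MapLikeRDD(this, f)   // plays the role of MapPartitionsRDD
  }

  class SourceRDD[T](data: Seq[T]) extends MiniRDD[T](Nil) {
    def compute(): Iterator[T] = data.iterator
  }

  class MapLikeRDD[U, T](parent: MiniRDD[T], f: T => U) extends MiniRDD[U](parent) {
    def compute(): Iterator[U] = parent.compute().map(f)
  }

  def main(args: Array[String]): Unit = {
    val r1 = new SourceRDD(Seq(1, 2, 3))
    val r2 = r1.map(_ * 10)
    // r2's dependency points back at r1: the linked-list lineage described above.
    println(r2.dependencies.head.rdd eq r1)   // true
    println(r2.compute().toList)              // List(10, 20, 30)
  }
}

----------------------------------------------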

How jobs and stages are divided
  Splitting a Spark application into jobs is straightforward: an application contains as many jobs as it has action operators, one action per job. This can be seen directly in the source: a transformation operator only builds one or more new RDDs, while an action operator triggers a job submission.
  For example, the map transformation shown above simply wraps its parent in a new MapPartitionsRDD and returns it, whereas an action such as count ends in a call to runJob, which submits the job (a small runnable illustration of the one-action-one-job rule follows below). Stages, in turn, are split wherever a shuffle occurs; that is covered in detail later.
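  A quick way to observe the one-action-one-job rule from user code is to register a SparkListener and count job-start events. This is a minimal sketch (application name and data are made up; listener events are delivered asynchronously, so the count is only read after sc.stop() has drained the listener bus):

----------------------------------------------
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import java.util.concurrent.atomic.AtomicInteger

object OneActionOneJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("one-action-one-job").setMaster("local[*]"))
    val jobs = new AtomicInteger(0)
    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = jobs.incrementAndGet()
    })

    val rdd = sc.parallelize(1 to 100).map(_ * 2)   // transformations: no job yet
    rdd.count()                                     // action #1 -> job #1
    rdd.collect()                                   // action #2 -> job #2

    sc.stop()                                       // stop() drains the listener bus
    println(s"jobs started: ${jobs.get()}")         // expected: 2
  }
}

----------------------------------------------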
DAGScheduler scheduling
  When we submit an application to a Spark cluster from a client and the resource manager is YARN, the client first requests a machine on which to run the driver process. There is no single "Driver" class in Spark; the driver is simply the process that runs the user's code, performs the DAGScheduler and TaskScheduler work, and tracks the status of running tasks. Remember that the user's main function runs in the driver, while the RDD transformations themselves execute on other machines. The driver is mainly responsible for scheduling and dispatching the work: from the action operator, through stage division, to the whole DAGScheduler flow.
When the driver runs the user-defined main function, it first creates a SparkContext, the entry point for interacting with the Spark cluster. SparkContext initializes much of the runtime environment, most importantly the DAGScheduler and the TaskScheduler. (A small runnable program whose shape matches the logical graph used below is given right after this paragraph.)
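  For reference, here is a small runnable program with the same shape as the logical graph analysed below: two shuffle boundaries (the reduceByKey calls) feeding one final action. Names and data are made up; with this shape the job ends up as two ShuffleMapStages plus one ResultStage.

----------------------------------------------
import org.apache.spark.{SparkConf, SparkContext}

object ThreeStageJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("three-stage-job").setMaster("local[*]"))

    val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).reduceByKey(_ + _)   // shuffle #1
    val right = sc.parallelize(Seq(("a", 10), ("b", 20))).reduceByKey(_ + _)           // shuffle #2
    val result = left.union(right)        // narrow dependency on both shuffled RDDs

    result.collect().foreach(println)     // the action: triggers SparkContext.runJob
    println(result.toDebugString)         // indentation marks the two shuffle boundaries
    sc.stop()
  }
}

----------------------------------------------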

  Since the DAGScheduler runs in the driver process, we start from the driver running the user-defined main function. We will use a logical execution graph of nine RDDs (RDD1 through RDD9) to walk through the whole DAGScheduler flow: RDD9 is the final RDD, and walking backwards through its lineage there are two shuffle boundaries, one between RDD1 and RDD2 and one between RDD6 and RDD7, so RDD1 and RDD6 each sit at the end of a branch that feeds the final stage through a shuffle. The action called on RDD9 triggers the job submission: it calls SparkContext's runJob, which after several layers of wrapping ends up calling DAGScheduler.runJob.
  SparkContext contains the following runJob method:

----------------------------------------------
  def runJob[T, U: ClassTag](
      rdd: RDD[T],                            // RDD9 in the logical graph above
      func: (TaskContext, Iterator[T]) => U,  // the function passed in by the action called on RDD9
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

----------------------------------------------
  DAGScheduler.runJob:

----------------------------------------------
  def runJob[T, U](
      rdd: RDD[T],  // RDD9
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    // A JobWaiter is created here to wait for the submitted job to complete; the real work is
    // delegated to submitJob, and the rest of this method just logs the outcome.
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

----------------------------------------------
  The source of submitJob:

----------------------------------------------
  def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check that the requested partitions actually exist
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    // Wrap the job into a JobSubmitted event and post it to the event loop; a DAGScheduler
    // thread picks the event up and handles the submission.
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

----------------------------------------------
  The DAGScheduler owns a DAGSchedulerEventProcessLoop that receives and dispatches its events. The JobSubmitted event posted above is dispatched to handleJobSubmitted (a minimal sketch of this post-and-dispatch pattern is shown below, followed by the real handler). Before reading it, note that Spark has two kinds of stages: ShuffleMapStage and ResultStage. The stage containing the final RDD is the ResultStage, while every stage that ends at a shuffle boundary is a ShuffleMapStage.
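  The event loop itself is just a queue plus a dispatcher thread. Below is a minimal, self-contained sketch of that post-and-dispatch shape (a toy model, not the real DAGSchedulerEventProcessLoop; all names are made up): post corresponds to what submitJob does with eventProcessLoop.post, and the dispatcher's match corresponds to the real loop handing JobSubmitted to handleJobSubmitted.

----------------------------------------------
import java.util.concurrent.LinkedBlockingQueue

sealed trait SchedulerEvent
case class JobSubmittedEvent(jobId: Int) extends SchedulerEvent
case object StopEvent extends SchedulerEvent

class ToyEventLoop(handleJobSubmitted: Int => Unit) {
  private val queue = new LinkedBlockingQueue[SchedulerEvent]()
  private val dispatcher = new Thread("toy-dag-event-loop") {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {                     // blocks until an event is posted
          case JobSubmittedEvent(id) => handleJobSubmitted(id)
          case StopEvent             => running = false
        }
      }
    }
  }
  def start(): Unit = dispatcher.start()
  def post(event: SchedulerEvent): Unit = queue.put(event)   // what submitJob effectively does
  def stop(): Unit = { post(StopEvent); dispatcher.join() }
}

object ToyEventLoopDemo {
  def main(args: Array[String]): Unit = {
    val loop = new ToyEventLoop(id => println(s"handleJobSubmitted($id)"))
    loop.start()
    loop.post(JobSubmittedEvent(0))
    loop.stop()
  }
}

----------------------------------------------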

----------------------------------------------
  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],  // RDD9
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // First create the ResultStage
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }

----------------------------------------------
  createResultStage above is where the RDD graph is turned into stages:

----------------------------------------------
  /**
   * Create the ResultStage for the action; this also triggers creation of its parent stages.
   */
  private def createResultStage(
      rdd: RDD[_],  // RDD9 in the graph above
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

  /**
   * Build the parent stages the ResultStage depends on: one ShuffleMapStage per shuffle
   * dependency found directly above the final RDD.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

  /**
   * Depth-first traversal from the given RDD to find the nearest shuffle (wide) dependencies.
   * This is the key method: with the logical graph above it returns the ShuffleDependency
   * between RDD1 and RDD2 and the one between RDD6 and RDD7. Only the nearest shuffle ancestors
   * are returned: if A <-- B <-- C are connected by two shuffles and we start from C, only the
   * B <-- C dependency is returned here; the A <-- B one is found later when B's stage is built.
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            // Not a shuffle dependency: push the parent RDD and keep walking backwards
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

  /**
   * Create a ShuffleMapStage; for the two shuffle dependencies found above, two ShuffleMapStages
   * are created.
   */
  def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd              // RDD1 and RDD6 respectively
    val numTasks = rdd.partitions.length
    val parents = getOrCreateParentStages(rdd, jobId)  // do these stages themselves have shuffle parents?
    val id = nextStageId.getAndIncrement()
    // Create the ShuffleMapStage, then update the bookkeeping maps
    val stage = new ShuffleMapStage(
      id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

    ----------------------------------------------
  From the source walked through above, combined with the logical execution graph, this job has three stages: one ResultStage and two ShuffleMapStages, one of which ends at RDD1 and the other at RDD6. That completes the splitting of the RDD graph into stages. Once splitting is done, the last line of handleJobSubmitted submits the final stage.

  The submitStage code is fairly simple: it checks whether the parent stages of the current stage have already run; if not, it recursively submits the missing parents and parks the current stage in waitingStages until they finish, and only then is the current stage itself submitted.

----------------------------------------------
  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      logDebug("submitStage(" + stage + ")")
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        logDebug("missing: " + missing)
        if (missing.isEmpty) {
          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
          submitMissingTasks(stage, jobId.get)
        } else {
          // Parent stages are not ready yet: submit them first and park this stage
          for (parent <- missing) {
            submitStage(parent)
          }
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id, None)
    }
  }

----------------------------------------------
  submitMissingTasks is where a stage is turned into a TaskSet; its core looks like this (the opening bookkeeping lines are abridged):

----------------------------------------------
  private def submitMissingTasks(stage: Stage, jobId: Int) {
    // Figure out which partitions of this stage still need to be computed
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    val properties = jobIdToActiveJob(jobId).properties
    runningStages += stage
    stage match {
      case s: ShuffleMapStage =>
        outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
      case s: ResultStage =>
        outputCommitCoordinator.stageStart(
          stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
    }
    // Compute the preferred locations for each partition, i.e. which machines hold its data
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
        case s: ResultStage =>
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    // Record the partitions to compute and their locations in the new stage attempt (latestInfo)
    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

    // Serialize what we just built so it can be shipped from the driver to the workers
    var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          JavaUtils.bufferToArray(
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
        case stage: ResultStage =>
          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
      }

      taskBinary = sc.broadcast(taskBinaryBytes)
    } catch {
      // In the case of a failure during serialization, abort the stage.
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString, Some(e))
        runningStages -= stage
        // Abort execution
        return
      case NonFatal(e) =>
        abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    // Wrap the stage into a TaskSet: a ShuffleMapStage produces ShuffleMapTasks,
    // a ResultStage produces ResultTasks
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } catch {
      case NonFatal(e) =>
        abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
        runningStages -= stage
        return
    }

    // Hand the tasks over to the TaskScheduler
    if (tasks.size > 0) {
      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
      stage.pendingPartitions ++= tasks.map(_.partitionId)
      logDebug("New pending partitions: " + stage.pendingPartitions)
      taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
      // Because we posted SparkListenerStageSubmitted earlier, we should mark
      // the stage as completed here in case there are no tasks to run
      markStageAsFinished(stage, None)

      val debugString = stage match {
        case stage: ShuffleMapStage =>
          s"Stage ${stage} is actually done; " +
            s"(available: ${stage.isAvailable}," +
            s"available outputs: ${stage.numAvailableOutputs}," +
            s"partitions: ${stage.numPartitions})"
        case stage: ResultStage =>
          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
      }
      logDebug(debugString)
      submitWaitingChildStages(stage)
    }
  }

----------------------------------------------
  That completes the DAGScheduler side of the scheduling.

TaskScheduler scheduling
  To understand task scheduling, keep in mind that the concrete behaviour depends on the cluster manager, and each deployment mode has its own scheduler backend that tracks the cluster's resources. Spark's base implementation, CoarseGrainedSchedulerBackend, runs in the driver, talks to the cluster manager (YARN, for example) and to the executors, and keeps track of which executors are alive and how many free cores they have. When a TaskSet is ready to be scheduled, the driver asks the backend for resource offers, and the best-fitting executors are chosen to run the tasks. The actual task scheduling logic lives in TaskSchedulerImpl; the default scheduling policy is FIFO, with a FAIR policy also available.

  When a TaskSet is submitted, a TaskSetManager is created for it and added to the scheduling pool. (A small configuration example for switching the scheduling policy follows below.)
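  Switching between the two policies is plain configuration; a minimal sketch (the allocation-file path and pool name are made up):

----------------------------------------------
import org.apache.spark.{SparkConf, SparkContext}

object SchedulingModeDemo {
  def main(args: Array[String]): Unit = {
    // Select the policy used by TaskSchedulerImpl's root scheduling pool.
    // FIFO is the default; FAIR additionally reads pool definitions from an XML file.
    val conf = new SparkConf()
      .setAppName("scheduling-mode-demo")
      .setMaster("local[*]")
      .set("spark.scheduler.mode", "FAIR")                                  // or "FIFO" (default)
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // hypothetical path

    val sc = new SparkContext(conf)
    // Jobs submitted from this thread go into the named pool defined in that XML file.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.stop()
  }
}

----------------------------------------------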

----------------------------------------------
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      // Create the TaskSetManager and update the bookkeeping state
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      // Add the wrapped TaskSet to the scheduling pool
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    // Ask the scheduler backend to make resource offers so the tasks can be launched
    backend.reviveOffers()
  }

  /*
   * This lives in CoarseGrainedSchedulerBackend: the driver's own RPC endpoint is sent a
   * ReviveOffers message, which triggers makeOffers() below.
   */
  override def reviveOffers() {
    driverEndpoint.send(ReviveOffers)
  }

----------------------------------------------
  When the driver endpoint receives that message, the following handler runs:

----------------------------------------------
  override def receive: PartialFunction[Any, Unit] = {
    case StatusUpdate(executorId, taskId, state, data) =>
      scheduler.statusUpdate(taskId, state, data.value)
      if (TaskState.isFinished(state)) {
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.freeCores += scheduler.CPUS_PER_TASK
            makeOffers(executorId)
          case None =>
            // Ignoring the update since we don't know about the executor.
            logWarning(s"Ignored task status update ($taskId state $state) " +
              s"from unknown executor with ID $executorId")
        }
      }

    // The ReviveOffers message sent by reviveOffers above matches this case
    case ReviveOffers =>
      makeOffers()

    case KillTask(taskId, executorId, interruptThread) =>
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
        case None =>
          // Ignoring the task kill since the executor is not registered.
          logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
      }
  }
  /*
   * makeOffers collects the resources of the executors that are still alive, wraps each one as a
   * WorkerOffer, and passes them to TaskSchedulerImpl.resourceOffers, which picks the machines
   * that will run the pending tasks.
   */
  private def makeOffers() {
    // Filter out executors under killing
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    val workOffers = activeExecutors.map { case (id, executorData) =>
      new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    launchTasks(scheduler.resourceOffers(workOffers))
  }

  /*
   * Given the free resources in the cluster, resourceOffers (in TaskSchedulerImpl) decides which
   * tasks run on which executors and returns TaskDescriptions binding each task to an executor.
   * The body below is abridged to its main steps.
   */
  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    for (o <- offers) {
      // ... register executors we have not seen before and set newExecAvail = true ...
    }

    // Shuffle the offers to spread load, then prepare a task buffer and a free-CPU counter per executor
    val shuffledOffers = Random.shuffle(offers)
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // Take the TaskSets out of the pool in scheduling order (FIFO or FAIR)
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      // ... offer each TaskSet the available executors in increasing locality order,
      //     filling `tasks` with the TaskDescriptions that get assigned ...
    }
    tasks
  }

----------------------------------------------
  launchTasks then serializes every TaskDescription and sends a LaunchTask message to the chosen executor's CoarseGrainedExecutorBackend, which brings us to the executor side.

How the executor runs tasks and our user-defined functions
  On each worker, CoarseGrainedExecutorBackend is the RPC endpoint that talks to the driver. Its receive method registers the Executor and launches tasks:

----------------------------------------------
  override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

    case RegisterExecutorFailed(message) =>
      exitExecutor(1, "Slave registration failed: " + message)

    // When the driver launches a task on this executor, this case runs
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        // First deserialize the TaskDescription
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        // then hand it to Executor.launchTask
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

    case KillTask(taskId, _, interruptThread) =>
      if (executor == null) {
        exitExecutor(1, "Received KillTask command but executor was null")
      } else {
        executor.killTask(taskId, interruptThread)
      }

    case StopExecutor =>
      stopping.set(true)
      logInfo("Driver commanded a shutdown")
      // Cannot shutdown here because an ack may need to be sent back to the caller. So send
      // a message to self to actually do the shutdown.
      self.send(Shutdown)

    case Shutdown =>
      stopping.set(true)
      new Thread("CoarseGrainedExecutorBackend-stop-executor") {
        override def run(): Unit = {
          // executor.stop() will call SparkEnv.stop() which waits until RpcEnv stops totally.
          // However, if executor.stop() runs in some thread of RpcEnv, RpcEnv won't be able to
          // stop until executor.stop() returns, which becomes a dead-lock (See SPARK-14180).
          // Therefore, we put this line in a new thread.
          executor.stop()
        }
      }.start()
  }

    ----------------------------------------------
  Now the Executor itself. From its source we can see that each task is wrapped in a TaskRunner (a Runnable) and handed to a thread pool for execution.

    ----------------------------------------------
      def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
    }

    ----------------------------------------------
  TaskRunner itself is just a Runnable, so let's look at its run method.


    ----------------------------------------------
  override def run(): Unit = {
    // Set up the environment the task needs to run
    val threadMXBean = ManagementFactory.getThreadMXBean
    val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    // Use the repl/user class loader for this thread
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    logInfo(s"Running $taskName (TID $taskId)")
    // Tell the backend the task is now RUNNING
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    var taskStart: Long = 0
    var taskStartCpu: Long = 0
    startGCTime = computeTotalGcTime()

    try {
      // Deserialize the task's dependencies (files, jars, properties) and the task bytes
    val (taskFiles, taskJars, taskProps, taskBytes) =
    Task.deserializeWithDependencies(serializedTask)

      // Must be set before updateDependencies() is called, in case fetching dependencies
    // requires access to properties contained within (e.g. for access control).
    Executor.taskDeserializationProps.set(taskProps)
      // Download/refresh the file and jar dependencies
    updateDependencies(taskFiles, taskJars)
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    task.localProperties = taskProps
    task.setTaskMemoryManager(taskMemoryManager)

      // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
    // Throw an exception rather than returning, because returning within a try{} block
    // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
    // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
    // for the task.
    throw new TaskKilledException
    }
    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
      // Bring the executor's map output tracker up to the task's epoch so stale shuffle locations get refreshed
    env.mapOutputTracker.updateEpoch(task.epoch)

      // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
    } else 0L
    var threwException = true
      // Run the actual task: task.run below delegates to runTask. As noted earlier a job is split
      // into ShuffleMapStages and a ResultStage, with matching task types ShuffleMapTask and
      // ResultTask, and each overrides runTask with its own logic.
    val value = try {
    val res = task.run(
    taskAttemptId = taskId,
    attemptNumber = attemptNumber,
    metricsSystem = env.metricsSystem)
    threwException = false
    res
    } finally {
      // The rest inspects the outcome, checking for memory/lock leaks and logging the results
    val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
    val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()

        if (freedMemory > 0 && !threwException) {
    val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
    if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
    throw new SparkException(errMsg)
    } else {
    logWarning(errMsg)
    }
    }
    if (releasedLocks.nonEmpty && !threwException) {
    val errMsg =
    s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
    releasedLocks.mkString("[", ", ", "]")
    if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
    throw new SparkException(errMsg)
    } else {
    logWarning(errMsg)
    }
    }
    }
    val taskFinish = System.currentTimeMillis()
    val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
    } else 0L
    // If the task has been killed, let's fail it.
    if (task.killed) {
    throw new TaskKilledException
    }
    val resultSer = env.serializer.newInstance()
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)
    val afterSerialization = System.currentTimeMillis()
    // Deserialization happens in two parts: first, we deserialize a Task object, which
    // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
    task.metrics.setExecutorDeserializeTime(
    (taskStart - deserializeStartTime) + task.executorDeserializeTime)
    task.metrics.setExecutorDeserializeCpuTime(
    (taskStartCpu - deserializeStartCpuTime) + task.executorDeserializeCpuTime)
    // We need to subtract Task.run()'s deserialization time to avoid double-counting
    task.metrics.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
    task.metrics.setExecutorCpuTime(
    (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    task.metrics.setResultSerializationTime(afterSerialization - beforeSerialization)
    // Note: accumulator updates must be collected after TaskMetrics is updated
    val accumUpdates = task.collectAccumulatorUpdates()
    // TODO: do not serialize value twice
    val directResult = new DirectTaskResult(valueBytes, accumUpdates)
    val serializedDirectResult = ser.serialize(directResult)
    val resultSize = serializedDirectResult.limit
    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
    if (maxResultSize > 0 && resultSize > maxResultSize) {
    logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
    s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
    s"dropping it.")
    ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
    } else if (resultSize > maxDirectResultSize) {
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(
    blockId,
    new ChunkedByteBuffer(serializedDirectResult.duplicate()),
    StorageLevel.MEMORY_AND_DISK_SER)
    logInfo(
    s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
    ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
    } else {
    logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
    serializedDirectResult
    }
    }
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    } catch {
    case ffe: FetchFailedException =>
    val reason = ffe.toTaskFailedReason
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    case _: TaskKilledException =>
    logInfo(s"Executor killed $taskName (TID $taskId)")
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    case _: InterruptedException if task.killed =>
    logInfo(s"Executor interrupted and killed $taskName (TID $taskId)")
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))
    case CausedBy(cDE: CommitDeniedException) =>
    val reason = cDE.toTaskFailedReason
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
    case t: Throwable =>
    // Attempt to exit cleanly by informing the driver of our failure.
    // If anything goes wrong (or this was a fatal exception), we will delegate to
    // the default uncaught exception handler, which will terminate the Executor.
    logError(s"Exception in $taskName (TID $taskId)", t)
    // Collect latest accumulator values to report back to the driver
    val accums: Seq[AccumulatorV2[_, _]] =
    if (task != null) {
    task.metrics.setExecutorRunTime(System.currentTimeMillis() - taskStart)
    task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
    task.collectAccumulatorUpdates(taskFailed = true)
    } else {
    Seq.empty
    }
    val accUpdates = accums.map(acc => acc.toInfo(Some(acc.value), None))
    val serializedTaskEndReason = {
    try {
    ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))
    } catch {
    case _: NotSerializableException =>
    // t is not serializable so just send the stacktrace
    ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))
    }
    }
    setTaskFinishedAndClearInterruptStatus()
    execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)
    // Don't forcibly exit unless the exception was inherently fatal, to avoid
    // stopping other tasks unnecessarily.
    if (Utils.isFatalError(t)) {
    SparkUncaughtExceptionHandler.uncaughtException(t)
    }
    } finally {
    runningTasks.remove(taskId)
    }
      }
    }

    ----------------------------------------------
  As mentioned before, a job is split into two kinds of stages, ShuffleMapStage and ResultStage, and correspondingly there are two task types, ShuffleMapTask and ResultTask. Task.run calls runTask, and each task type overrides runTask with its own logic. Here is ShuffleMapTask.runTask:

    ----------------------------------------------
  override def runTask(context: TaskContext): MapStatus = {
    // Deserialize the RDD using the broadcast variable.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // The rdd deserialized here is the last RDD before the shuffle, as discussed earlier
    val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    // Write out this stage's final RDD through the shuffle writer. Notice that nothing here looks
    // like "call the user's function" or "call compute"; the pipelined execution is hidden inside
    // rdd.iterator below. See the explanation after this block.
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
      writer.stop(success = true).get
    } catch {
      case e: Exception =>
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e: Exception =>
            log.debug("Could not stop writer", e)
        }
        throw e
    }
  }

    ----------------------------------------------
  Now to the question raised in the comment inside runTask above. RDDs form a DAG through their dependencies, and from the last RDD we can walk back through those dependencies to every ancestor. As long as no shuffle sits in between, the RDDs form a pipeline: no intermediate result needs to be materialized, because the functions we defined on each RDD are simply chained together from input to output, saving both time and space. (If an intermediate RDD was explicitly persisted to memory or disk, Spark tracks that and reads it back instead of recomputing.)

  So the call to rdd.iterator(partition, context) inside the executor is exactly where the backward walk over the RDD chain starts: iterator falls through to compute, compute pulls from the parent RDD's iterator, and so on, until a data source, a cached partition, or a shuffle boundary is reached; that is the point at which our user-defined functions finally run. The shuffle machinery itself and ResultTask.runTask will be covered in a follow-up post.
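  To make the pipelining concrete: chaining compute over parent iterators is just composing lazy Scala iterators, so a whole stage's worth of user functions runs element by element. A tiny standalone demonstration (no Spark involved, names made up):

----------------------------------------------
object IteratorPipelineDemo {
  def main(args: Array[String]): Unit = {
    def trace[T](name: String)(x: T): T = { println(s"$name($x)"); x }

    val source: Iterator[Int] = Iterator(1, 2, 3)
    // Analogous to rdd.map(f).map(g): each element flows through f and then g immediately.
    val piped: Iterator[Int] = source
      .map(x => trace("f")(x + 1))
      .map(x => trace("g")(x * 10))

    // Nothing has executed yet (lazy); pulling the final iterator drives the whole chain,
    // and no intermediate collection is ever materialized.
    println(piped.toList)   // prints f(...)/g(...) interleaved per element, then List(20, 30, 40)
  }
}

----------------------------------------------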



