设为首页 收藏本站
查看: 1306|回复: 0

[经验分享] Apache Spark源码走读之3

[复制链接]

尚未签到

发表于 2019-1-31 07:06:06 | 显示全部楼层 |阅读模式
概要  本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回。
准备  1. spark已经安装完毕
  2. spark运行在local mode或local-cluster mode
local-cluster mode  local-cluster模式也称为伪分布式,可以使用如下指令运行
MASTER=local[1,2,1024] bin/spark-shell  [1,2,1024] 分别表示,executor number, corenumber和内存大小,其中内存大小不应小于默认的512M
DriverProgramme的初始化过程分析初始化过程的涉及的主要源文件  1. SparkContext.scala      整个初始化过程的入口
  2. SparkEnv.scala          创建BlockManager, MapOutputTrackerMaster, ConnectionManager,CacheManager
  3. DAGScheduler.scala      任务提交的入口,即将Job划分成各个stage的关键
  4. TaskSchedulerImpl.scala 决定每个stage可以运行几个task,每个task分别在哪个executor上运行
  5. SchedulerBackend
  1. 最简单的单机运行模式的话,看LocalBackend.scala
  2. 如果是集群模式,看源文件SparkDeploySchedulerBackend
初始化过程步骤详解  步骤1: 根据初始化入参生成SparkConf,再根据SparkConf来创建SparkEnv, SparkEnv中主要包含以下关键性组件 1. BlockManager 2. MapOutputTracker 3. ShuffleFetcher 4.ConnectionManager
private[spark] val env = SparkEnv.create(    conf,    "",    conf.get("spark.driver.host"),    conf.get("spark.driver.port").toInt,    isDriver = true,    isLocal = isLocal)  SparkEnv.set(env)  步骤2:创建TaskScheduler,根据Spark的运行模式来选择相应的SchedulerBackend,同时启动taskscheduler,这一步至为关键
  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName)  taskScheduler.start()  TaskScheduler.start目的是启动相应的SchedulerBackend,并启动定时器进行检测
override def start() {    backend.start()     if (!isLocal && conf.getBoolean("spark.speculation", false)) {      logInfo("Starting speculative execution thread")      import sc.env.actorSystem.dispatcher      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,            SPECULATION_INTERVAL milliseconds) {        checkSpeculatableTasks()      }    }  }  步骤3:以上一步中创建的TaskScheduler实例为入参创建DAGScheduler并启动运行
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)  dagScheduler.start()  步骤4:启动WEB UI
  ui.start()
  RDD的转换过程
  还是以最简单的wordcount为例说明rdd的转换过程
sc.textFile("README.md").flatMap(line=>line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)  上述一行简短的代码其实发生了很复杂的RDD转换,下面仔细解释每一步的转换过程和转换结果
步骤1:val rawFile = sc.textFile("README.md")  textFile先是生成hadoopRDD,然后再通过map操作生成MappedRDD,如果在spark-shell中执行上述语句,得到的结果可以证明所做的分析
  scala> sc.textFile("README.md")14/04/23 13:11:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes14/04/23 13:11:48 INFO MemoryStore: ensureFreeSpace(119741) called with curMem=0, maxMem=31138775014/04/23 13:11:48 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 116.9 KB, free 296.8 MB)14/04/2313:11:48 DEBUG BlockManager: Put block broadcast_0 locally took  277 ms14/04/23 13:11:48 DEBUG BlockManager: Put for block broadcast_0 without replication took  281 msres0: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :13
  步骤2: valsplittedText = rawFile.flatMap(line => line.split(" "))
  flatMap将原来的MappedRDD转换成为FlatMappedRDD
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =                                                                                                  new FlatMappedRDD(this, sc.clean(f))
  步骤3:val wordCount = splittedText.map(word => (word, 1))
  利用word生成相应的键值对,上一步的FlatMappedRDD被转换成为MappedRDD
步骤4:val reduceJob = wordCount.reduceByKey(_ + _),这一步最复杂  步骤2,3中使用到的operation全部定义在RDD.scala中,而这里使用到的reduceByKey却在RDD.scala中见不到踪迹。reduceByKey的定义出现在源文件PairRDDFunctions.scala
  细心的你一定会问reduceByKey不是MappedRDD的属性和方法啊,怎么能被MappedRDD调用呢?其实这背后发生了一个隐式的转换,该转换将MappedRDD转换成为PairRDDFunctions
implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =    newPairRDDFunctions(rdd)  这种隐式的转换是scala的一个语法特征,如果想知道的更多,请用关键字"scalaimplicit method"进行查询,会有不少的文章对此进行详尽的介绍。
  接下来再看一看reduceByKey的定义
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {    reduceByKey(defaultPartitioner(self), func)  }   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {    combineByKey[V]((v: V) => v, func, func, partitioner)  }   def combineByKey[C](createCombiner: V => C,      mergeValue: (C, V) => C,      mergeCombiners: (C, C) => C,      partitioner: Partitioner,      mapSideCombine: Boolean = true,      serializerClass: String = null): RDD[(K, C)] = {    if (getKeyClass().isArray) {      if (mapSideCombine) {        throw new SparkException("Cannot use map-side combining with array keys.")      }      if (partitioner.isInstanceOf[HashPartitioner]) {        throw new SparkException("Default partitioner cannot partition array keys.")      }    }    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)    if (self.partitioner == Some(partitioner)) {      self.mapPartitionsWithContext((context, iter) => {        newInterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))      }, preservesPartitioning = true)    } else if (mapSideCombine) {      val combined = self.mapPartitionsWithContext((context, iter) => {        aggregator.combineValuesByKey(iter, context)      }, preservesPartitioning = true)      valpartitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)        .setSerializer(serializerClass)      partitioned.mapPartitionsWithContext((context, iter) => {        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))      }, preservesPartitioning = true)    } else {      // Don't apply map-side combiner.      valvalues = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)      values.mapPartitionsWithContext((context, iter) => {        newInterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))      }, preservesPartitioning = true)    }  }  reduceByKey最终会调用combineByKey, 在这个函数中PairedRDDFunctions会被转换成为ShuffleRDD,当调用mapPartitionsWithContext之后,shuffleRDD被转换成为MapPartitionsRDD
  Log输出能证明我们的分析
  res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at reduceByKey at :13
  RDD转换小结
  小结一下整个RDD转换过程
  HadoopRDD->MappedRDD->FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD
  整个转换过程好长啊,这一切的转换都发生在任务提交之前。
运行过程分析数据集操作分类  在对任务运行过程中的函数调用关系进行分析之前,我们也来探讨一个偏理论的东西,作用于RDD之上的Transformantion为什么会是这个样子?
  对这个问题的解答和数学搭上关系了,从理论抽象的角度来说,任务处理都可归结为“input->processing->output"。input和output对应于数据集dataset.
  在此基础上作一下简单的分类
  1. one-one 一个dataset在转换之后还是一个dataset,而且dataset的size不变,如map
  2. one-one 一个dataset在转换之后还是一个dataset,但size发生更改,这种更改有两种可能:扩大或缩小,如flatMap是size增大的操作,而subtract是size变小的操作
  3. many-one 多个dataset合并为一个dataset,如combine, join
  4. one-many 一个dataset分裂为多个dataset, 如groupBy
Task运行期的函数调用  task的提交过程参考本系列中的第二篇文章。本节主要讲解当task在运行期间是如何一步步调用到作用于RDD上的各个operation


  •   TaskRunner.run




    •   Task.runTask (Task是一个基类,有两个子类,分别为ShuffleMapTask和ResultTask)



    •   RDD.computeOrReadCheckpoint         



    •   RDD.compute 



    •   RDD.iterator



    •   Task.run


  或许当看到RDD.compute函数定义时,还是觉着f没有被调用,以MappedRDD的compute定义为例
  override def compute(split: Partition, context: TaskContext) =                                                                                       firstParent[T].iterator(split, context).map(f)    注意,这里最容易产生错觉的地方就是map函数,这里的map不是RDD中的map,而是scala中定义的iterator的成员函数map, 请自行参考http://www.scala-lang.org/api/2. ... collection.Iterator
堆栈输出 80         at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:111) 81         at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:154) 82         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149) 83         at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64) 84         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 85         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 86         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 87         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 88         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 89         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) 90         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 91         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 92         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) 93         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 94         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 95         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34) 96         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241) 97         at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) 98         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161) 99         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)100         at org.apache.spark.scheduler.Task.run(Task.scala:53)101         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)ResultTask  compute的计算过程对于ShuffleMapTask比较复杂,绕的圈圈比较多,对于ResultTask就直接许多。
  override def runTask(context: TaskContext): U = {    metrics = Some(context.taskMetrics)    try{      func(context, rdd.iterator(split, context))    } finally {      context.executeOnCompleteCallbacks()    }  }
  计算结果的传递
  上面的分析知道,wordcount这个job在最终提交之后,被DAGScheduler分为两个stage,第一个Stage是shuffleMapTask,第二个Stage是ResultTask.
  那么ShuffleMapTask的计算结果是如何被ResultTask取得的呢?这个过程简述如下
  1. ShffuleMapTask将计算的状态(注意不是具体的数据)包装为MapStatus返回给DAGScheduler
  2. DAGScheduler将MapStatus保存到MapOutputTrackerMaster中
  3. ResultTask在执行到ShuffleRDD时会调用BlockStoreShuffleFetcher的fetch方法去获取数据
  1. 第一件事就是咨询MapOutputTrackerMaster所要取的数据的location
  2. 根据返回的结果调用BlockManager.getMultiple获取真正的数据
  BlockStoreShuffleFetcher的fetch函数伪码
    val blockManager = SparkEnv.get.blockManager     val startTime = System.currentTimeMillis    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)    logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(      shuffleId, reduceId, System.currentTimeMillis - startTime))     val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)    val itr = blockFetcherItr.flatMap(unpackBlock)  注意上述代码中的getServerStatuses及getMultiple,一个是询问数据的位置,一个是去获取真正的数据。
  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669780-1-1.html 上篇帖子: (版本定制)第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密 下篇帖子: SPARK Your Embedded Development Project with eBox
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表