设为首页 收藏本站
查看: 994|回复: 0

[经验分享] [Apache Spark源码阅读]天堂之门——SparkContext解析

[复制链接]

尚未签到

发表于 2015-8-6 09:58:58 | 显示全部楼层 |阅读模式
  稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读。这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex。
  SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs SparkContext声明和其伴生对象object SparkContext。而之所以将SparkContext称为整个程序的入口,原因在于,不管我们是从本地还是HDFS读取文件,总要首先创建一个SparkContext对象,然后基于这个SC对象,展开后续的RDD对象创建、转换等操作。
  在创建SparkContex对象的过程中,进行了一系列的初始化操作,主要包括以下内容:


  • 载入配置文件SparkConf
  • 创建SparkEnv
  • 创建TaskScheduler
  • 创建DAGScheduler
  1、 载入配置文件SparkConf
  在SparkConf初始化时,会将相关的配置参数传递给SparkContex,包括master、appName、sparkHome、jars、environment等信息,这里的构造函数有多中表达形式,但最归初始化的结果都是殊途同归,SparkContex获取了所有相关的本地配置和运行时配置信息。



1 def this() = this(new SparkConf())
2
3 def this(master: String, appName: String, conf: SparkConf) =
4     this(SparkContext.updatedConf(conf, master, appName))
5
6 def this(
7       master: String,
8       appName: String,
9       sparkHome: String = null,
10       jars: Seq[String] = Nil,
11       environment: Map[String, String] = Map(),
12       preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
13   {
14     this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
15     this.preferredNodeLocationData = preferredNodeLocationData
16   }
  
  2、创建SparkEnv
  SparkEnv是一个非常重要的变量,其内包含了许多Spark运行时的重要组件(变量),包括 MapOutputTracker、ShuffleFetcher、BlockManager等,这里是通过SparkEnv类的伴生对象SparkEnv Object内的Create方法实现的。



1   private[spark] val env = SparkEnv.create(
2     conf,
3     "",
4     conf.get("spark.driver.host"),
5     conf.get("spark.driver.port").toInt,
6     isDriver = true,
7     isLocal = isLocal,
8     listenerBus = listenerBus)
9   SparkEnv.set(env)
  
  3、创建TaskScheduler和DAGScheduler
  下面这段代码非常重要,它初始化了SparkContex里两个非常关键的变量,TaskScheduler和DAGScheduler。



1   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
2   @volatile private[spark] var dagScheduler: DAGScheduler = _
3   try {
4     dagScheduler = new DAGScheduler(this)
5   } catch {
6     case e: Exception => throw
7       new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
8   }
9
10   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
11   // constructor
12   taskScheduler.start()
  
  首先,TaskScheduler是根据Spark的运行模式进行初始化的,具体代码在SparkContext中的createTaskScheduler方法中。以Standalone模式为例,它会将sc传递给TaskSchedulerImpl,并在返回Scheduler对象之前,创建SparkDeploySchedulerBackend,并将其初始化,最后返回Scheduler对象。



1     case SPARK_REGEX(sparkUrl) =>
2         val scheduler = new TaskSchedulerImpl(sc)
3         val masterUrls = sparkUrl.split(",").map("spark://" + _)
4         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
5         scheduler.initialize(backend)
6         scheduler
  创建TaskScheduler对象后,再将TaskScheduler对象传参至DAGScheduler,用来创建DAGScheduler对象,



1   def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
2     this(
3       sc,
4       taskScheduler,
5       sc.listenerBus,
6       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
7       sc.env.blockManager.master,
8       sc.env)
9   }
  之后,再调用其start()方法将其启动,其中包括SchedulerBackend的启动。



1 override def start() {
2     backend.start()
3
4     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
5       logInfo("Starting speculative execution thread")
6       import sc.env.actorSystem.dispatcher
7       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
8             SPECULATION_INTERVAL milliseconds) {
9         Utils.tryOrExit { checkSpeculatableTasks() }
10       }
11     }
12   }
  
  除此之外,SparkContex还包括一些重要的函数方法,例如
  1、runjob
  runjob是spark中所有任务提交的入口,诸如rdd中的一些常见操作和变换,都会调用SparkContex的runjob方法,提交任务。


DSC0000.gif DSC0001.gif


1   def runJob[T, U: ClassTag](
2       rdd: RDD[T],
3       func: (TaskContext, Iterator[T]) => U,
4       partitions: Seq[Int],
5       allowLocal: Boolean,
6       resultHandler: (Int, U) => Unit) {
7     if (dagScheduler == null) {
8       throw new SparkException("SparkContext has been shutdown")
9     }
10     val callSite = getCallSite
11     val cleanedFunc = clean(func)
12     logInfo("Starting job: " + callSite)
13     val start = System.nanoTime
14     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
15       resultHandler, localProperties.get)
16     logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
17     rdd.doCheckpoint()
18   }
View Code  2、textFile
  从HDFS路径读取单个数据文件,首先创建HadoopRDD,通过map操作,返回RDD对象。
  3、wholeTextFiles
  从HDFS某个文件夹读取多个文件。
  4、parallelize
  读取本地文件,并转换为RDD。
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-94640-1-1.html 上篇帖子: Installing Apache, PHP, and MySQL on Mac OS X 下篇帖子: apache +php
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表