稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读。这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex。
SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs SparkContext声明和其伴生对象object SparkContext。而之所以将SparkContext称为整个程序的入口,原因在于,不管我们是从本地还是HDFS读取文件,总要首先创建一个SparkContext对象,然后基于这个SC对象,展开后续的RDD对象创建、转换等操作。
在创建SparkContex对象的过程中,进行了一系列的初始化操作,主要包括以下内容:
载入配置文件SparkConf
创建SparkEnv
创建TaskScheduler
创建DAGScheduler
1、 载入配置文件SparkConf
在SparkConf初始化时,会将相关的配置参数传递给SparkContex,包括master、appName、sparkHome、jars、environment等信息,这里的构造函数有多中表达形式,但最归初始化的结果都是殊途同归,SparkContex获取了所有相关的本地配置和运行时配置信息。
1 def this() = this(new SparkConf())
2
3 def this(master: String, appName: String, conf: SparkConf) =
4 this(SparkContext.updatedConf(conf, master, appName))
5
6 def this(
7 master: String,
8 appName: String,
9 sparkHome: String = null,
10 jars: Seq[String] = Nil,
11 environment: Map[String, String] = Map(),
12 preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
13 {
14 this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
15 this.preferredNodeLocationData = preferredNodeLocationData
16 }
2、创建SparkEnv
SparkEnv是一个非常重要的变量,其内包含了许多Spark运行时的重要组件(变量),包括 MapOutputTracker、ShuffleFetcher、BlockManager等,这里是通过SparkEnv类的伴生对象SparkEnv Object内的Create方法实现的。
1 private[spark] val env = SparkEnv.create(
2 conf,
3 "",
4 conf.get("spark.driver.host"),
5 conf.get("spark.driver.port").toInt,
6 isDriver = true,
7 isLocal = isLocal,
8 listenerBus = listenerBus)
9 SparkEnv.set(env)
3、创建TaskScheduler和DAGScheduler
下面这段代码非常重要,它初始化了SparkContex里两个非常关键的变量,TaskScheduler和DAGScheduler。
1 private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
2 @volatile private[spark] var dagScheduler: DAGScheduler = _
3 try {
4 dagScheduler = new DAGScheduler(this)
5 } catch {
6 case e: Exception => throw
7 new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
8 }
9
10 // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
11 // constructor
12 taskScheduler.start()
首先,TaskScheduler是根据Spark的运行模式进行初始化的,具体代码在SparkContext中的createTaskScheduler方法中。以Standalone模式为例,它会将sc传递给TaskSchedulerImpl,并在返回Scheduler对象之前,创建SparkDeploySchedulerBackend,并将其初始化,最后返回Scheduler对象。
1 case SPARK_REGEX(sparkUrl) =>
2 val scheduler = new TaskSchedulerImpl(sc)
3 val masterUrls = sparkUrl.split(",").map("spark://" + _)
4 val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
5 scheduler.initialize(backend)
6 scheduler
创建TaskScheduler对象后,再将TaskScheduler对象传参至DAGScheduler,用来创建DAGScheduler对象,
1 def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
2 this(
3 sc,
4 taskScheduler,
5 sc.listenerBus,
6 sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
7 sc.env.blockManager.master,
8 sc.env)
9 }
之后,再调用其start()方法将其启动,其中包括SchedulerBackend的启动。
1 override def start() {
2 backend.start()
3
4 if (!isLocal && conf.getBoolean("spark.speculation", false)) {
5 logInfo("Starting speculative execution thread")
6 import sc.env.actorSystem.dispatcher
7 sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
8 SPECULATION_INTERVAL milliseconds) {
9 Utils.tryOrExit { checkSpeculatableTasks() }
10 }
11 }
12 }
除此之外,SparkContex还包括一些重要的函数方法,例如
1、runjob
runjob是spark中所有任务提交的入口,诸如rdd中的一些常见操作和变换,都会调用SparkContex的runjob方法,提交任务。
1 def runJob[T, U: ClassTag](
2 rdd: RDD[T],
3 func: (TaskContext, Iterator[T]) => U,
4 partitions: Seq[Int],
5 allowLocal: Boolean,
6 resultHandler: (Int, U) => Unit) {
7 if (dagScheduler == null) {
8 throw new SparkException("SparkContext has been shutdown")
9 }
10 val callSite = getCallSite
11 val cleanedFunc = clean(func)
12 logInfo("Starting job: " + callSite)
13 val start = System.nanoTime
14 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
15 resultHandler, localProperties.get)
16 logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
17 rdd.doCheckpoint()
18 }
View Code 2、textFile
从HDFS路径读取单个数据文件,首先创建HadoopRDD,通过map操作,返回RDD对象。
3、wholeTextFiles
从HDFS某个文件夹读取多个文件。
4、parallelize
读取本地文件,并转换为RDD。
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com