设为首页 收藏本站
查看: 990|回复: 0

[经验分享] 【原】Spark中Master源码分析(一)

[复制链接]
发表于 2017-3-2 11:14:32 | 显示全部楼层 |阅读模式
  Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用。下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧。
1.家当(静态属性)
  1.设置一个守护单线程的消息发送器,
  private val forwardMessageThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
  2.根据sparkConf得到hadoopConf
  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
  3.一个bool类型的标识,如果设置为true,那么app的执行将会尽量分步到尽可能多的worker上,否则app的执行将会先用完一个worker的资源,然后再使用下一个worker的资源
  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
  4.设置执行app默认的最大核数为Int类型的最大值
  private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
  5.还有一些关于worker、driver、app等的字段信息,都比较简单,限于篇幅限制就不一一列出了
2.技能(方法)
  由于Master上本质上是一个RpcEndpoint,所以我们按照它的生命周期进行介绍。如果不明白,请看文章
  Spark Rpc通信源码分析 http://www.cnblogs.com/yourarebest/p/5297157.html
  1.构造函数就是Master默认的主构造器
  2.onStart方法,主要功能是启动Jetty的WebUI服务,Rest服务、选出持久化引擎及持久化代理
  
  override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  webUi = new MasterWebUI(this, webUiPort)
  //启动JettyServer并绑定webUI端口号
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  //forwardMessageThread线程每1min中检查Worker是否宕了
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = Utils.tryLogNonFatalError {
  self.send(CheckForWorkerTimeOut)
  }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  //启动Rest服务,默认端口6066
  if (restServerEnabled) {
  val port = conf.getInt("spark.master.rest.port", 6066)
  restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  //返回绑定的端口号
  restServerBoundPort = restServer.map(.start())
  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  //当metrics系统启动后,将master和app的metrics servlet的hadnler给webui
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  //序列化Spark的配置文件
  val serializer = new JavaSerializer(conf)
  //支持三种持久化引擎,将Spark的配置参数持久化,便于以后恢复使用
  val (persistenceEngine, leaderElectionAgent_) = RECOVERY_MODE match {
  case "ZOOKEEPER" =>
  logInfo("Persisting recovery state to ZooKeeper")
  val zkFactory =
  new ZooKeeperRecoveryModeFactory(conf, serializer)
  (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
  case "FILESYSTEM" =>
  val fsFactory =
  new FileSystemRecoveryModeFactory(conf, serializer)
  (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
  case "CUSTOM" =>
  val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))

  val factory = clazz.getConstructor(classOf[SparkConf],>  .newInstance(conf, serializer)
  .asInstanceOf[StandaloneRecoveryModeFactory]
  (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
  case _ =>
  (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
  }
  3.onStop方法,停止master的metrics系统、停止app的metrics系统、取消异步执行的任务、停止WebUi服务、停止rest服务以及持久化引擎和选举代理的停止。
  
  override def onStop() {
  masterMetricsSystem.report()
  applicationMetricsSystem.report()
  //避免异步发出的CompleteRecovery消息导致master的重启
  if (recoveryCompletionTask != null) {
  recoveryCompletionTask.cancel(true)
  }
  if (checkForWorkerTimeOutTask != null) {
  checkForWorkerTimeOutTask.cancel(true)
  }
  forwardMessageThread.shutdownNow()
  webUi.stop()
  restServer.foreach(_.stop())
  masterMetricsSystem.stop()
  applicationMetricsSystem.stop()
  persistenceEngine.close()
  leaderElectionAgent.stop()
  }
  还有一个重要的方法receive方法,留到下一篇吧。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-349243-1-1.html 上篇帖子: Red5源代码分析 下篇帖子: CXF学习(2) helloworld
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表