设为首页 收藏本站
查看: 1054|回复: 0

[经验分享] Apache Spark源码走读之15

[复制链接]

尚未签到

发表于 2015-7-31 08:08:36 | 显示全部楼层 |阅读模式
  欢迎转载,转载请注明出处,徽沪一郎。

概要
  本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的。

Standalone部署的节点组成
DSC0000.png
  介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多。
  在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外是Mesos和YARN,要理解其内部运行机理,显然要花更多的时间才能了解清楚。

standalone cluster的组成
  standalone集群由三个不同级别的节点组成,分别是


  • Master 主控节点,可以类比为董事长或总舵主,在整个集群之中,最多只有一个Master处在Active状态
  • Worker 工作节点 ,这个是manager,是分舵主, 在整个集群中,可以有多个worker,如果worker为零,什么事也做不了
  • Executor 干苦力活的,直接受worker掌控,一个worker可以启动多个executor,启动的个数受限于机器中的cpu核数
  这三种不同类型的节点各自运行于自己的JVM进程之中

Driver Application
  提交到standalone集群的应用程序称之为Driver Applicaton。

Standalone集群启动及任务提交过程详解
DSC0001.png
  
  上图总结了正常情况下Standalone集群的启动以及应用提交时,各节点之间有哪些消息交互。下面分集群启动和应用提交两个过程来作详细说明。

集群启动过程
  正常启动过程如下所述

step 1: 启动master

$SPARK_HOME/sbin/start-master.sh

step 2: 启动worker

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

  worker启动之后,会做两件事情


  • 将自己注册到Master, RegisterWorker
  • 定期发送心跳消息给Master

任务提交过程

step 1: 提交application
  利用如下指令来启动spark-shell

MASTER=spark://127.0.0.1:7077 $SPARK_HOME/bin/spark-shell

  运行spark-shell时,会向Master发送RegisterApplication请求
  日志位置: master运行产生的日志在$SPARK_HOME/logs目录下

step 2: Master处理RegisterApplication的请求之后
  收到RegisterApplication请求之后,Mastet会做如下处理


  • 如果有worker已经注册上来,发送LaunchExecutor指令给相应worker
  • 如果没有,则什么事也不做

step 3: 启动Executor
  Worker在收到LaunchExecutor指令之后,会启动Executor进程

step 4: 注册Executor
  启动的Executor进程会根据启动时的入参,将自己注册到Driver中的SchedulerBackend
  日志位置: executor的运行日志在$SPARK_HOME/work目录下

step 5: 运行Task
  SchedulerBackend收到Executor的注册消息之后,会将提交到的Spark Job分解为多个具体的Task,然后通过LaunchTask指令将这些Task分散到各个Executor上真正的运行
  如果在调用runJob的时候,没有任何的Executor注册到SchedulerBackend,相应的处理逻辑是什么呢?


  • SchedulerBackend会将Task存储在TaskManager中
  • 一旦有Executor注册上来,就将TaskManager管理的尚未运行的task提交到executor中
  • 如果有多个job处于pending状态,默认调度策略是FIFO,即先提交的先运行

测试步骤


  • 启动Master
  • 启动spark-shell
  • 执行 sc.textFile("README.md").count
  • 启动worker
  • 注意worker启动之后,spark-shell中打印出来的日志消息

Job执行结束
  任务运行结束时,会将相应的Executor停掉。
  可以做如下的试验


  • 停止spark-shell
  • 利用ps -ef|grep -i java查看java进程,可以发现CoarseGrainedExecutorBackend进程已经退出

小结
  通过上面的控制消息原语之间的先后顺序可以看出


  • Master和worker进程必须显式启动
  • executor是被worker隐式的带起
  • 集群的启动顺序

    • Master必须先于其它节点启动
    • worker和driver哪个先启动,无所谓
    • 但driver提交的job只有在有相应的worker注册到Master之后才可以被真正的执行


异常场景分析
  上面说明的是正常情况下,各节点的消息分发细节。那么如果在运行中,集群中的某些节点出现了问题,整个集群是否还能够正常处理Application中的任务呢?

异常分析1: worker异常退出
DSC0002.png
  在Spark运行过程中,经常碰到的问题就是worker异常退出,当worker退出时,整个集群会有哪些故事发生呢? 请看下面的具体描述


  • worker异常退出,比如说有意识的通过kill指令将worker杀死
  • worker在退出之前,会将自己所管控的所有小弟executor全干掉
  • worker需要定期向master改善心跳消息的,现在worker进程都已经玩完了,哪有心跳消息,所以Master会在超时处理中意识到有一个“分舵”离开了
  • Master非常伤心,伤心的Master将情况汇报给了相应的Driver
  • Driver通过两方面确认分配给自己的Executor不幸离开了,一是Master发送过来的通知,二是Driver没有在规定时间内收到Executor的StatusUpdate,于是Driver会将注册的Executor移除

后果分析
  worker异常退出会带来哪些影响


  • executor退出导致提交的task无法正常结束,会被再一次提交运行
  • 如果所有的worker都异常退出,则整个集群不可用
  • 需要有相应的程序来重启worker进程,比如使用supervisordrunit

测试步骤


  • 启动Master
  • 启动worker
  • 启动spark-shell
  • 手工kill掉worker进程
  • 用jps或ps -ef|grep -i java来查看启动着的java进程

异常退出的代码处理
  定义于ExecutorRunner.scala的start函数

def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}

  killProcess的过程就是停止相应CoarseGrainedExecutorBackend的过程。
  worker停止的时候,一定要先将自己启动的Executor停止掉。这是不是很像水浒中宋江的手段,李逵就是这样不明不白的把命给丢了。

小结
  需要特别指出的是,当worker在启动Executor的时候,是通过ExecutorRunner来完成的,ExecutorRunner是一个独立的线程,和Executor是一对一的关系,这很重要。Executor作为一个独立的进程在运行,但会受到ExecutorRunner的严密监控。

异常分析2: executor异常退出
DSC0003.png
  Executor作为Standalone集群部署方式下的最底层员工,一旦异常退出,其后果会是什么呢?


  • executor异常退出,ExecutorRunner注意到异常,将情况通过ExecutorStateChanged汇报给Master
  • Master收到通知之后,非常不高兴,尽然有小弟要跑路,那还了得,要求Executor所属的worker再次启动
  • Worker收到LaunchExecutor指令,再次启动executor
  作为一名底层员工,想轻易摞挑子不干是不成的。"人在江湖,身不由己“啊。

测试步骤


  • 启动Master
  • 启动Worker
  • 启动spark-shell
  • 手工kill掉CoarseGrainedExecutorBackend

fetchAndRunExecutor
  fetchAndRunExecutor负责启动具体的Executor,并监控其运行状态,具体代码逻辑如下所示

def fetchAndRunExecutor() {
try {
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value)  {
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}
}

异常分析3: master 异常退出
DSC0004.png
  worker和executor异常退出的场景都讲到了,我们剩下最后一种情况了,master挂掉了怎么办?
  带头大哥如果不在了,会是什么后果呢?


  • worker没有汇报的对象了,也就是如果executor再次跑飞,worker是不会将executor启动起来的,大哥没给指令
  • 无法向集群提交新的任务
  • 老的任务即便结束了,占用的资源也无法清除,因为资源清除的指令是Master发出的
  怎么样,知道后果很严重了吧?别看老大平时不干活,要真的不在,仅凭小弟们是不行的。

Master单点失效问题的解决
  那么怎么解决Master单点失效的问题呢?
  你说再加一个Master就是了,两个老大。两个老大如果同时具有指挥权,结果也将是灾难性的。设立一个副职人员,当目前的正职挂掉之后,副职接管。也就是同一时刻,有且只有一个active master。
  注意不错,如何实现呢?使用zookeeper的ElectLeader功能,效果图如下
DSC0005.png

配置细节
  如何搭建zookeeper集群,这里不再废话,哪天有空的话再整一整,或者可以参考写的storm系列中谈到的zookeeper的集群安装步骤。
  假设zookeeper集群已经设置成功,那么如何启动standalone集群中的节点呢?有哪些特别的地方?

conf/spark-env.sh
  在conf/spark-env.sh中,为SPARK_DAEMON_JAVA_OPTS添加如下选项

System propertyMeaning
spark.deploy.recoveryModeSet to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.urlThe ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dirThe directory in ZooKeeper to store recovery state (default: /spark).
  设置SPARK_DAEMON_JAVA_OPTS的实际例子

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.recoveryMode=ZOOKEEPER"

应用程序启动
  应用程序运行的时候,指定多个master地址,用逗号分开,如下所示

MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell

小结
  Standalone集群部署方式下的容错性分析让我们对于Spark的任务分发过程又有了进一处的认识。前面的篇章从整体上匆匆过了一遍Spark所涉及的知识点,分析的不够深,不够细。
  此篇尝试着就某一具体问题做深入的分析。套用书画中的说法,在框架分析的时候,我们可以”大开大合,疏可走马,计白当黑“,在细节分析的时候,又要做到“密不透风,条分缕析,层层递进”。
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-92445-1-1.html 上篇帖子: Intro to the Apache table API 下篇帖子: Apache 模块 mod_cache应用
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表