设为首页 收藏本站
查看: 953|回复: 0

[经验分享] Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

[复制链接]

尚未签到

发表于 2017-4-19 12:39:12 | 显示全部楼层 |阅读模式
  如果Spark的部署方式选择Standalone,一个采用Master/Slaves的典型架构,那么Master是有SPOF(单点故障,Single Point of Failure)。Spark可以选用ZooKeeper来实现HA。
     ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。
DSC0000.jpg
 
1. Master的重启策略

Master在启动时,会根据启动参数来决定不同的Master故障重启策略:

  • ZOOKEEPER实现HA
  • FILESYSTEM:实现Master无数据丢失重启,集群的运行时数据会保存到本地/网络文件系统上

  • 丢弃所有原来的数据重启

Master::preStart()可以看出这三种不同逻辑的实现。
 
[java] view plaincopy 



  • override def preStart() {  
  •     logInfo("Starting Spark master at " + masterUrl)  
  •     ...  
  •     //persistenceEngine是持久化Worker,Driver和Application信息的,这样在Master重新启动时不会影响  
  •     //已经提交Job的运行  
  •     persistenceEngine = RECOVERY_MODE match {  
  •       case "ZOOKEEPER" =>  
  •         logInfo("Persisting recovery state to ZooKeeper")  
  •         new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)  
  •       case "FILESYSTEM" =>  
  •         logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)  
  •         new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))  
  •       case _ =>  
  •         new BlackHolePersistenceEngine()  
  •     }  
  •     //leaderElectionAgent负责Leader的选取。  
  •     leaderElectionAgent = RECOVERY_MODE match {  
  •         case "ZOOKEEPER" =>  
  •           context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))  
  •         case _ => // 仅仅有一个Master的集群,那么当前的Master就是Active的  
  •           context.actorOf(Props(classOf[MonarchyLeaderAgent], self))  
  •       }  
  •   }  

 
RECOVERY_MODE是一个字符串,可以从spark-env.sh中去设置。
 
[java] view plaincopy 



  • val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode""NONE")  

 
如果不设置spark.deploy.recoveryMode的话,那么集群的所有运行数据在Master重启是都会丢失,这个结论是从BlackHolePersistenceEngine的实现得出的。
 
[java] view plaincopy 



  • private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {  
  •   override def addApplication(app: ApplicationInfo) {}  
  •   override def removeApplication(app: ApplicationInfo) {}  
  •   override def addWorker(worker: WorkerInfo) {}  
  •   override def removeWorker(worker: WorkerInfo) {}  
  •   override def addDriver(driver: DriverInfo) {}  
  •   override def removeDriver(driver: DriverInfo) {}  
  •   
  •   override def readPersistedData() = (Nil, Nil, Nil)  
  • }  

 
它把所有的接口实现为空。PersistenceEngine是一个trait。作为对比,可以看一下ZooKeeper的实现。
 
[java] view plaincopy 



  • class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)  
  •   extends PersistenceEngine  
  •   with Logging  
  • {  
  •   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir""/spark") + "/master_status"  
  •   val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)  
  •   
  •   SparkCuratorUtil.mkdir(zk, WORKING_DIR)  
  •   // 将app的信息序列化到文件WORKING_DIR/app_{app.id}中  
  •   override def addApplication(app: ApplicationInfo) {  
  •     serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)  
  •   }  
  •   
  •   override def removeApplication(app: ApplicationInfo) {  
  •     zk.delete().forPath(WORKING_DIR + "/app_" + app.id)  
  •   }  

 
Spark使用的并不是ZooKeeper的API,而是使用的org.apache.curator.framework.CuratorFramework 和 org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} 。Curator在ZooKeeper上做了一层很友好的封装。
 
2. 集群启动参数的配置
简单总结一下参数的设置,通过上述代码的分析,我们知道为了使用ZooKeeper至少应该设置一下参数(实际上,仅仅需要设置这些参数。通过设置spark-env.sh:
 
[java] view plaincopy 



  • spark.deploy.recoveryMode=ZOOKEEPER  
  • spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181  
  • spark.deploy.zookeeper.dir=/dir     
  • // OR 通过一下方式设置  
  • export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER "  
  • export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server1:2181,zk_server_2:2181"  

 
各个参数的意义:
参数
默认值
含义
spark.deploy.recoveryMode
NONE
恢复模式(Master重新启动的模式),有三种:1, ZooKeeper, 2, FileSystem, 3 NONE
spark.deploy.zookeeper.url
 ZooKeeper的Server地址
spark.deploy.zookeeper.dir
/spark
ZooKeeper 保存集群元数据信息的文件目录,包括Worker,Driver和Application。
 
3. CuratorFramework简介 
CuratorFramework极大的简化了ZooKeeper的使用,它提供了high-level的API,并且基于ZooKeeper添加了很多特性,包括

  • 自动连接管理:连接到ZooKeeper的Client有可能会连接中断,Curator处理了这种情况,对于Client来说自动重连是透明的。
  • 简洁的API:简化了原生态的ZooKeeper的方法,事件等;提供了一个简单易用的接口。
  • Recipe的实现(更多介绍请点击Recipes):

    • Leader的选择
    • 共享锁
    • 缓存和监控
    • 分布式的队列
    • 分布式的优先队列

 
CuratorFrameworks通过CuratorFrameworkFactory来创建线程安全的ZooKeeper的实例。
CuratorFrameworkFactory.newClient()提供了一个简单的方式来创建ZooKeeper的实例,可以传入不同的参数来对实例进行完全的控制。获取实例后,必须通过start()来启动这个实例,在结束时,需要调用close()。
 
[java] view plaincopy 



  • /** 
  •      * Create a new client 
  •      * 
  •      * 
  •      * @param connectString list of servers to connect to 
  •      * @param sessionTimeoutMs session timeout 
  •      * @param connectionTimeoutMs connection timeout 
  •      * @param retryPolicy retry policy to use 
  •      * @return client 
  •      */  
  •     public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)  
  •     {  
  •         return builder().  
  •             connectString(connectString).  
  •             sessionTimeoutMs(sessionTimeoutMs).  
  •             connectionTimeoutMs(connectionTimeoutMs).  
  •             retryPolicy(retryPolicy).  
  •             build();  
  •     }  

  
需要关注的还有两个Recipe:org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}。

 
首先看一下LeaderlatchListener,它在LeaderLatch状态变化的时候被通知:

  • 在该节点被选为Leader的时候,接口isLeader()会被调用
  • 在节点被剥夺Leader的时候,接口notLeader()会被调用
由于通知是异步的,因此有可能在接口被调用的时候,这个状态是准确的,需要确认一下LeaderLatch的hasLeadership()是否的确是true/false。这一点在接下来Spark的实现中可以得到体现。
 
[java] view plaincopy 



  • /** 
  • * LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed. 
  • * 
  • * Note that just because you are in the middle of one of these method calls, it does not necessarily mean that 
  • * hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes 
  • * before these methods get called. The contract is that if that happens, you should see another call to the other 
  • * method pretty quickly. 
  • */  
  • public interface LeaderLatchListener  
  • {  
  •   /** 
  • * This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true. 
  • * 
  • * Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If 
  • * this occurs, you can expect {@link #notLeader()} to also be called. 
  • */  
  •   public void isLeader();  
  •   
  •   /** 
  • * This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false. 
  • * 
  • * Note that it is possible that by the time this method call happens, hasLeadership has become true. If 
  • * this occurs, you can expect {@link #isLeader()} to also be called. 
  • */  
  •   public void notLeader();  
  • }  

LeaderLatch负责在众多连接到ZooKeeper Cluster的竞争者中选择一个Leader。Leader的选择机制可以看ZooKeeper的具体实现,LeaderLatch这是完成了很好的封装。我们只需要要知道在初始化它的实例后,需要通过
 
[java] view plaincopy 



  • public class LeaderLatch implements Closeable  
  • {  
  •     private final Logger log = LoggerFactory.getLogger(getClass());  
  •     private final CuratorFramework client;  
  •     private final String latchPath;  
  •     private final String id;  
  •     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);  
  •     private final AtomicBoolean hasLeadership = new AtomicBoolean(false);  
  •     private final AtomicReference<String> ourPath = new AtomicReference<String>();  
  •     private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();  
  •     private final CloseMode closeMode;  
  •     private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();  
  • .  
  • .  
  • .  
  •     /** 
  •      * Attaches a listener to this LeaderLatch 
  •      * <p/> 
  •      * Attaching the same listener multiple times is a noop from the second time on. 
  •      * <p/> 
  •      * All methods for the listener are run using the provided Executor.  It is common to pass in a single-threaded 
  •      * executor so that you can be certain that listener methods are called in sequence, but if you are fine with 
  •      * them being called out of order you are welcome to use multiple threads. 
  •      * 
  •      * @param listener the listener to attach 
  •      */  
  •     public void addListener(LeaderLatchListener listener)  
  •     {  
  •         listeners.addListener(listener);  
  •     }  

 
 
通过addListener可以将我们实现的Listener添加到LeaderLatch。在Listener里,我们在两个接口里实现了被选为Leader或者被剥夺Leader角色时的逻辑即可。
 
 
4. ZooKeeperLeaderElectionAgent的实现
实际上因为有Curator的存在,Spark实现Master的HA就变得非常简单了,ZooKeeperLeaderElectionAgent实现了接口LeaderLatchListener,在isLeader()确认所属的Master被选为Leader后,向Master发送消息ElectedLeader,Master会将自己的状态改为ALIVE。当noLeader()被调用时,它会向Master发送消息RevokedLeadership时,Master会关闭。

 
[java] view plaincopy 



  • private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,  
  •     masterUrl: String, conf: SparkConf)  
  •   extends LeaderElectionAgent with LeaderLatchListener with Logging  {  
  •   val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir""/spark") + "/leader_election"  
  •   // zk是通过CuratorFrameworkFactory创建的ZooKeeper实例  
  •   private var zk: CuratorFramework = _  
  •   // leaderLatch:Curator负责选出Leader。  
  •   private var leaderLatch: LeaderLatch = _  
  •   private var status = LeadershipStatus.NOT_LEADER  
  •   
  •   override def preStart() {  
  •   
  •     logInfo("Starting ZooKeeper LeaderElection agent")  
  •     zk = SparkCuratorUtil.newClient(conf)  
  •     leaderLatch = new LeaderLatch(zk, WORKING_DIR)  
  •     leaderLatch.addListener(this)  
  •   
  •     leaderLatch.start()  
  •   }  

 
 
在prestart中,启动了leaderLatch来处理选举ZK中的Leader。就如在上节分析的,主要的逻辑在isLeader和noLeader中。
[java] view plaincopy 



  • override def isLeader() {  
  •   synchronized {  
  •     // could have lost leadership by now.  
  •     //现在leadership可能已经被剥夺了。。详情参见Curator的实现。  
  •     if (!leaderLatch.hasLeadership) {  
  •       return  
  •     }  
  •   
  •     logInfo("We have gained leadership")  
  •     updateLeadershipStatus(true)  
  •   }  
  • }  
  •   
  • override def notLeader() {  
  •   synchronized {  
  •     // 现在可能赋予leadership了。详情参见Curator的实现。  
  •     if (leaderLatch.hasLeadership) {  
  •       return  
  •     }  
  •   
  •     logInfo("We have lost leadership")  
  •     updateLeadershipStatus(false)  
  •   }  
  • }  

updateLeadershipStatus的逻辑很简单,就是向Master发送消息。
 
[java] view plaincopy 



  • def updateLeadershipStatus(isLeader: Boolean) {  
  •     if (isLeader && status == LeadershipStatus.NOT_LEADER) {  
  •       status = LeadershipStatus.LEADER  
  •       masterActor ! ElectedLeader  
  •     } else if (!isLeader && status == LeadershipStatus.LEADER) {  
  •       status = LeadershipStatus.NOT_LEADER  
  •       masterActor ! RevokedLeadership  
  •     }  
  •   }  

 
5. 设计理念
为了解决Standalone模式下的Master的SPOF,Spark采用了ZooKeeper提供的选举功能。Spark并没有采用ZooKeeper原生的Java API,而是采用了Curator,一个对ZooKeeper进行了封装的框架。采用了Curator后,Spark不用管理与ZooKeeper的连接,这些对于Spark来说都是透明的。Spark仅仅使用了100行代码,就实现了Master的HA。当然了,Spark是站在的巨人的肩膀上。谁又会去重复发明轮子呢?
  http://blog.csdn.net/anzhsoft/article/details/33740737

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-366423-1-1.html 上篇帖子: [ZooKeeper]连接中断,watch恢复,心跳和客户端超时 下篇帖子: ZooKeeper伪分布式集群安装及简单使用 java api
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表