设为首页 收藏本站
查看: 955|回复: 0

[经验分享] Hadoop目前的HA(High Availability)机制分析和源代码研究

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-6-6 10:33:21 | 显示全部楼层 |阅读模式

Hadoop的设计初衷是服务于off-line的数据存储和处理应用。随着这个产品的不断成熟和发展,对于支持on-line应用的需求越来越强烈。例如HBase已经被Facebook和淘宝用到了在线存储应用中。所以Hadoop的on-line化也是一个趋势。目前制约Hadoop作为on-line存储和处理的瓶颈主要是系统的availability。衡量一个分布式系统的主要指标有:reliability, availability & scalability。Hadoop可以做到横向扩展,所以scalability非常好;而用户存在Hadoop里的数据几乎不会丢失,所以reliability也是非常不错的;目前的主要问题在availability,也就是用户向HDFS集群请求数据的时候集群是否能够保证100%提供服务,目前的主要问题体现在HDFS的SPOF(single point of failure),整个HDFS集群的启动/重启时间非常长,配置参数无法动态更改等。这些方面都是apache社区目前工作的重点,本文主要讨论HDFS NameNode的SPOF问题相关的HA机制。

Hadoop目前的trunk中的代码已经merge了原来的ha-branch,所以现在的trunk中的代码已经实现了基本的HA机制的功能。Hadoop PMC的人表示将会在后面的版本中发布这个功能。下面这张图是目前的HDFS HA的实现逻辑。

120913111826701.png

Right now the HA branch supports HOT-Failover, except that it is manual failover. We are now moving into a phase to implement automatic failover.

Significant enhancements were completed to make HOT Failover work:
- Configuration changes for HA
- Notion of active and standby states were added to the Namenode
- Client-side redirection
- Standby processing editlogs form Active
- Dual block reports to Active and Standby.

这是Hadoop mailing list中关于目前HA现状的阐述。下面首先简单介绍下这5个方面是怎么实现的,后面从源代码的角度分析具体的实现细节。

(1) Configuration changes for HA

在配置文件中会增加关于HA配置的参数,具体参数配置可以参考CDH4 Beta 2 High Availability Guide,这里介绍一些比较重要的参数。

例如dfs.ha.namenodes.[nameservice ID]这个参数表示在[nameservice ID]这个nameservice下的两台NameNode(分别作为Active和Standby模式运行)的主机名。然后针对每一台NN配置其对应的dfs.namenode.rpc-address.[nameservice ID].[name node ID]用来标示每一台NN。

由于目前的两台主机之间的HA机制是通过一个共享存储来存放editlog来实现的。所以需要配置参数dfs.namenode.shared.edits.dir表示共享存储的位置,一般是通过NFS挂载的形式,所以其实这个参数的值就是一个本地文件系统中的目录。

dfs.client.failover.proxy.provider.[nameservice ID]这个参数指定具体的failover proxy provider类,也就是在client端发现原来Active的NameNode变成了Standby模式时(在client发送RPC请求时返回了StandbyException时),该如何去连接当前Active的NameNode。目前的Hadoop里只有一个具体实现策略ConfiguredFailoverProxyProvider,实现方法就是如果client failover时,下次把RPC发送给另外一个NameNode的proxy。

另外就是dfs.ha.fencing.methods参数,指定在Active NameNode切换到Standby模式时,确保切换成功或者进程被杀死。
(2) Notion of active and standby states were added to the Namenode

有两种模式的NameNode,分别是Active和Standby模式。Active模式的NameNode接受client的RPC请求并处理,同时写自己的Editlog和共享存储上的Editlog,接收DataNode的Block report, block location updates和heartbeat;Standby模式的NameNode同样会接到来自DataNode的Block report, block location updates和heartbeat,同时会从共享存储的Editlog上读取并执行这些log操作,使得自己的NameNode中的元数据(Namespcae information + Block locations map)都是和Active NameNode中的元数据是同步的。所以说Standby模式的NameNode是一个热备(Hot Standby NameNode),一旦切换成Active模式,马上就可以提供NameNode服务。

(3) Client-side redirection

Client的通过RPC的Proxy与NameNode交互。在client端会有两个代理同时存在,分别代表与Active和Standby的NameNode的连接。由于Client端有Retry机制,当与Active NameNode正常通信的client proxy收到RPC返回的StandbyException时,说明这个Active NameNode已经变成了Standby模式,所以触发dfs.client.failover.proxy.provider.[nameservice ID]这个参数指定的类来做failover,目前唯一的实现是ConfiguredFailoverProxyProvider,实现方法就是下次开始把RPC发向另外一个NameNode。此后的RPC都是发往另外一个NameNode,也就是NameNode发生了主从切换。

public synchronized void performFailover(T currentProxy) {

currentProxyIndex = (currentProxyIndex + 1) % proxies.size();

}

(4) Standby processing editlogs form Active

开启Standby模式后,Standby NameNode会通过EditLogTailerThread从共享存储中读取Active NameNode写到那里的Editlog,然后执行操作,从而保持自己的元数据是最新的,所以说是热备。

(5)Dual block reports to Active and Standby.

DataNode的Block report, block location updates和heartbeat等RPC操作会发向两个NameNode,从而使得两个NameNode的Block locations map都是最新的,这样可以做到切换主从后原来的从(新的主)不再需要block report的时间。

可以看出client与NameNode之间的RPC是只向一个NameNode发送的(收到StandbyException后才会重试另外一个);而DataNode与NameNode之间的RPC在任何时候都是同时向两个NameNode发送的。

下一篇文章将从代码的角度来分析HDFS的HA机制。http://www.iyunv.com/Linux/2012-09/70414p2.htm

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

前一篇文章分析了Hadoop High Availability的思路和主要功能,这篇文章中从代码的角度分析具体的实现。

(1)NameNode启动流程

对于HDFS HA机制来说,NameNode是核心,NameNode有Active和Standby两种状态。在NameNode的构造函数中,读取配置文件,如果配置文件配置了开启HA,那么NameNode进入STANDBY_STATE状态;反之则进入ACTIVE_STATE状态。

this.haEnabled = HAUtil.isHAEnabled(conf, nsId);

if (!haEnabled) {

state = ACTIVE_STATE;

} else {

state = STANDBY_STATE;

}

创建HA上下文,NameNodeHAContext类包含了NameNode的Active和Standby模式变换相关操作的函数实现。

this.haContext = createHAContext();

然后就是初始化操作,包括配置参数,RPC server,metrics,加载Namespace,然后进入当前的haContext

try {

initializeGenericKeys(conf, nsId, namenodeId);

initialize(conf);

state.prepareToEnterState(haContext);

state.enterState(haContext);

} catch (IOException e) {

this.stop();

throw e;

} catch (HadoopIllegalArgumentException e) {

this.stop();

throw e;

}

(2)管理员执行HA管理命令流程

当两个NameNode都已经启动并进入Standby模式之后,就可以通过bin/hdfs脚本执行HDFS管理功能,执行如下命令:

bin/hdfs haadmin

就会调用DFSHAAdmin.java这个类来执行用户指定的功能,例如:

bin/hdfs haadmin -transitionToActive serviceId

bin/hdfs haadmin -transitionToStandby serviceId

bin/hdfs haadmin -failover serviceId serviceId

以-failover为例,就会调用HAAdmin.java类中的failover方法。

private int failover(final String[] argv)

throws IOException, ServiceFailedException {

boolean forceFence = false;

boolean forceActive = false;

Options failoverOpts = new Options();

failoverOpts.addOption(“failover”, false, “failover”);

failoverOpts.addOption(FORCEFENCE, false, “force fencing”);

failoverOpts.addOption(FORCEACTIVE, false, “force failover”);

CommandLineParser parser = new GnuParser();

CommandLine cmd;

try {

cmd = parser.parse(failoverOpts, argv);

forceFence = cmd.hasOption(FORCEFENCE);

forceActive = cmd.hasOption(FORCEACTIVE);

} catch (ParseException pe) {

errOut.println(“failover: incorrect arguments”);

printUsage(errOut, “-failover”);

return -1;

}

int numOpts = cmd.getOptions() == null ? 0 : cmd.getOptions().length;

final String[] args = cmd.getArgs();

if (numOpts > 2 || args.length != 2) {

errOut.println(“failover: incorrect arguments”);

printUsage(errOut, “-failover”);

return -1;

}

HAServiceTarget fromNode = resolveTarget(args[0]);

HAServiceTarget toNode = resolveTarget(args[1]);

FailoverController fc = new FailoverController(getConf());

try {

fc.failover(fromNode, toNode, forceFence, forceActive);

out.println(“Failover from “+args[0]+” to “+args[1]+” successful”);

} catch (FailoverFailedException ffe) {

errOut.println(“Failover failed: ” + ffe.getLocalizedMessage());

return -1;

}

return 0;

}

在这个函数中首先解析参数,然后会生成两个HAServiceTarget,分别表示发生主从切换的两个NameNode。由于这个DFSHAAdmin命令可以在任何一台可以连接到集群中的机器上运行,所以HAServiceTarget实际上是发生主从切换的两个NameNode的代理的封装。这个代理与两个NameNode通信的RPC协议时HAServiceProtocol。目前的Hadoop的RPC已经默认了Protocol Buffer作为RPC的实现。

然后生成FailoverController对象,这个类就是用于控制主从切换的。然后执行这个类中的failover方法。

public void failover(HAServiceTarget fromSvc,

HAServiceTarget toSvc,

boolean forceFence,

boolean forceActive)

throws FailoverFailedException {

Preconditions.checkArgument(fromSvc.getFencer() != null,

“failover requires a fencer”); //强制需要一种fencing方法

// Failover前的检查,例如fromSvc和toSvc是不是同一NameNode,toSvc是不是已经处于Active状态等

preFailoverChecks(fromSvc, toSvc, forceActive);

// 第一步是先把fromSvc转换成standby模式

boolean tryFence = true;

// 通过向fromSvc的发送HAServiceProtocol的方式使得fromSvc transition to Standby mode

// 如果这个RPC返回的结果是ServiceFailedException或者IOException,

// 那么说明transition fail,从而是否要tryFence就是true,就必须fencing了。

if (tryGracefulFence(fromSvc)) {

tryFence = forceFence;

}

// Fence fromSvc if it’s required or forced by the user

if (tryFence) {

if (!fromSvc.getFencer().fence(fromSvc)) {

throw new FailoverFailedException(“Unable to fence ” +

fromSvc + “. Fencing failed.”);

}

}

// 第二步就是让toSvc转换成active模式,操作方法和上面类似,

// 通过RPC给toSvc发送transitionToActive命令。

boolean failed = false;

Throwable cause = null;

try {

HAServiceProtocolHelper.transitionToActive(

toSvc.getProxy(conf, rpcTimeoutToNewActive));

} catch (ServiceFailedException sfe) {

LOG.error(“Unable to make ” + toSvc + ” active (” +

sfe.getMessage() + “). Failing back.”);

failed = true;

cause = sfe;

} catch (IOException ioe) {

LOG.error(“Unable to make ” + toSvc +

” active (unable to connect). Failing back.”, ioe);

failed = true;

cause = ioe;

}

// 如果我们在第二步的时候,把toSvc转换成Active模式失败,需要考虑回滚。

// 如果我们没有强制fencing原来的fromSvc,那么就回滚。

// 如果我们强制fencing掉原来的fromSvc,那么只能抛异常了。

if (failed) {

String msg = “Unable to failover to ” + toSvc;

// Only try to failback if we didn’t fence fromSvc

if (!tryFence) {

try {

// Unconditionally fence toSvc in case it is still trying to

// become active, eg we timed out waiting for its response.

// Unconditionally force fromSvc to become active since it

// was previously active when we initiated failover.

failover(toSvc, fromSvc, true, true);

} catch (FailoverFailedException ffe) {

msg += “. Failback to ” + fromSvc +

” failed (” + ffe.getMessage() + “)”;

LOG.fatal(msg);

}

}

throw new FailoverFailedException(msg, cause);

}

(3)NameNode端的HA状态切换执行的操作代码

以上是DFSHAAdmin的操作,那么当它把对应的RPC命令发送到NameNode时,NameNode端的逻辑是怎么处理的呢?新的机遇Protocol Buffer实现的RPC请求在NameNode端会调用NameNodeRpcServer.java类中的方法。

@Override // HAServiceProtocol

public synchronized void transitionToActive()

throws ServiceFailedException, AccessControlException {

nn.transitionToActive();

}

@Override // HAServiceProtocol

public synchronized void transitionToStandby()

throws ServiceFailedException, AccessControlException {

nn.transitionToStandby();

}

@Override // HAServiceProtocol

public synchronized HAServiceStatus getServiceStatus()

throws AccessControlException, ServiceFailedException {

return nn.getServiceStatus();

}
然后调用NameNode.java类中对应的方法,把NameNode的HAState置为Active或者Standby。NameNode处于这两种不同的State会有不同的代码执行逻辑。
synchronized void transitionToActive()

throws ServiceFailedException, AccessControlException {

namesystem.checkSuperuserPrivilege();

if (!haEnabled) {

throw new ServiceFailedException(“HA for namenode is not enabled”);

}

state.setState(haContext, ACTIVE_STATE);

}



无论当前的state是哪种,state.setState()函数都会调用setStateInternal()函数,执行状态切换。
protected final void setStateInternal(final HAContext context, final HAState s)

throws ServiceFailedException {

prepareToExitState(context);

s.prepareToEnterState(context);

context.writeLock();

try {

exitState(context);

context.setState(s);

s.enterState(context);

} finally {

context.writeUnlock();

}


prepareToExitState(context)函数目前只有当前处于StandbyState的实现,用于取消Standby NameNode正在进行的checkpoint操作(StandbyCheckpointer类),不需要锁住NameNodeHAContext。下面是prepareToExitState()函数的调用关系。
s.prepareToEnterState(context)函数目前没有执行任何操作。
Exit和Enter的准备工作做好之后,就要真正进行状态切换了。在进行状态切换的时候Namespace是要加锁的,不允许同时还有其他的状态切换。下面以Standby模式向Active模式切换为例说明。
exitState(context)调用NameNodeHAContext.stopStandbyService()函数,然后调用FSNamesystem.stopStandbyService()函数。
void stopStandbyServices() throws IOException {

LOG.info(“Stopping services started for standby state”);

//关闭standbyCheckpointer线程,这个线程负责定期把共享存储上的Editlog合并成FsImage,

//保存在本地存储并且发送到Active NameNode

if (standbyCheckpointer != null) {

standbyCheckpointer.stop();

}

//由于要切换到Active模式,不需要EditLogTailer线程,所以关闭这个线程。

if (editLogTailer != null) {

editLogTailer.stop();

}

//由于要切换到Active模式,需要开启新的EditLog文件,所以关闭旧的文件输出流

if (dir != null && dir.fsImage != null && dir.fsImage.editLog != null) {

dir.fsImage.editLog.close();

}

s.enterState(context)的调用流程类似,最终会走到FSNamesystem.startActiveServices()方法中。
void startActiveServices() throws IOException {

LOG.info(“Starting services required for active state”);

writeLock();

try {

// 开启新的EditLog输出流,读取最后的TxId

FSEditLog editLog = dir.fsImage.getEditLog();

if (!editLog.isOpenForWrite()) {

// During startup, we’re already open for write during initialization.

editLog.initJournalsForWrite();

// May need to recover

editLog.recoverUnclosedStreams();

LOG.info(“Catching up to latest edits from old active before ” +

“taking over writer role in edits logs.”);

editLogTailer.catchupDuringFailover();

LOG.info(“Reprocessing replication and invalidation queues…”);

blockManager.getDatanodeManager().markAllDatanodesStale();

blockManager.clearQueues();

blockManager.processAllPendingDNMessages();

blockManager.processMisReplicatedBlocks();

if (LOG.isDebugEnabled()) {

LOG.debug(“NameNode metadata after re-processing ” +

“replication and invalidation queues during failover:n” +

metaSaveAsString());

}

long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;

LOG.info(“Will take over writing edit logs at txnid ” +

nextTxId);

editLog.setNextTxId(nextTxId);

dir.fsImage.editLog.openForWrite();

}

if (haEnabled) {

// Renew all of the leases before becoming active.

// This is because, while we were in standby mode,

// the leases weren’t getting renewed on this NN.

// Give them all a fresh start here.

leaseManager.renewAllLeases();

}

leaseManager.startMonitor();

startSecretManagerIfNecessary();

} finally {

writeUnlock();

}

}
(4)NameNode对RPC操作的处理流程代码
到此为止NameNode的状态切换就完成了。我们前面提到过Active模式的NameNode会处理所有的RPC请求,而Standby模式的NameNode只会处理一部分请求(Standby模式的NameNode元数据更新是通过执行共享存储中存放的EditLog来实现的)。那么NameNode在执行RPC时是怎么区分的呢?前面已经提到过了,RPC到达NameNode后,都是调用NameNodeRpcServer.java类里相应的函数来执行的,而这些操作又会调用FSNamesystem.java类里对应的方法,然后在这些方法里检查当前处于的模式,时候继续执行下去还是抛出异常。
以一个典型的文件系统操作命令create为例说明:
NameNodeRpcServer.create()函数调用FSNamesystem.startFile()函数,在这个函数里调用checkOperation(OperationCategory.WRITE)函数检查。如果当前NameNode处于Active模式,这个函数返回true;如果处于Standby模式,这个函数抛出StandbyException,结束执行,然后通过RPC返回给client一个StandbyException。Client就知道刚才的那个RPC发错了,然后向另外一个NameNode发送。
在client发给NameNode的RPC操作中,读命令是两种模式的NameNode都可以执行的,写命令只有Active NameNode才能执行。后面HDFS也考虑向MySQL那样“主可读写,从可读”的主从HA机制。
说完client-NameNode的相关RPC协议之后,再来说说DataNode-NameNode的RPC相关协议。由于Active和Standby NameNode都保存有Block locations map信息,所以DataNode向两个NameNode发送的RPC都会被处理,不会有StandbyException。这样能够保证Standby的NameNode是热备,一旦切换直接可用。
至此NameNode端的HA机制相关代码告一段落。






运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-20258-1-1.html 上篇帖子: Hadoop 2.0 NameNode HA和Federation实践 下篇帖子: HDFS原理 架构和副本机制
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表