设为首页 收藏本站
查看: 912|回复: 0

[经验分享] hadoop的心跳回忆

[复制链接]

尚未签到

发表于 2015-7-12 08:56:22 | 显示全部楼层 |阅读模式
  hadoop的集群是基于master/slave模式,namenode和jobtracker属于master,而datanode/tasktracker属于slaves。master只有一个,而slaves有多个。
namenode与datanode之间的通信,jobtracker与tasktracker直接的通信,都是通过“心跳”完成的。
以前看过hadoop心跳原理的源代码,今天再回忆一下,呵呵,所以叫“心跳回忆”。

1、心跳机制
心跳的机制大概是这样的:
1) master启动的时候,会开一个ipc server在那里。
2) slave启动时,会连接master,并每隔3秒钟主动向master发送一个“心跳”,将自己的状态信息告诉master,然后master也是通过这个心跳的返回值,向slave节点传达指令。

2、找到心跳的代码
拿namenode和datanode来说,在datanode的offerService方法中,每隔3秒向namenode发送心跳的代码:










Java代码  
/**
* Main loop for the DataNode.  Runs until shutdown,
* forever calling remote NameNode functions.
*/  
public void offerService() throws Exception {  
...  
//  
// Now loop for a long time....  
//  
while (shouldRun) {  
try {  
long startTime = now();  
//  
// Every so often, send heartbeat or block-report  
//  
// 如果到了3秒钟,就向namenode发心跳  
if (startTime - lastHeartbeat > heartBeatInterval) {  
//  
// All heartbeat messages include following info:  
// -- Datanode name  
// -- data transfer port  
// -- Total capacity  
// -- Bytes remaining  
//  
lastHeartbeat = startTime;  
DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,  
data.getCapacity(),  
data.getDfsUsed(),  
data.getRemaining(),  
xmitsInProgress.get(),  
getXceiverCount());  
// 注意上面这行代码,“发送心跳”竟然就是调用namenode的一个方法??  
  
myMetrics.heartbeats.inc(now() - startTime);  
//LOG.info("Just sent heartbeat, with name " + localName);  
// 处理对心跳的返回值(namenode传给datanode的指令)  
if (!processCommand(cmds))  
continue;  
}  
// 这里省略很多代码  
...  
} // while (shouldRun)  
} // offerService  
  
  
上面这段代码,如果是单机的程序,没什么值得奇怪的。但是,这是hadoop集群!datanode和namenode在2台不同的机器(或2个JVM)上运行!datanode机器竟然直接调用namenode的方法!这是怎么实现的?难道是传说中的RMI吗??
下面我们主要就来分析这个方法调用的细节。

3、心跳的底层细节一:datanode怎么获得namenode对象的?
首先,DataNode类中,有一个namenode的成员变量:






Java代码 DSC0000.png




  • public class DataNode extends Configured   
    implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {  
    ...  
    public DatanodeProtocol namenode = null;  
    ...   
    }  
      

  
下面是NameNode类的定义:






Java代码




  • public class NameNode implements ClientProtocol, DatanodeProtocol,  
    NamenodeProtocol, FSConstants,  
    RefreshAuthorizationPolicyProtocol {  
    ...   
    }  
      

  
注意:NameNode实现了DatanodeProtocol接口,DatanodeProtocol接口定义了namenode和datanode之间通信的方法。
那么,DataNode类是怎么获取到NameNode类的引用呢?
在Datanode端,为namenode变量赋值的代码:









Java代码  
// connect to name node  
this.namenode = (DatanodeProtocol)   
RPC.waitForProxy(DatanodeProtocol.class,  
DatanodeProtocol.versionID,  
nameNodeAddr,   
conf);  
  
  

在继续去RPC类中追踪:









Java代码  
VersionedProtocol proxy =  
(VersionedProtocol) Proxy.newProxyInstance(  
protocol.getClassLoader(), new Class[] { protocol },  
new Invoker(addr, ticket, conf, factory));  
  
  
现在,明白了!
1) 对namenode的赋值,并不是真正的new了一个实现了DatanodeProtocol接口的对象,而是获得了一个动态代理!!
2) 上面这段代码中,protocol的类型是DatanodeProtocol.class
3) 对namenode的所有调用,都被委托(delegate)给了Invoker

4、心跳的底层细节二:看看Invoker类
Invoker类是org.apache.hadoop.ipc.RPC类的一个静态内部类:










Java代码  
private static class Invoker implements InvocationHandler {  
  
  
在这个类中,看invoke方法:
  









Java代码  
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
...  
ObjectWritable value = (ObjectWritable)  
client.call(new Invocation(method, args), address,   
method.getDeclaringClass(), ticket);  
...  
return value.get();  
}  
  
  
所有的方法调用又被delegate给client的call方法了!
client是Invoker中的成员变量:
  









Java代码  
private Client client;  
  
  
所以可以看出:DatanodeProtocol中的每个方法调用,都被包装成一个Invocation对象,再由client.call()调用

5、心跳的底层细节三:Invocation类
Invocation类是org.apache.hadoop.ipc.RPC类的一个静态内部类
没有什么业务逻辑方法,主要作用就是一个VO

6、心跳的底层细节四:client类的call方法
接下来重点看client类的call方法:









Java代码  
public Writable call(Writable param, InetSocketAddress addr,   
Class protocol, UserGroupInformation ticket)   
throws InterruptedException, IOException {  
Call call = new Call(param);     
// 将Invocation转化为Call  
Connection connection = getConnection(addr, protocol, ticket, call);  
// 连接远程服务器  
connection.sendParam(call);                 // send the parameter  
// 将“序列化”后的call发给过去  
boolean interrupted = false;  
synchronized (call) {  
while (!call.done) {  
try {  
call.wait();                           // wait for the result  
// 等待调用结果  
} catch (InterruptedException ie) {  
// save the fact that we were interrupted  
interrupted = true;  
}  
}  
if (interrupted) {  
// set the interrupt flag now that we are done waiting  
      Thread.currentThread().interrupt();  
}  
if (call.error != null) {  
if (call.error instanceof RemoteException) {  
call.error.fillInStackTrace();  
throw call.error;  
} else { // local exception  
throw wrapException(addr, call.error);  
}  
} else {  
return call.value;  
// 返回  
    }  
}  
}  
  
  
7、现在,一目了然了










Java代码  
datanode向namenode发送heartbeat过程是这样的:  
a) 在datanode初始化获得namenode的proxy  
b) 在datanode上,调用namenode proxy的heartbeat方法:  
namenode.sendHeartbeat(dnRegistration,  
data.getCapacity(),  
data.getDfsUsed(),  
data.getRemaining(),  
xmitsInProgress.get(),  
getXceiverCount());  
c) 在datanode上的namenode动态代理类将这个调用包装成(或者叫“序列化成”)一个Invocation对象,并调用client.call方法  
d) client call方法将Invocation转化为Call对象  
e) client 将call发送到真正的namenode服务器  
f) namenode接收后,转化成namenode端的Call,并process后,通过Responder发回来!  
g) datanode接收结果,并将结果转化为DatanodeCommand[]  

  
  
8、再看动态代理
动态代理:让“只有接口,没事对应的实现类”成为可能,因为具体方法的实现可以委托给另一个类!!
在这个例子中,就datanode而言,DatanodeProtocol接口是没有实现类的!

*** THE END ***
  原文地址:http://thinkinginhadoop.iteye.com/blog/709993

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85656-1-1.html 上篇帖子: 吐槽下《Hadoop权威指南(第二版)》的翻译 下篇帖子: hadoop streaming多路输出方法和注意点(附超大数据diff对比源码)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表