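1. The heartbeat mechanism
The heartbeat mechanism works roughly as follows:
1) When the master starts, it opens an IPC server and listens on it.
2) When a slave starts, it connects to the master and then sends a "heartbeat" to the master every 3 seconds, reporting its own status. The master uses the return value of that same heartbeat call to pass instructions back to the slave node.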
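The pattern can be sketched in a few lines of Java. This is only an illustration under simplifying assumptions: the call is made in-process rather than over IPC, commands are plain strings, and every name here (`HeartbeatProtocol`, `SketchMaster`, `SketchSlave`) is hypothetical and not taken from Hadoop.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical protocol: the slave reports its state, and the
// return value carries whatever the master wants the slave to do.
interface HeartbeatProtocol {
    List<String> sendHeartbeat(String slaveId, long remainingBytes);
}

class SketchMaster implements HeartbeatProtocol {
    final Map<String, Long> lastSeenMillis = new HashMap<>();
    private final Map<String, List<String>> pendingCommands = new HashMap<>();

    // Queue a command; it is delivered on the slave's next heartbeat.
    synchronized void enqueueCommand(String slaveId, String command) {
        pendingCommands.computeIfAbsent(slaveId, k -> new ArrayList<>()).add(command);
    }

    @Override
    public synchronized List<String> sendHeartbeat(String slaveId, long remainingBytes) {
        // Record that the slave is alive, then hand back its queued commands.
        lastSeenMillis.put(slaveId, System.currentTimeMillis());
        List<String> commands = pendingCommands.remove(slaveId);
        return commands == null ? new ArrayList<>() : commands;
    }
}

class SketchSlave {
    // Interval described in the text; a real loop would wait this long between calls.
    static final long HEARTBEAT_INTERVAL_MS = 3000L;

    private final String id;
    private final HeartbeatProtocol master;
    final List<String> executed = new ArrayList<>();

    SketchSlave(String id, HeartbeatProtocol master) {
        this.id = id;
        this.master = master;
    }

    // One heartbeat: report status, then act on the commands in the reply.
    void heartbeatOnce() {
        List<String> commands = master.sendHeartbeat(id, 1024L);
        executed.addAll(commands);
    }
}
```

The point of the design is that the master never has to open a connection to a slave: it only answers calls, and the answers double as a command channel.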
2. Finding the heartbeat code
Take the namenode and datanode as an example: in the datanode's offerService method, a heartbeat is sent to the namenode every 3 seconds.
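A loop of this kind usually compares the current time against the time of the last heartbeat and only sends when the interval has elapsed. The sketch below shows that gating logic with a simulated clock so it runs instantly. It is not the body of offerService, and `IntervalGate` and `ServiceLoopSketch` are made-up names.

```java
// Hypothetical helper: decides whether a heartbeat is due.
class IntervalGate {
    private final long intervalMs;
    private long lastHeartbeatMs;

    IntervalGate(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        // Back-date the last heartbeat so the first check fires immediately.
        this.lastHeartbeatMs = startMs - intervalMs;
    }

    // True when at least intervalMs has passed since the last heartbeat;
    // also records nowMs as the new "last heartbeat" time in that case.
    boolean dueAt(long nowMs) {
        if (nowMs - lastHeartbeatMs >= intervalMs) {
            lastHeartbeatMs = nowMs;
            return true;
        }
        return false;
    }
}

class ServiceLoopSketch {
    // Walks over simulated clock readings and counts how many heartbeats
    // an interval-gated service loop would have sent.
    static int run(long[] ticksMs, long intervalMs) {
        long start = ticksMs.length == 0 ? 0L : ticksMs[0];
        IntervalGate gate = new IntervalGate(intervalMs, start);
        int sent = 0;
        for (long now : ticksMs) {
            if (gate.dueAt(now)) {
                sent++; // a real loop would call the heartbeat RPC here
            }
        }
        return sent;
    }
}
```

With a 3000 ms interval, clock readings at 0, 1000, 2000, 3000, 4000, 6000 and 6500 ms produce heartbeats at 0, 3000 and 6000 ms only.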
3. Heartbeat internals, part 1: how does the datanode get hold of the namenode object?
First, the DataNode class has a member variable named namenode:
    public class DataNode extends Configured
        implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
      ...
      public DatanodeProtocol namenode = null;
      ...
    }
4. Heartbeat internals, part 2: the Invoker class
Invoker is a static inner class of org.apache.hadoop.ipc.RPC:
    private static class Invoker implements InvocationHandler { ... }
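The method to look at in this class is invoke. It is the point where a call made on the namenode proxy is wrapped into an Invocation object and handed to client.call.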
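For readers unfamiliar with java.lang.reflect.Proxy, here is a minimal sketch of how an InvocationHandler intercepts calls on an interface. It only records the call instead of sending it anywhere, and the interface and class names (`SketchDatanodeProtocol`, `RecordingInvoker`, `ProxySketch`) are invented for the example. It is not the Hadoop Invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a remote protocol interface.
interface SketchDatanodeProtocol {
    String sendHeartbeat(String nodeId, long capacity);
}

// Every method call made on the proxy ends up in invoke().
class RecordingInvoker implements InvocationHandler {
    final List<String> recorded = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        // An RPC layer would package the method name and arguments and
        // send them to the server; this sketch just records them.
        recorded.add(method.getName() + Arrays.toString(args));
        // Canned reply. Only suitable for methods returning String, so
        // do not call hashCode() or equals() on this proxy.
        return "ack";
    }
}

class ProxySketch {
    // Builds an object that implements the interface but has no real
    // implementation behind it, only the handler.
    static SketchDatanodeProtocol newProxy(InvocationHandler handler) {
        return (SketchDatanodeProtocol) Proxy.newProxyInstance(
                SketchDatanodeProtocol.class.getClassLoader(),
                new Class<?>[] { SketchDatanodeProtocol.class },
                handler);
    }
}
```

Calling `ProxySketch.newProxy(invoker).sendHeartbeat("dn-1", 100L)` looks like an ordinary local method call to the caller, which is what lets the datanode treat a remote namenode as if it were a local object.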
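5. Heartbeat internals, part 3: the Invocation class
Invocation is also a static inner class of org.apache.hadoop.ipc.RPC.
It contains no business logic; it is essentially a value object (VO) that carries the details of one call.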
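To make "value object" concrete, the sketch below shows a class that holds a method name and its arguments and can write itself to, and read itself back from, a byte stream. It is a simplification under stated assumptions: arguments are restricted to strings, plain java.io streams are used, and the name `InvocationSketch` and its fields are hypothetical rather than Hadoop's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value object describing one remote call.
class InvocationSketch {
    final String methodName;
    final String[] parameters;

    InvocationSketch(String methodName, String... parameters) {
        this.methodName = methodName;
        this.parameters = parameters;
    }

    // Serialize: method name, argument count, then each argument.
    byte[] toBytes() {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(methodName);
            out.writeInt(parameters.length);
            for (String parameter : parameters) {
                out.writeUTF(parameter);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Deserialize in the same order the fields were written.
    static InvocationSketch fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String name = in.readUTF();
            String[] params = new String[in.readInt()];
            for (int i = 0; i < params.length; i++) {
                params[i] = in.readUTF();
            }
            return new InvocationSketch(name, params);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The object carries data only. Deciding where the bytes go and what to do with the reply is left to the client and server code around it.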
6. Heartbeat internals, part 4: the call method of the Client class
Next, the key piece is the call method of the Client class:
    public Writable call(Writable param, InetSocketAddress addr,
                         Class protocol, UserGroupInformation ticket)
                         throws InterruptedException, IOException {
      // wrap the Invocation in a Call
      Call call = new Call(param);
      // get a connection to the remote server
      Connection connection = getConnection(addr, protocol, ticket, call);
      // send the serialized call to the server
      connection.sendParam(call);
      boolean interrupted = false;
      synchronized (call) {
        while (!call.done) {
          try {
            call.wait();                           // wait for the result
          } catch (InterruptedException ie) {
            // save the fact that we were interrupted
            interrupted = true;
          }
        }
        if (interrupted) {
          // set the interrupt flag now that we are done waiting
          Thread.currentThread().interrupt();
        }
        if (call.error != null) {
          if (call.error instanceof RemoteException) {
            call.error.fillInStackTrace();
            throw call.error;
          } else { // local exception
            throw wrapException(addr, call.error);
          }
        } else {
          // return the result of the call
          return call.value;
        }
      }
    }
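The core of this method is a wait/notify handshake: the calling thread blocks on the Call object until another thread fills in the result and wakes it up. The sketch below isolates that handshake. `PendingCall` and `PendingCallDemo` are hypothetical names, and a sleeping thread stands in for the thread that would read the server's response.

```java
// Hypothetical stand-in for a Call: holds the result and a done flag.
class PendingCall {
    private String value;
    private boolean done;

    // Called by the receiving thread once the response has arrived.
    synchronized void complete(String result) {
        value = result;
        done = true;
        notifyAll(); // wake up the thread blocked in awaitResult()
    }

    // Called by the requesting thread; blocks until complete() has run.
    synchronized String awaitResult() {
        boolean interrupted = false;
        while (!done) { // loop guards against spurious wakeups
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true; // remember it, keep waiting
            }
        }
        if (interrupted) {
            // restore the interrupt flag now that waiting is over
            Thread.currentThread().interrupt();
        }
        return value;
    }
}

class PendingCallDemo {
    // Simulates one round trip: a second thread delivers the reply
    // after delayMs while the caller blocks on the PendingCall.
    static String callAndWait(String reply, long delayMs) {
        PendingCall call = new PendingCall();
        Thread receiver = new Thread(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            call.complete(reply);
        });
        receiver.start();
        return call.awaitResult();
    }
}
```

As in the call method, the interrupt is recorded rather than thrown immediately, so the caller still gets the result of a request that has already been sent.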
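7. Putting it all together
The process by which a datanode sends a heartbeat to the namenode is:
a) During initialization, the datanode obtains a proxy for the namenode.
b) The datanode calls the heartbeat method on the namenode proxy:
    namenode.sendHeartbeat(dnRegistration,
                           data.getCapacity(),
                           data.getDfsUsed(),
                           data.getRemaining(),
                           xmitsInProgress.get(),
                           getXceiverCount());
c) The namenode dynamic proxy on the datanode wraps (or "serializes") this call into an Invocation object and calls client.call.
d) client.call wraps the Invocation in a Call object.
e) The client sends the Call to the real namenode server.
f) The namenode receives it, converts it into a server-side Call, processes it, and sends the result back through the Responder.
g) The datanode receives the result and converts it into a DatanodeCommand[].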
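Steps c) through g) can be imitated end to end in a single process. In this sketch the "network" is a direct method call, the server looks up the target method by reflection, and commands are plain strings. The names `NameNodeRpcSketch`, `NameNodeServerSketch` and `RoundTripSketch`, and the "STOP_WRITES" command, are all invented for illustration and do not come from Hadoop.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical protocol shared by client and server.
interface NameNodeRpcSketch {
    String[] sendHeartbeat(String nodeId, long remainingBytes);
}

class NameNodeServerSketch implements NameNodeRpcSketch {
    @Override
    public String[] sendHeartbeat(String nodeId, long remainingBytes) {
        // Made-up policy: tell nearly full nodes to stop accepting writes.
        return remainingBytes < 100 ? new String[] {"STOP_WRITES"} : new String[0];
    }

    // Server side of step f: resolve the method from its name and
    // parameter types, invoke it, and return the result to the caller.
    Object process(String methodName, Class<?>[] parameterTypes, Object[] args) throws Exception {
        Method method = NameNodeRpcSketch.class.getMethod(methodName, parameterTypes);
        return method.invoke(this, args);
    }
}

class RoundTripSketch {
    // Client side of steps c to e: the proxy turns a method call into
    // (name, parameter types, arguments) and passes that to the server.
    static NameNodeRpcSketch connect(NameNodeServerSketch server) {
        InvocationHandler handler = (proxy, method, args) ->
                server.process(method.getName(), method.getParameterTypes(), args);
        return (NameNodeRpcSketch) Proxy.newProxyInstance(
                NameNodeRpcSketch.class.getClassLoader(),
                new Class<?>[] { NameNodeRpcSketch.class },
                handler);
    }
}
```

A caller obtains the proxy with `RoundTripSketch.connect(new NameNodeServerSketch())` and then calls `sendHeartbeat` on it as if it were a local object. The returned array plays the role of the DatanodeCommand[] in step g).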