[Experience sharing] Chapter 6: Xiao Zhu's Hadoop Notes, Source Code Analysis: IPC Analysis. Section 2: Analysis of the Client Class


Posted on 2016-12-13 10:57:21
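Chapter 6: Xiao Zhu's Hadoop Notes, Source Code Analysis: IPC Analysis

Section 2: Analysis of the Client Class

 The Client's low-level communication is written directly against blocking I/O: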
 

/**
 * A client for an IPC service. IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value. A service runs on a
 * port and is defined by a parameter class and a value class.
 * The client's low-level communication is written directly against blocking I/O.
 *
 * @see Server
 */
public class Client {
    public static final Log LOG = LogFactory.getLog(Client.class);
    // the set of connections this client maintains to servers
    private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
    private Class<? extends Writable> valueClass; // class of call values
    private int counter; // counter for call ids
    // whether the client is still running
    private AtomicBoolean running = new AtomicBoolean(true); // if client runs
    final private Configuration conf;
    // socket factory, used to create the socket connections
    private SocketFactory socketFactory; // how to create sockets
    private int refCount = 1;
    // configuration key from which the ping interval is read
    final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
    // the default ping interval is 1 minute
    final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
    final static int PING_CALL_ID = -1;
    .......
}
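(1) How the client runs:
(a) create a proxy object;
(b) call a method on the proxy, which is routed to invoke();
(c) invoke() calls the call method of the Client object, which sends the request (method and parameters) to the server;
(d) the caller waits for call to complete;
(e) the result of the request is returned.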
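
The flow above can be sketched with a plain JDK dynamic proxy. This is only a minimal sketch under stated assumptions, not Hadoop's code: Hello, FakeClient and ProxyFlowSketch are hypothetical names, and the "remote call" is simulated locally instead of going over a socket.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical protocol interface, standing in for a VersionedProtocol.
interface Hello {
    String greet(String name);
}

// Hypothetical stand-in for the IPC client: a real client would serialize
// the request, send it over a socket and block until the response arrives.
final class FakeClient {
    String call(String methodName, Object[] args) {
        return methodName + Arrays.toString(args);
    }
}

final class ProxyFlowSketch {
    // Step (a): create the proxy. Steps (b) to (e) run through the handler.
    static Hello newProxy(FakeClient client) {
        InvocationHandler handler = (Object proxy, Method method, Object[] args) ->
                client.call(method.getName(), args); // forward the call and return its result
        return (Hello) Proxy.newProxyInstance(
                Hello.class.getClassLoader(), new Class<?>[] { Hello.class }, handler);
    }

    public static void main(String[] args) {
        Hello hello = newProxy(new FakeClient());
        System.out.println(hello.greet("world")); // prints greet[world]
    }
}
```

The caller only ever sees the Hello interface; everything about transport is hidden behind the handler, which is the role Invoker plays in the steps that follow.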
 
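(2) The main inner classes of Client
(a) Call: represents one RPC invocation request.
(b) Connection: represents a connection between a client and a server. Connection extends Thread, and each connection runs its own thread. That thread reads the responses the server sends back for the calls in flight and notifies the callers waiting on them. Each connection owns one Socket to the remote host. The socket is multiplexed, so many calls share it, and the responses may arrive in a different order from the requests.
(c) ConnectionId: identifies a connection (server address, protocol, and other connection settings).
(d) ParallelCall: a request used for parallel calls.
(e) ParallelResults: the results of a parallel call.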
 
(3) A complete client request, step by step
Demo example (server side):
 

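import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

/**
 *
 * Description: RPCserver test<br>
 *
 * Copyright: Copyright (c) 2013 <br>
 * Company: www.renren.com
 *
 * @author zhuhui{hui.zhu@renren-inc.com} 2013-5-17
 * @version 1.0
 */
public class RPCserver {
    /**
     * @param args
     */
    public static void main(String[] args) {
        Server server;
        try {
            // serve HelloProtocalImp on 127.0.0.1:9813 with 6 handler threads, verbose logging on
            server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
            server.start();
            try {
                // block the main thread until the server stops
                server.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}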
 
 
Step 1: create the proxy object

HelloProtocal rpcInterface = (HelloProtocal) RPC.getProxy(HelloProtocal.class, HelloProtocal.versionID, nameNodeAddr, new Configuration());
(1) The getProxy() method
VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
(2) The Invoker(protocol, addr, ticket, conf, factory, rpcTimeout) constructor
Invoker is a class that implements the InvocationHandler interface. Its constructor does two things:
this.remoteId = Client.ConnectionId.getConnectionId(address, protocol, ticket, rpcTimeout, conf);
this.client = CLIENTS.getClient(conf, factory); // CLIENTS is a cache of Client instances, so proxies can share one Client and its connections to the servers




 
Step 2: call the business method

private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
    private boolean isClosed = false;

    public Invoker(Class<? extends VersionedProtocol> protocol,
            InetSocketAddress address, UserGroupInformation ticket,
            Configuration conf, SocketFactory factory,
            int rpcTimeout) throws IOException {
        this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
                ticket, rpcTimeout, conf);
        this.client = CLIENTS.getClient(conf, factory);
    }

    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {
        final boolean logDebug = LOG.isDebugEnabled();
        long startTime = 0;
        if (logDebug) {
            startTime = System.currentTimeMillis();
        }
        // build an RPC.Invocation from the method and its arguments, pass it
        // to the client to execute the call, and take the return value as value
        ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
        if (logDebug) {
            long callTime = System.currentTimeMillis() - startTime;
            LOG.debug("Call: " + method.getName() + " " + callTime);
        }
        return value.get();
    }

    /* close the IPC client that's responsible for this invoker's RPCs */
    synchronized private void close() {
        if (!isClosed) {
            isClosed = true;
            CLIENTS.stopClient(client);
        }
    }
}

ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);

 
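       Invocation wraps the method name and the arguments, and it is the payload that travels over the wire. The key to the remote call is that Invocation implements the Writable interface. In write(DataOutput out), Invocation writes the name of the method being called, then the number of parameters, and then each parameter in turn together with its class name. This is why every parameter of a method called through RPC must be a primitive type, a String, a class that implements Writable (so that the parameter knows how to serialize itself to the stream), or an array whose elements are themselves primitives, Strings, or Writables.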
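
The layout described above can be illustrated with plain java.io streams. This is a sketch under stated assumptions, not Hadoop's Invocation: InvocationWireSketch is a hypothetical class, it supports only String and Integer parameters, and it uses writeUTF where Hadoop uses its own string encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class InvocationWireSketch {
    // Writes: method name, parameter count, then (class name, value) per parameter.
    static byte[] write(String methodName, Object... params) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(methodName);
            out.writeInt(params.length);
            for (Object p : params) {
                out.writeUTF(p.getClass().getName());
                if (p instanceof String) {
                    out.writeUTF((String) p);
                } else if (p instanceof Integer) {
                    out.writeInt((Integer) p);
                } else {
                    throw new IllegalArgumentException("unsupported type: " + p.getClass());
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the same layout back and renders it as "name(arg1, arg2)".
    static String read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            StringBuilder sb = new StringBuilder(in.readUTF()).append('(');
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                String className = in.readUTF();
                if (i > 0) {
                    sb.append(", ");
                }
                if (className.equals(String.class.getName())) {
                    sb.append(in.readUTF());
                } else if (className.equals(Integer.class.getName())) {
                    sb.append(in.readInt());
                } else {
                    throw new IOException("unsupported type: " + className);
                }
            }
            return sb.append(')').toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(read(write("sayHello", "zhu", 3))); // prints sayHello(zhu, 3)
    }
}
```

Because the class name travels with each value, the reader can pick the right decoder without any schema shared in advance, which is also why unsupported parameter types cannot cross the wire.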
 
 
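Step 3: the call method of the Client object sends the request (method and parameters) to the server
      The most basic function of the Client class is to execute RPC calls. It offers two ways to do so: a single serial call and a parallel call.
Only the serial single-call method, call, is analyzed here: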

/**
 * Make a call, passing <code>param</code>, to the IPC server defined by
 * <code>remoteId</code>, returning the value. Throws exceptions if there
 * are network problems or if the remote code threw an exception.
 */
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException {
    // build a Call instance from the request parameter
    Call call = new Call(param);
    // get a connection from the connections table (creating one if needed)
    Connection connection = getConnection(remoteId, call);
    connection.sendParam(call); // send the parameter to the IPC server
    boolean interrupted = false;
    synchronized (call) {
        while (!call.done) {
            try {
                // wait for the IPC server's response; the notify() that
                // wakes this thread is in Call.callComplete()
                call.wait();
            } catch (InterruptedException ie) {
                // save the fact that we were interrupted
                interrupted = true;
            }
        }
        if (interrupted) {
            // set the interrupt flag now that we are done waiting
            Thread.currentThread().interrupt();
        }
        if (call.error != null) {
            if (call.error instanceof RemoteException) {
                call.error.fillInStackTrace();
                throw call.error;
            } else { // local exception
                // use the connection because it will reflect an ip change,
                // unlike the remoteId
                throw wrapException(connection.getRemoteAddress(), call.error);
            }
        } else {
            return call.value; // the response value of the call
        }
    }
}
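
The wait loop in call has one subtle point: an interrupt does not abort the wait, it is remembered and re-asserted once the result is in. The following is a minimal sketch of just that pattern, with hypothetical names (PendingCall, WaitLoopSketch) and a thread standing in for the receiver.

```java
final class PendingCall {
    private boolean done;
    private String value;

    // Called by the receiver thread: publish the result and wake the caller.
    synchronized void complete(String v) {
        value = v;
        done = true;
        notify();
    }

    // Called by the caller thread: block until complete() has run. An
    // interrupt is recorded and restored afterwards instead of ending the wait.
    synchronized String await() {
        boolean interrupted = false;
        while (!done) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return value;
    }
}

final class WaitLoopSketch {
    // Simulates one call: a second thread plays the receiver that delivers the response.
    static String callWithSimulatedServer(String response) {
        PendingCall call = new PendingCall();
        Thread receiver = new Thread(() -> call.complete(response));
        receiver.start();
        return call.await();
    }

    public static void main(String[] args) {
        System.out.println(callWithSimulatedServer("pong")); // prints pong
    }
}
```

The while loop around wait() also guards against spurious wakeups: the caller only proceeds once the done flag is really set.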
 (1) Get the connection object: getConnection(remoteId, call);

private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
    if (!running.get()) {
        // the client is stopped
        throw new IOException("The client is stopped");
    }
    Connection connection;
    /*
     * we could avoid this allocation for each RPC by having a connectionsId
     * object and with set() method. We need to manage the refs for keys in
     * HashMap properly. For now its ok.
     */
    do {
        synchronized (connections) {
            connection = connections.get(remoteId);
            if (connection == null) {
                connection = new Connection(remoteId);
                connections.put(remoteId, connection);
            }
        }
    } while (!connection.addCall(call)); // note: one connection can carry many calls
    connection.setupIOstreams(); // actually connects; each connection starts its own thread
    // we don't invoke the method below inside "synchronized (connections)"
    // block above. The reason for that is if the server happens to be slow,
    // it will take longer to establish a connection and that will slow the
    // entire system down.
    return connection;
}
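
The do/while loop in getConnection retries whenever addCall refuses the call, which happens when the connection it found is already closing. Here is a minimal sketch of that get-or-create loop. ConnectionPoolSketch and Conn are hypothetical names, and the sketch assumes that a closing connection is removed from the table, since otherwise the loop would spin on the same closed connection.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ConnectionPoolSketch {
    static final class Conn {
        final List<Integer> calls = new ArrayList<>();
        private boolean closed;

        // Returns false when the connection is already closed, so the
        // caller has to look up (or create) a fresh one.
        synchronized boolean addCall(int callId) {
            if (closed) {
                return false;
            }
            calls.add(callId);
            return true;
        }

        synchronized void close() {
            closed = true;
        }
    }

    private final Map<String, Conn> connections = new HashMap<>();

    Conn getConnection(String remoteId, int callId) {
        Conn conn;
        do {
            // Hold the table lock only for the lookup, never for slow network work.
            synchronized (connections) {
                conn = connections.get(remoteId);
                if (conn == null) {
                    conn = new Conn();
                    connections.put(remoteId, conn);
                }
            }
        } while (!conn.addCall(callId));
        return conn;
    }

    // Assumption of this sketch: closing also removes the entry from the table.
    void closeAndRemove(String remoteId) {
        synchronized (connections) {
            Conn conn = connections.remove(remoteId);
            if (conn != null) {
                conn.close();
            }
        }
    }

    public static void main(String[] args) {
        ConnectionPoolSketch pool = new ConnectionPoolSketch();
        Conn a = pool.getConnection("127.0.0.1:9813", 1);
        Conn b = pool.getConnection("127.0.0.1:9813", 2);
        System.out.println(a == b); // prints true: both calls share one connection
    }
}
```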
 (2) Actually connect and start the receiver thread: connection.setupIOstreams();

private synchronized void setupIOstreams() throws InterruptedException {
    short numRetries = 0;
    final short maxRetries = 15;
    Random rand = null;
    try {
        while (true) {
            setupConnection(); // establish the connection
            InputStream inStream = NetUtils.getInputStream(socket); // get the input stream
            OutputStream outStream = NetUtils.getOutputStream(socket); // get the output stream
            writeRpcHeader(outStream);
            if (useSasl) {
                final InputStream in2 = inStream;
                final OutputStream out2 = outStream;
                UserGroupInformation ticket = remoteId.getTicket();
                if (authMethod == AuthMethod.KERBEROS) {
                    if (ticket.getRealUser() != null) {
                        ticket = ticket.getRealUser();
                    }
                }
                boolean continueSasl = false;
                try {
                    continueSasl =
                        ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
                            @Override
                            public Boolean run() throws IOException {
                                return setupSaslConnection(in2, out2);
                            }
                        });
                } catch (Exception ex) {
                    if (rand == null) {
                        rand = new Random();
                    }
                    handleSaslConnectionFailure(numRetries++, maxRetries, ex, rand,
                        ticket);
                    continue;
                }
                if (continueSasl) {
                    // Sasl connect is successful. Let's set up Sasl i/o streams.
                    inStream = saslRpcClient.getInputStream(inStream);
                    outStream = saslRpcClient.getOutputStream(outStream);
                } else {
                    // fall back to simple auth because server told us so.
                    authMethod = AuthMethod.SIMPLE;
                    header = new ConnectionHeader(header.getProtocol(),
                        header.getUgi(), authMethod);
                    useSasl = false;
                }
            }
            // wrap the input stream in a DataInputStream
            this.in = new DataInputStream(new BufferedInputStream
                (new PingInputStream(inStream)));
            // wrap the output stream in a DataOutputStream
            this.out = new DataOutputStream
                (new BufferedOutputStream(outStream));
            writeHeader();
            // update last activity time
            touch();
            // once the connection is up, start the receiver thread that waits
            // for data from the server (note: Connection extends Thread)
            // start the receiver thread after the socket connection has been set up
            start();
            return;
        }
    } catch (Throwable t) {
        if (t instanceof IOException) {
            markClosed((IOException)t);
        } else {
            markClosed(new IOException("Couldn't set up IO streams", t));
        }
        close();
    }
}
// Note: setupConnection() establishes the connection;
// start() launches the receiver thread (Connection extends Thread), which waits for data from the server:
public void run() {
    while (waitForWork()) { // wait here for work - read or close connection
        // waitForWork() blocks until there is a response to read for a
        // pending call, or until the connection has to be closed
        receiveResponse();
    }
    close();
}
 
(3) Finally establish the connection: setupConnection()

private synchronized void setupConnection() throws IOException {
    short ioFailures = 0;
    short timeoutFailures = 0;
    while (true) {
        try {
            this.socket = socketFactory.createSocket();
            this.socket.setTcpNoDelay(tcpNoDelay);
            /*
             * Bind the socket to the host specified in the principal
             * name of the client, to ensure Server matching address of
             * the client connection to host name in principal passed.
             */
            if (UserGroupInformation.isSecurityEnabled()) {
                KerberosInfo krbInfo = remoteId.getProtocol().getAnnotation(KerberosInfo.class);
                if (krbInfo != null && krbInfo.clientPrincipal() != null) {
                    String host = SecurityUtil.getHostFromPrincipal(remoteId.getTicket().getUserName());
                    // If host name is a valid local address then bind
                    // socket to it
                    InetAddress localAddr = NetUtils.getLocalInetAddress(host);
                    if (localAddr != null) {
                        this.socket.bind(new InetSocketAddress(localAddr, 0));
                    }
                }
            }
            // connection time out is 20s
            NetUtils.connect(this.socket, server, 20000);
            if (rpcTimeout > 0) {
                pingInterval = rpcTimeout; // rpcTimeout overwrites pingInterval
            }
            this.socket.setSoTimeout(pingInterval);
            return;
        } catch (SocketTimeoutException toe) {
            /*
             * Check for an address change and update the local
             * reference. Reset the failure counter if the address was
             * changed
             */
            if (updateAddress()) {
                timeoutFailures = ioFailures = 0;
            }
            /*
             * The max number of retries is 45, which amounts to 20s*45
             * = 15 minutes retries.
             */
            handleConnectionFailure(timeoutFailures++, 45, toe);
        } catch (IOException ie) {
            if (updateAddress()) {
                timeoutFailures = ioFailures = 0;
            }
            handleConnectionFailure(ioFailures++, maxRetries, ie);
        }
    }
}
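
The retry budget quoted in the comment (20 s per attempt, at most 45 retries, so 900 s or 15 minutes spent on timeouts) follows from a simple bounded-retry loop. The sketch below shows the idea with hypothetical names (RetrySketch, withRetries, attemptsNeeded). The body of handleConnectionFailure is not shown in this post, so the rule used here, "give up once the failure count has reached maxRetries", is an assumption of the sketch.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

final class RetrySketch {
    // Worst-case time spent waiting on connect timeouts alone, in seconds.
    static long worstCaseSeconds(int timeoutSeconds, int maxRetries) {
        return (long) timeoutSeconds * maxRetries;
    }

    // Runs attempt until it succeeds. An IOException is retried after a pause
    // until maxRetries failures have been tolerated, then it is rethrown.
    static <T> T withRetries(Callable<T> attempt, int maxRetries, long pauseMillis)
            throws Exception {
        int failures = 0;
        while (true) {
            try {
                return attempt.call();
            } catch (IOException e) {
                if (failures++ >= maxRetries) {
                    throw e;
                }
                Thread.sleep(pauseMillis);
            }
        }
    }

    // Simulates a server that times out failuresBeforeSuccess times. Returns
    // the number of attempts that were needed, or -1 if the loop gave up.
    static int attemptsNeeded(int failuresBeforeSuccess, int maxRetries) {
        int[] attempts = {0};
        try {
            Integer result = RetrySketch.<Integer>withRetries(() -> {
                if (++attempts[0] <= failuresBeforeSuccess) {
                    throw new SocketTimeoutException("simulated timeout");
                }
                return attempts[0];
            }, maxRetries, 0L);
            return result;
        } catch (Exception e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(worstCaseSeconds(20, 45) / 60 + " minutes"); // prints 15 minutes
        System.out.println(attemptsNeeded(3, 45));                      // prints 4
    }
}
```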

 (4) Send the call parameters

public void sendParam(Call call) {
    if (shouldCloseConnection.get()) {
        return;
    }
    DataOutputBuffer d = null;
    try {
        synchronized (this.out) {
            if (LOG.isDebugEnabled())
                LOG.debug(getName() + " sending #" + call.id);
            // for serializing the data to be written
            d = new DataOutputBuffer();
            d.writeInt(call.id);
            call.param.write(d);
            byte[] data = d.getData();
            int dataLength = d.getLength();
            out.writeInt(dataLength); // first write the length of the data
            out.write(data, 0, dataLength); // then write the data to the server
            out.flush();
        }
    } catch (IOException e) {
        markClosed(e);
    } finally {
        // the buffer is just an in-memory buffer, but it is still
        // polite to close early
        IOUtils.closeStream(d);
    }
}
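
sendParam uses length-prefixed framing: the call id and the serialized parameter are first written to an in-memory buffer, and the buffer's length is sent ahead of its contents so that the reader knows where the request ends. A minimal sketch with plain java.io classes follows. FramingSketch is a hypothetical name, and a UTF-8 string stands in for the serialized Writable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class FramingSketch {
    // Frame layout: [int length][int callId][payload]; length covers callId + payload.
    static byte[] frame(int callId, byte[] payload) {
        try {
            // Serialize the body first so that its length is known.
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream d = new DataOutputStream(body);
            d.writeInt(callId);
            d.write(payload);
            d.flush();
            byte[] data = body.toByteArray();

            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            out.writeInt(data.length);        // the length goes first
            out.write(data, 0, data.length);  // then the body
            out.flush();
            return wire.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Parses one frame and renders its fields for inspection.
    static String describe(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            int length = in.readInt();
            int callId = in.readInt();
            byte[] payload = new byte[length - 4]; // the call id took 4 of the bytes
            in.readFully(payload);
            return "len=" + length + " id=" + callId
                    + " payload=" + new String(payload, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] f = frame(7, "ping".getBytes(StandardCharsets.UTF_8));
        System.out.println(describe(f)); // prints len=8 id=7 payload=ping
    }
}
```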
 
Step 4: receiveResponse receives the data returned by the server

/*
 * Receive a response. Only the single receiver thread ever reads from the
 * DataInputStream in, so no synchronization on in is needed.
 */
private void receiveResponse() {
    if (shouldCloseConnection.get()) {
        return;
    }
    touch();
    try {
        int id = in.readInt(); // try to read an id (blocking read)
        if (LOG.isDebugEnabled())
            LOG.debug(getName() + " got value #" + id);
        Call call = calls.get(id); // look up the call that was registered when the request was sent
        int state = in.readInt(); // read call status (blocking read)
        if (state == Status.SUCCESS.state) {
            Writable value = ReflectionUtils.newInstance(valueClass, conf);
            value.readFields(in); // read value
            // hand the value to the call object, which also wakes up the
            // client thread waiting on it
            call.setValue(value);
            calls.remove(id);
        } else if (state == Status.ERROR.state) {
            call.setException(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
            calls.remove(id);
        } else if (state == Status.FATAL.state) {
            // Close the connection
            markClosed(new RemoteException(WritableUtils.readString(in), WritableUtils.readString(in)));
        }
    } catch (IOException e) {
        markClosed(e);
    }
}
 
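      Hadoop's RPC exposes a synchronous interface, but internally it is built on asynchronous messaging. Hadoop uses the thread wait/notify mechanism to turn the asynchronous exchange into a synchronous call: after sending the request (call), the caller waits until the request has been handled, and once the response has been received (connection.receiveResponse()), the receiver thread notifies it. The notify() is reached through call.setValue. This raises a question: one connection carries many calls, and several of them may be waiting for a response at the same time, so when the client receives a response, how does it know which request that response answers? The answer is the Hashtable<Integer, Call> held by the connection (the calls table used in receiveResponse). The Integer key is the call id, which is sent with the request and read back from the response, so each response can be matched to its request.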
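
The matching of responses to requests can be sketched in a few lines. This is a simplified illustration with hypothetical names (DemuxSketch and its nested Call), not the Hadoop classes: two calls are registered on one "connection", a receiver thread delivers the responses in reverse order, and each caller still gets its own response.

```java
import java.util.Hashtable;

final class DemuxSketch {
    static final class Call {
        final int id;
        private String value;
        private boolean done;

        Call(int id) {
            this.id = id;
        }

        // Receiver side: store the value and wake the waiting caller.
        synchronized void setValue(String v) {
            value = v;
            done = true;
            notify();
        }

        // Caller side: block until the response for this call has arrived.
        synchronized String await() throws InterruptedException {
            while (!done) {
                wait();
            }
            return value;
        }
    }

    private final Hashtable<Integer, Call> calls = new Hashtable<>();
    private int counter;

    // Sender side: assign an id and remember the call until its response arrives.
    synchronized Call register() {
        Call call = new Call(counter++);
        calls.put(call.id, call);
        return call;
    }

    // Receiver side: find the call by the id carried in the response.
    void deliver(int id, String value) {
        Call call = calls.remove(id);
        if (call != null) {
            call.setValue(value);
        }
    }

    static String demo() {
        DemuxSketch conn = new DemuxSketch();
        Call first = conn.register();   // id 0
        Call second = conn.register();  // id 1
        Thread receiver = new Thread(() -> {
            conn.deliver(second.id, "reply-to-1"); // responses arrive out of order
            conn.deliver(first.id, "reply-to-0");
        });
        receiver.start();
        try {
            return first.await() + "," + second.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints reply-to-0,reply-to-1
    }
}
```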
 
 
Sequence diagrams:
 
 
DSC0000.png
 
 
 

DSC0001.png
 
 

Original thread: https://www.yunweiku.com/thread-313698-1-1.html