ConnectionId:This class holds the address and the user ticket. The client connections to servers are uniquely identified by <remoteAddress, protocol, ticket>。一个connection由一个ConnectionId唯一标识;所以要重写ConnectionId的equals和hashcode方法。
ConnectionHeader:The IPC connection header sent by the client to the server on connection establishment.
Connection:继承Thread。代表client到server的一个连接。我在文中将Connection对象称为“连接”。
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
//一个连接的基本信息
private InetSocketAddress server; // server ip:port,注意是服务端
private ConnectionHeader header; // connection header
private final ConnectionId remoteId; // connection id
private Socket socket = null; // connected socket
private DataInputStream in;
private DataOutputStream out;
//所有的调用
private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
//1.构造函数只初始化一些基本信息
public Connection(ConnectionId remoteId) throws IOException {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
header = new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket, authMethod);
this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
remoteId.getAddress().toString() +" from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
this.setDaemon(true);
}
//2.与IPC server建立socket连接
private synchronized void setupConnection() throws IOException {
while (true) {
try {
this.socket = socketFactory.createSocket();
}
}
/** Connect to the server and set up the I/O streams. It then sends
* a header to the server and starts
* the connection thread that waits for responses.
*/
private synchronized void setupIOstreams() throws InterruptedException {
try {
while (true) {
//2.与IPC server建立socket连接
setupConnection();
//3.创建流
InputStream inStream = NetUtils.getInputStream(socket);
OutputStream outStream = NetUtils.getOutputStream(socket);
//4.发送RPC报头
writeRpcHeader(outStream);
this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(inStream)));
this.out = new DataOutputStream(new BufferedOutputStream(outStream));
//5.发送connection header到server
writeHeader();
//6.启动自己(线程),接受response
start();
return;
}
} catch (Throwable t) {
if (t instanceof IOException) {
markClosed((IOException)t);
} else {
markClosed(new IOException("Couldn't set up IO streams", t));
}
close();
}
}
//4.发送RPC报头
private void writeRpcHeader(OutputStream outStream) throws IOException {
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
//public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
//public static final byte CURRENT_VERSION = 4;
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
authMethod.write(out);
out.flush();
}
//5.发送connection header到server
private void writeHeader() throws IOException {
DataOutputBuffer buf = new DataOutputBuffer();
header.write(buf);
int bufLen = buf.getLength();
out.writeInt(bufLen);
out.write(buf.getData(), 0, bufLen);
}
//6.启动自己(线程),等待接受response,接受完后关闭此连接
public void run() {
while (waitForWork()) {//Return true if it is time to read a response; false otherwise.
receiveResponse();
}
//关闭此连接
close();
}
//7.发送请求:长度+内容
public void sendParam(Call call) {
DataOutputBuffer d=null;
synchronized (this.out) {
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //first put the data length
out.write(data, 0, dataLength);//write the data
out.flush();
}
}
//6.接受response,把结果赋值给Call对象
private void receiveResponse() {
int id = in.readInt(); // try to read an id
Call call = calls.get(id);
int state = in.readInt(); // read call status
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // read value
call.setValue(value);
calls.remove(id);
}
}
}
ParallelCall:继承Call,还是一个Call对象,只是这些Call对象共享一个ParallelResults。
private class ParallelCall extends Call {
private ParallelResults results; //多个Call共享一个ParallelResults
private int index;
public ParallelCall(Writable param, ParallelResults results, int index) {
super(param);
this.results = results;
this.index = index;
}
/** Deliver result to result collector. */
protected void callComplete() {
results.callComplete(this);
}
}
ParallelResults:一组Call对象的返回结果。
private static class ParallelResults {
private Writable[] values;
private int size; //一共有多少个Call要返回
private int count; //实际已经返回几个
public ParallelResults(int size) {
this.values = new Writable[size];
this.size = size;
}
/** Collect a result. */
public synchronized void callComplete(ParallelCall call) {
values[call.index] = call.value; // store the value
count++; // count it
if (count == size) // if all values are in
notify(); // then notify waiting caller
}
}
Client:IPC client端。调用client的call方法,传入Writable作为参数,返回一个Writable作为结果。
/** A client for an IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class.
*
* @see Server
*/
public class Client {
//缓存client到server的所有连接
private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
private Class<? extends Writable> valueClass; //Call对象value的类型
private int counter; //创建一个Call的时候,用counter++作为Call的id
final private Configuration conf;
private SocketFactory socketFactory; //服务器端的ip+port创建的socketFactory
//1.初始化Client
public Client(Class<? extends Writable> valueClass, Configuration conf,
SocketFactory factory) {
this.valueClass = valueClass;
this.conf = conf;
this.socketFactory = factory; //初始化client时传入
}
//2.用client发送一个请求
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
//创建Call对象
Call call = new Call(param);
//创建Connection对象
Connection connection = getConnection(remoteId, call);
//发送请求,参考Connection类代码7
connection.sendParam(call);
...
return call.value;
}
//2.用client一次发送多个请求
public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
Class<?> protocol, UserGroupInformation ticket, Configuration conf)
throws IOException, InterruptedException {
//创建一个结果集
ParallelResults results = new ParallelResults(params.length);
synchronized (results) {
for (int i = 0; i < params.length; i++) {
//创建每个Call
ParallelCall call = new ParallelCall(params, results, i);
try {
//创建每个Connection对象,不同的Call存放到不同的连接上(Each parameter is sent to the corresponding address.)。
ConnectionId remoteId = ConnectionId.getConnectionId(addresses,
protocol, ticket, 0, conf);
Connection connection = getConnection(remoteId, call);
//发送请求,参考Connection类代码7
connection.sendParam(call);
} catch (IOException e) {
}
}
while (results.count != results.size) {
try {
results.wait(); // wait for all results
} catch (InterruptedException e) {}
}
//放回所有结果
return results.values;
}
}
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
//获得一个连接,首先从缓存中去;取不到,创建一个,并放到缓存中。
private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
Connection connection;
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
//参考Connection类代码1
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call));//往创建的连接里加入call
//参考Connection类代码23456
connection.setupIOstreams();
return connection;
}
}