|
一.Server类图
二.详细描述
- Call:server端的Call对象,对应着client的一个Call对象,两者id相同。同client Call一样,server Call封装了每次方法调用的参数信息和调用结果。
// Server-side Call object; it corresponds to one client-side Call object, and the two share the same id.
private static class Call {
private int id; // id of the client Call
private Writable param; // argument sent over by the client Call, i.e. the client Call's param (in practice an RPC.Invocation)
private Connection connection; // connection from the server to the client
private ByteBuffer response; // result of the server Call, analogous to the client Call's value
// Creates a call for the given id, parameter and connection; the response starts out as null.
public Call(int id, Writable param, Connection connection) {
this.id = id;
this.param = param;
this.connection = connection;
this.response = null;
}
// Stores the response buffer for this call.
public void setResponse(ByteBuffer response) {
this.response = response;
}
}
- Connection:client向server发送消息时,server端nio接收后会创建一个到client的连接,用来向client发送消息。
public class Connection {
private boolean rpcHeaderRead = false; // if initial rpc header is read
private boolean headerRead = false; //if the connection header that
private SocketChannel channel;
private ByteBuffer data;
private LinkedList<Call> responseQueue;//需要发往client端的Call
private Socket socket;
//初始化server Connction
//channel client的SocketChannel
public Connection(SelectionKey key, SocketChannel channel,
this.channel = channel;
this.socket = channel.socket();
this.responseQueue = new LinkedList<Call>();
}
- Listener:监听client发送过来的消息,分发给Reader线程处理,最终目的是把消息封装成Server call对象,放入callQueue。
//监听线程,NIO Reactor模式,读取client发送过来的消息
private class Listener extends Thread {
private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
private Reader[] readers = null;
private InetSocketAddress address; //the address we bind at
private ExecutorService readPool;
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// bindserver socket到本机ip+指定port 50020
bind(acceptChannel.socket(), address, backlogLength);
// create a selector;
selector= Selector.open();
//线程池执行所有消息
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers = reader;
readPool.execute(reader);
}
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
}
//3.1.创建Reader线程读取消息,最终目的把消息封装成Server call对象,放入callQueue
private class Reader implements Runnable {
private Selector readSelector = null;
Reader(Selector readSelector) {
this.readSelector = readSelector;
}
public void run() {
nio方式处理消息{
doRead(key);
}
}
}
- Handler:从callQueue里面取Call,调用call方法(实际调用RPC.Server的call方法,真正执行远程命令的地方),远程命令执行完后,决定是否立刻向client发送执行命令的结果(Server call的response)。
// Step 4: takes Calls from callQueue and invokes call() on each one (in practice RPC.Server's call method).
// NOTE(review): abridged excerpt — value, buf, error and errorClass are used below but their declarations are not shown.
private class Handler extends Thread {
@Override
public void run() {
while (running) {
// 4.1 Take a Call from callQueue and invoke call() with its protocol, param and timestamp
final Call call = callQueue.take(); // pop the queue; maybe blocked here
CurCall.set(call); // presumably marks the call currently being handled — confirm against CurCall's declaration
value = call(call.connection.protocol, call.param, call.timestamp);
CurCall.set(null);
// 4.2 Populate the server Call's response (SUCCESS when error is null, ERROR otherwise)
setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);
// 4.3 Hand the populated call over to the responder
responder.doRespond(call);
}
}
}
- Responder:异步向client发送Server call的结果。
// Step 5: sends each server Call's response back to the client asynchronously.
private class Responder extends Thread {
private Selector writeSelector;
Responder() throws IOException {
writeSelector = Selector.open(); // create a selector
}
// Loops while `running` is true, walking writeSelector's selected keys and passing
// every valid, writable key to doAsyncWrite.
// NOTE(review): no writeSelector.select() call appears in this excerpt — presumably elided.
@Override
public void run() {
while (running) {
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
// 5. Asynchronous write
if (key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
}
}
}
// 4.3 Appends the populated server call to its connection's responseQueue (under that queue's lock).
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
// If this is the only call in the queue, send it synchronously right here
if (call.connection.responseQueue.size() == 1) {
processResponse(call.connection.responseQueue, true);
}
// If several calls are queued, sending is left to the Responder thread (asynchronously)
}
}
// 5. Asynchronous send: takes the Call attached to the key and processes its connection's responseQueue.
private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call)key.attachment();
processResponse(call.connection.responseQueue, false);
}
// Writes a message to the client: removes the first call from the queue and writes its
// response to that call's connection channel.
// NOTE(review): declared to return boolean but no return statement is shown, and inHandler
// is unused — the rest of the body is presumably elided from this excerpt.
private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
Call call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
channelWrite(channel, call.response);
}
}
- Server:IPC Server。
public abstract class Server {
private BlockingQueue<Call> callQueue; // queued calls
private Listener listener = null;
private Responder responder = null;
private int numConnections = 0;
private Handler[] handlers = null;
//1.初始化一个IPC server,指定RPC服务器地址和端口
protected Server(String bindAddress, int port, Invocation.class, ...) throws IOException {
this.bindAddress = bindAddress;
this.port = port;
this.paramClass = paramClass;
//创建listener线程
listener = new Listener();
this.port = listener.getAddress().getPort();
//创建responder线程
responder = new Responder();
}
//2.启动IPC server
public synchronized void start() {
//启动istener线程
responder.start();
//启动responder线程
listener.start();
//创建Handler线程,启动Handler线程
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers = new Handler(i);
handlers.start();
}
}
//3.client第一次发送消息到server时触发doAccept
public void run() {
nio方式接受消息{
doAccept(key);
}
}
//3.doAccept
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
//3.1.创建Reader线程读取消息
Reader reader = getReader();
reader.startAdd();
SelectionKey readKey = reader.registerChannel(channel);
//3.2得根据client的SocketChannel创建一个server Connection,传递给Handler线程
c = new Connection(readKey, channel, System.currentTimeMillis());
readKey.attach(c);
}
}
//3.1Reader线程读取消息
void doRead(SelectionKey key) throws InterruptedException {
Connection c = (Connection)key.attachment();
c.readAndProcess();
}
}
// 3.1 Reads from the channel: first into rpcHeaderBuffer, then into data, and passes
// data's backing array to processOneRpc.
// Returns the count from the second channelRead (the first count is overwritten).
// NOTE(review): abridged excerpt — rpcHeaderBuffer's declaration is not shown here.
public int readAndProcess() throws IOException, InterruptedException {
int count = channelRead(channel, rpcHeaderBuffer);
count = channelRead(channel, data);
processOneRpc(data.array());
return count;
}
// Processes one RPC payload by forwarding the raw bytes to processData.
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
processData(buf);
}
// 3.1 Builds a server Call from the content received on the client channel (the method
// arguments) and puts it on callQueue.
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // try to read an id
Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
param.readFields(dis); // populate the parameter from the same stream, after the id
Call call = new Call(id, param, this); // this object is passed as the Call's connection
callQueue.put(call); // queue the call; maybe blocked here
}
}
//4.1
public abstract Writable call(Class<?> protocol, Writable param, long receiveTime , throws IOException;
// 4.2 Sets the server call's response: wraps the bytes held by `response` in a ByteBuffer
// and stores it on the call.
// NOTE(review): status, rv, errorClass and error are unused in this excerpt — the code that
// serializes them into `response` is presumably elided.
private void setupResponse(ByteArrayOutputStream response, Call call, Status status,
Writable rv, String errorClass, String error) throws IOException {
call.setResponse(ByteBuffer.wrap(response.toByteArray()));
}
}
三.关于Socket和ServerSocket的bind方法
- Java网络编程从入门到精通(25):创建ServerSocket对象 http://edu.xvna.com/html/68156_3.html
- 简单分析一下socket中的bind http://www.cnblogs.com/nightwatcher/archive/2011/07/03/2096717.html
|
|