|
Server端主要负责接收client端发送的请求并进行处理,最后将处理结果返回给客户端。
Hadoop RPC的Server端采用了NIO技术,涉及到channel,selector等概念。Server类中主要有Listener,Connection,Call,Handler,Responder等类。
1、Listener类和Reader类
/**
 * Accept thread of the server. It owns the non-blocking server socket channel, waits for
 * OP_ACCEPT events on its own selector and hands every accepted channel to one of the
 * {@link Reader} threads, chosen in round-robin order.
 */
private class Listener extends Thread {
    private ServerSocketChannel acceptChannel = null;
    private Selector selector = null;
    private Reader[] readers = null;
    private int currentReader = 0;
    private InetSocketAddress address;

    /**
     * Opens and binds the accept channel, creates and starts {@code readThreads} readers and
     * registers the accept channel with the listener's selector.
     *
     * @throws IOException if a channel or selector cannot be opened or the socket cannot be bound
     */
    public Listener() throws IOException {
        address = new InetSocketAddress(bindAddress, port);
        acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false);
        bind(acceptChannel.socket(), address);
        selector = Selector.open();
        readers = new Reader[readThreads];
        for (int i = 0; i < readThreads; i++) {
            System.out.println(">>>start reader" + i + "......");
            Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port" + port);
            // Store the reader in its own slot; getReader() later indexes into this array.
            readers[i] = reader;
            reader.start();
        }
        System.out.println(">>>register listener selector on port" + port + "......");
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
        this.setName("IPC Server listener on:" + acceptChannel.socket().getLocalPort());
        this.setDaemon(true);
    }

    /**
     * Thread that reads requests from the channels registered with its private selector.
     */
    private class Reader extends Thread {
        // True while the listener is registering a new channel; the run loop waits while it is set.
        private volatile boolean adding = false;
        private final Selector readSelector;

        Reader(String name) throws IOException {
            super(name);
            this.readSelector = Selector.open();
        }

        @Override
        public void run() {
            doRunLoop();
        }

        /**
         * Selects readable keys and reads from each of them until {@code running} becomes false.
         *
         * <p>The method holds this reader's monitor for its whole duration and only releases it
         * inside {@code wait(1000)}; that is the window in which the synchronized
         * {@link #registerChannel} and {@link #finishAdd} calls of the listener can proceed.
         */
        public synchronized void doRunLoop() {
            while (running) {
                try {
                    readSelector.select();
                    // Pause while the listener registers a channel with readSelector.
                    while (adding) {
                        this.wait(1000);
                    }
                    Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isValid() && key.isReadable()) {
                            doRead(key);
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Reads from the connection attached to {@code key} and closes the connection when the
         * read reports a negative count or fails with an {@link IOException}.
         *
         * @param key a readable key; ignored when it carries no attachment
         */
        public void doRead(SelectionKey key) {
            Connection c = (Connection) key.attachment();
            if (c == null) {
                return;
            }
            int count = 0;
            try {
                System.out.println(">>>reader read and process " + this.toString() + "......");
                count = c.readAndProcess();
            } catch (IOException e) {
                e.printStackTrace();
                // A failed read means the connection is unusable; force the close branch below.
                count = -1;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (count < 0) {
                closeConnection(c);
            }
        }

        /**
         * Announces that a channel is about to be registered and wakes the selector so that the
         * run loop leaves {@code select()} and parks in its {@code adding} wait loop.
         */
        public void startAdd() {
            adding = true;
            readSelector.wakeup();
        }

        /** Ends a registration started with {@link #startAdd()} and resumes the run loop. */
        public synchronized void finishAdd() {
            adding = false;
            this.notify();
        }

        /**
         * Registers {@code channel} for OP_READ with this reader's selector.
         *
         * @param channel the accepted, non-blocking channel
         * @return the selection key created by the registration
         * @throws ClosedChannelException if the channel is already closed
         */
        public synchronized SelectionKey registerChannel(SocketChannel channel) throws ClosedChannelException {
            System.out.println(">>>register reader on channel:" + this.toString() + "......");
            return channel.register(readSelector, SelectionKey.OP_READ);
        }
    }

    /**
     * Accept loop. Runs until {@code running} becomes false, then closes the accept channel and
     * the selector and closes every connection still held in {@code connectionList}.
     */
    @Override
    public void run() {
        while (running) {
            SelectionKey key = null;
            try {
                getSelector().select();
                Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
                while (iter.hasNext()) {
                    key = iter.next();
                    iter.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        doAccept(key);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        synchronized (this) {
            try {
                acceptChannel.close();
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            selector = null;
            acceptChannel = null;
            while (!connectionList.isEmpty()) {
                closeConnection(connectionList.remove(0));
            }
        }
    }

    /**
     * Accepts every pending connection on the server channel, registers it with a reader and
     * records a new {@link Connection} for it.
     *
     * @param key the acceptable key of the server socket channel
     * @throws IOException if accepting, configuring or registering a channel fails
     */
    void doAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel;
        while ((channel = server.accept()) != null) {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(tcpNoDelay);
            Reader reader = getReader();
            reader.startAdd();
            try {
                System.out.println(">>>start add reader" + reader.toString() + "...");
                SelectionKey readKey = reader.registerChannel(channel);
                System.out.println(">>>create connection...");
                Connection c = new Connection(readKey, channel);
                readKey.attach(c);
                synchronized (connectionList) {
                    connectionList.add(numConnections, c);
                    numConnections++;
                }
            } finally {
                // Always clear the adding flag; otherwise a failed registration would leave the
                // reader waiting in its adding loop indefinitely.
                reader.finishAdd();
            }
        }
    }

    /** Returns the next reader in round-robin order. */
    Reader getReader() {
        currentReader = (currentReader + 1) % readers.length;
        return readers[currentReader];
    }

    synchronized Selector getSelector() {
        return selector;
    }
}
2、Connection类
/**
 * State of one client connection: the socket channel, the buffers used to assemble a request
 * that is framed as a 4-byte length followed by that many bytes of data, and the queue of calls
 * whose responses still have to be written to the channel.
 */
public class Connection {
    private SocketChannel channel;
    // Collects the 4-byte length prefix of the next request.
    private ByteBuffer dataLengthBuffer;
    // Collects the request body; null while no length prefix has been decoded yet.
    private ByteBuffer data;
    private int dataLength;
    private LinkedList<Call> responseQueue;

    /**
     * @param key     the read key of the channel (not used by this class)
     * @param channel the channel of the client connection
     */
    public Connection(SelectionKey key, SocketChannel channel) {
        this.channel = channel;
        this.dataLengthBuffer = ByteBuffer.allocate(4);
        this.data = null;
        this.responseQueue = new LinkedList<Call>();
    }

    /**
     * Reads as much of the current request as the channel provides. Once the length prefix and
     * the full body are available, the request is turned into a {@link Call} and queued.
     *
     * @return the count reported by the last channel read; negative values are treated by the
     *         caller as a signal to close the connection
     * @throws IOException          if reading fails, the length prefix is negative, or the
     *                              request cannot be decoded
     * @throws InterruptedException if queuing the call is interrupted
     */
    public int readAndProcess() throws IOException, InterruptedException {
        int count = -1;
        if (dataLengthBuffer.remaining() > 0) {
            System.out.println(">>>read the data length from the channel:" + channel.toString() + ".......");
            count = channelRead(channel, dataLengthBuffer);
            // Stop here on a negative count or while the length prefix is still incomplete.
            if (count < 0 || dataLengthBuffer.remaining() > 0) {
                return count;
            }
        }
        System.out.println(">>>read the data from the channel:" + channel.toString() + ".......");
        if (data == null) {
            dataLengthBuffer.flip();
            dataLength = dataLengthBuffer.getInt();
            // The length comes from the peer; ByteBuffer.allocate would throw an unchecked
            // IllegalArgumentException for a negative value, so reject it explicitly.
            if (dataLength < 0) {
                throw new IOException("Unexpected data length " + dataLength + " on channel " + channel);
            }
            data = ByteBuffer.allocate(dataLength);
        }
        count = channelRead(channel, data);
        System.out.println(">>>finished reading the data from the channel and prepare to process the rpc.......");
        if (data.remaining() == 0) {
            // Body complete: reset the length buffer for the next request and process this one.
            dataLengthBuffer.clear();
            data.flip();
            processOneRpc(data.array());
            data = null;
        }
        return count;
    }

    /**
     * Decodes one request (an int call id followed by the serialized invocation) and puts the
     * resulting {@link Call} on the call queue.
     *
     * @param buf the complete request body
     * @throws IOException          if the request cannot be decoded
     * @throws InterruptedException if queuing the call is interrupted
     */
    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
        final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
        int id;
        Writable invocation;
        try {
            invocation = new Invocation(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{});
            id = dis.readInt();
            invocation.readFields(dis);
        } catch (NoSuchMethodException e) {
            // Without the Invocation there is nothing to queue; report it instead of continuing
            // with a null request.
            throw new IOException("Cannot create the Invocation used to decode the request", e);
        }
        System.out.println(">>> create the call according to the data: id#" + id + ":" + invocation.toString());
        Call call = new Call(id, invocation, this);
        callQueue.put(call);
    }

    /** Closes the underlying channel; a failure while closing is only printed. */
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
3、Call类
/**
 * One RPC request received on a connection, plus the serialized response once it has been set.
 */
public static class Call {
    /** Id identifying the call; the client uses it when handling the returned result. */
    private final int callId;
    /** The wrapped request. */
    private final Writable rpcRequest;
    /** The connection the call arrived on; it carries the channel information. */
    private final Connection connection;
    /** The response to send back. */
    private ByteBuffer rpcResponse;

    /**
     * @param id         id of the call
     * @param param      the request
     * @param connection the connection the request was read from
     */
    public Call(int id, Writable param, Connection connection) {
        this.connection = connection;
        this.rpcRequest = param;
        this.callId = id;
    }

    /** Stores the serialized response for this call. */
    public void setResponse(ByteBuffer response) {
        rpcResponse = response;
    }
}
4、Handler类
/**
 * Worker thread that takes calls from the call queue, executes them and passes the result to
 * the responder.
 */
private class Handler extends Thread {
    public Handler(int instanceNumber) {
        this.setName("IPC Server handler " + instanceNumber + "on port" + port);
        this.setDaemon(true);
    }

    /**
     * Processes queued calls until {@code running} becomes false. An interruption while waiting
     * for a call is printed and the loop condition is evaluated again.
     */
    @Override
    public void run() {
        // Buffer reused for every response built by this handler.
        final ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(10240);
        while (running) {
            try {
                final Call next = callQueue.take();
                System.out.println(">>>call the service on the server...");
                final Writable result = call(next);
                // Building and queueing the response happen under the connection's queue lock.
                synchronized (next.connection.responseQueue) {
                    System.out.println(">>>prepare to respond the call...");
                    setupResponse(responseBuffer, next, result);
                    responder.doRespond(next);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
5、Responder类
/**
 * Thread that writes responses which a handler could not write completely in one go. Channels
 * with unfinished responses are registered for OP_WRITE with {@code writeSelector}.
 */
private class Responder extends Thread {
    private final Selector writeSelector;
    // Number of handlers currently registering a channel with writeSelector.
    private int pending;
    // Timeout in milliseconds passed to writeSelector.select().
    static final int PURGE_INTERVAL = 900000; // 15mins

    Responder() throws IOException {
        this.setName("IPC Server Responder");
        this.setDaemon(true);
        writeSelector = Selector.open();
        pending = 0;
    }

    /**
     * Selects writable keys and continues the pending writes until {@code running} becomes
     * false, then closes the selector.
     */
    @Override
    public void run() {
        while (running) {
            try {
                // Do not enter select() while a handler is in the middle of a registration.
                waitPending();
                writeSelector.select(PURGE_INTERVAL);
                Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isValid() && key.isWritable()) {
                        doAsyncWrite(key);
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try {
            writeSelector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Appends {@code call} to its connection's response queue and, when it is the only queued
     * response, tries to write it immediately on the calling thread.
     *
     * @param call the call whose response has been set
     */
    void doRespond(Call call) {
        synchronized (call.connection.responseQueue) {
            call.connection.responseQueue.addLast(call);
            if (call.connection.responseQueue.size() == 1) {
                // Log only when the direct write is actually attempted.
                System.out.println(">>>only one response then directly respond the call......");
                processResponse(call.connection.responseQueue, true);
            }
        }
    }

    /**
     * Continues writing the queued responses of the connection attached to a writable key.
     *
     * @param key a writable key whose attachment is the call that triggered the registration
     * @throws IOException if the key's channel is not the channel of the attached call
     */
    private void doAsyncWrite(SelectionKey key) throws IOException {
        Call call = (Call) key.attachment();
        if (call == null) {
            return;
        }
        if (key.channel() != call.connection.channel) {
            throw new IOException("doAsyncWrite: bad channel");
        }
        synchronized (call.connection.responseQueue) {
            System.out.println(">>>doAsyncwrite...........");
            if (processResponse(call.connection.responseQueue, false)) {
                // Nothing left to write for this connection: stop selecting for OP_WRITE,
                // otherwise the selector would report the idle channel as writable every time.
                try {
                    key.interestOps(0);
                } catch (java.nio.channels.CancelledKeyException ignored) {
                    // The key was cancelled in the meantime (e.g. the channel was closed);
                    // there is no interest set left to clear.
                }
            }
        }
    }

    /**
     * Writes the first response of {@code responseQueue} to its channel.
     *
     * <p>If the response could only be written partially it is put back at the head of the
     * queue; when called from a handler the channel is then registered for OP_WRITE so that the
     * responder thread finishes the write.
     *
     * @param responseQueue the response queue of one connection
     * @param inHandler     true when called from a handler thread, false from the responder
     * @return true when no further writing is expected for this queue: it was empty, the write
     *         reported a negative count, the last queued response was written completely, or the
     *         channel turned out to be closed during registration
     */
    private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) {
        boolean done = false;
        synchronized (responseQueue) {
            int numElements = responseQueue.size();
            if (numElements == 0) {
                return true;
            }
            Call call = responseQueue.removeFirst();
            SocketChannel channel = call.connection.channel;
            try {
                int numBytes = channelWrite(channel, call.rpcResponse);
                if (numBytes < 0) {
                    return true;
                }
                if (!call.rpcResponse.hasRemaining()) {
                    System.out.println(">>>data writing is finished.....");
                    call.rpcResponse = null;
                    // Done only if this was the sole response in the queue.
                    done = (numElements == 1);
                } else {
                    System.out.println(">>>data writing is not finished and register writeselector on the channel.....");
                    call.connection.responseQueue.addFirst(call);
                    if (inHandler) {
                        // Keep the responder out of select() while the channel is registered.
                        incPending();
                        try {
                            writeSelector.wakeup();
                            channel.register(writeSelector, SelectionKey.OP_WRITE, call);
                        } catch (ClosedChannelException e) {
                            done = true;
                        } finally {
                            decPending();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return done;
    }

    private synchronized void incPending() {
        pending++;
    }

    private synchronized void decPending() {
        pending--;
        notify();
    }

    /** Blocks while at least one handler is registering a channel. */
    private synchronized void waitPending() throws InterruptedException {
        while (pending > 0) {
            wait();
        }
    }
}
6、Server类的成员
// Loop flag read by the listener, reader, handler and responder threads; cleared by stop().
volatile private boolean running = true;
// Address and port the listener's server socket is bound to.
private String bindAddress;
private int port;
// Calls decoded by connections (put) and consumed by handlers (take).
private BlockingDeque<Call> callQueue;
// Number of handler threads created by start(); the constructor sets it to 1.
private int handlerCount;
private Handler[] handlers = null;
private Responder responder = null;
// Connections accepted by the listener and not yet closed.
private List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
private Listener listener = null;
// Counter kept alongside connectionList; updated while holding the list's monitor.
private int numConnections = 0;
// Number of reader threads the listener creates.
private int readThreads;
// Value passed to setTcpNoDelay on every accepted socket.
private final boolean tcpNoDelay;
// Buffers with more remaining bytes than this are transferred in chunks by channelIO.
private static int NIO_BUFFER_LIMIT = 8 * 1024;
Server类的方法
/**
 * Creates the server: stores the configuration, creates the call queue and constructs the
 * listener and the responder. The number of handlers is fixed to one.
 *
 * @param bindAddress address the server socket is bound to
 * @param port        port the server socket is bound to
 * @param numReader   number of reader threads
 * @throws IOException if the listener or the responder cannot be created
 */
protected Server(String bindAddress, int port, int numReader) throws IOException {
    this.bindAddress = bindAddress;
    this.port = port;
    this.readThreads = numReader;
    this.tcpNoDelay = false;
    this.handlerCount = 1;
    this.callQueue = new LinkedBlockingDeque<Call>();
    // The listener reads bindAddress, port and readThreads, so it is built after them.
    this.listener = new Listener();
    this.responder = new Responder();
}
/**
 * Starts the responder, the listener and {@code handlerCount} handler threads.
 */
public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
        // Each handler goes into its own array slot so that stop() can interrupt it later.
        handlers[i] = new Handler(i);
        handlers[i].start();
    }
}
/**
 * Stops the server: clears the {@code running} flag, interrupts every handler that was
 * started as well as the listener and the responder, and notifies threads waiting on this
 * server's monitor.
 */
public synchronized void stop() {
    running = false;
    if (handlers != null) {
        for (int i = 0; i < handlerCount; i++) {
            // A slot may still be empty if start() did not get to create every handler.
            if (handlers[i] != null) {
                handlers[i].interrupt();
            }
        }
    }
    listener.interrupt();
    responder.interrupt();
    notifyAll();
}
/**
 * Binds {@code socket} to {@code address} and verifies that the socket reports itself bound.
 *
 * @param socket  the server socket to bind
 * @param address the local address to bind to
 * @throws IOException if binding fails; a {@link BindException} if the socket is still
 *                     unbound after the bind call
 */
public static void bind(ServerSocket socket, InetSocketAddress address) throws IOException {
    socket.bind(address);
    final boolean bound = socket.isBound();
    if (bound) {
        return;
    }
    throw new BindException("could not find a free port...");
}
/**
 * Removes {@code connection} from the connection list, decrementing the connection counter
 * when it was present, and then closes it.
 */
private void closeConnection(Connection connection) {
    synchronized (connectionList) {
        final boolean wasTracked = connectionList.remove(connection);
        if (wasTracked) {
            numConnections--;
        }
    }
    connection.close();
}
/**
 * Reads from {@code channel} into {@code buffer}. Buffers with more than
 * {@code NIO_BUFFER_LIMIT} bytes remaining are delegated to {@code channelIO}; smaller ones are
 * read with a single channel read.
 *
 * @return the count reported by the read
 * @throws IOException if the read fails
 */
private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
    if (buffer.remaining() > NIO_BUFFER_LIMIT) {
        return channelIO(channel, null, buffer);
    }
    return channel.read(buffer);
}
/**
 * Transfers the remaining bytes of {@code buf} in chunks of at most {@code NIO_BUFFER_LIMIT}
 * bytes. Exactly one of {@code readCh} and {@code writeCh} is expected to be non-null: the
 * buffer is filled from {@code readCh} when it is given, otherwise drained into
 * {@code writeCh}.
 *
 * <p>The loop stops as soon as a single operation transfers less than the requested chunk.
 * For a non-blocking channel that means no more progress is possible right now (or the end of
 * the stream was reached), so retrying immediately would only spin.
 *
 * @param readCh  channel to read from, or null to write
 * @param writeCh channel to write to; used only when {@code readCh} is null
 * @param buf     the buffer to fill or drain; its limit is restored before returning
 * @return the number of bytes transferred if positive, otherwise the result of the last
 *         channel operation
 * @throws IOException if a channel operation fails
 */
private static int channelIO(ReadableByteChannel readCh,
                             WritableByteChannel writeCh,
                             ByteBuffer buf) throws IOException {
    int originalLimit = buf.limit();
    int initialRemaining = buf.remaining();
    int ret = 0;
    while (buf.remaining() > 0) {
        try {
            int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
            // Temporarily shrink the limit so that one operation moves at most ioSize bytes.
            buf.limit(buf.position() + ioSize);
            ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
            if (ret < ioSize) {
                break;
            }
        } finally {
            buf.limit(originalLimit);
        }
    }
    int nBytes = initialRemaining - buf.remaining();
    return (nBytes > 0) ? nBytes : ret;
}
/**
 * Writes {@code rpcResponse} to {@code channel}. Buffers with more than
 * {@code NIO_BUFFER_LIMIT} bytes remaining are delegated to {@code channelIO}; smaller ones are
 * written with a single channel write.
 *
 * @return the count reported by the write
 * @throws IOException if the write fails
 */
private int channelWrite(SocketChannel channel, ByteBuffer rpcResponse) throws IOException {
    if (rpcResponse.remaining() > NIO_BUFFER_LIMIT) {
        return channelIO(null, channel, rpcResponse);
    }
    return channel.write(rpcResponse);
}
/**
 * Serializes the response of {@code call} into {@code responseBuf} and stores the resulting
 * bytes on the call. The response consists of the call id followed by the serialized value;
 * no length prefix is written.
 *
 * <p>An {@link IOException} during serialization is printed; whatever was written to
 * {@code responseBuf} up to that point still becomes the call's response.
 *
 * @param responseBuf scratch buffer; reset at the start of every invocation
 * @param call        the call that receives the response
 * @param rv          the value to serialize
 */
private void setupResponse(ByteArrayOutputStream responseBuf, Call call, Writable rv) {
    responseBuf.reset();
    final DataOutputStream responseStream = new DataOutputStream(responseBuf);
    try {
        final DataOutputBuffer serialized = new DataOutputBuffer();
        rv.write(serialized);
        final byte[] payload = serialized.getData();
        responseStream.writeInt(call.callId);
        // Only the first getLength() bytes of the backing array hold serialized data.
        responseStream.write(payload, 0, serialized.getLength());
    } catch (IOException e) {
        e.printStackTrace();
    }
    System.out.println(">>>set response of the call#" + call.callId + "........");
    call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
}
/**
 * Executes a call and returns the value to send back. This implementation performs no work of
 * its own: it returns the request object of the call as the result.
 *
 * @param call the call to execute
 * @return the call's request object
 */
public Writable call(Call call){
return call.rpcRequest;
} |
|
|