设为首页 收藏本站
查看: 488|回复: 0

[经验分享] Hadoop RPC Server端的简单实现

[复制链接]

尚未签到

发表于 2016-12-11 09:44:12 | 显示全部楼层 |阅读模式
  Server端的主要负责接收client端发送的请求并处理,最后返回处理结果给客户端。
  Hadoop RPC的Server端采用了NIO技术,涉及到channel,selector等概念。Server类中主要有Listener,Connect,Call,Handler,Responder等类。
  1、Listener类和Reader类

private class Listener extends Thread {
private ServerSocketChannel acceptChannel = null;                        
private Selector selector = null;
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address;
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
bind(acceptChannel.socket(), address);
selector = Selector.open();
readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) {
System.out.println(">>>start reader" + i + "......");
Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port" + port);
readers = reader;
reader.start();
}
System.out.println(">>>register listener selector on port" + port + "......");
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on:" + acceptChannel.socket().getLocalPort());
this.setDaemon(true);
}
private class Reader extends Thread {
private volatile boolean adding = false;
private final Selector readSelector;
Reader(String name) throws IOException {
super(name);
this.readSelector = Selector.open();
}
@Override
public void run() {
doRunLoop();
}
public synchronized void doRunLoop(){
while (running){
SelectionKey key = null;
try {
readSelector.select();
while(adding){
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while(iter.hasNext()){
key = iter.next();
iter.remove();
if(key.isValid() && key.isReadable()){
doRead(key);
}
key = null;
}
}catch (InterruptedException e){
e.printStackTrace();
}catch (IOException e) {
e.printStackTrace();
}
}
}
public void doRead(SelectionKey key){
Connection c = (Connection)key.attachment();
if(c == null){
return;
}
int count = 0;
try {
System.out.println(">>>reader read and process " + this.toString() + "......");
count = c.readAndProcess();
} catch (IOException e) {
e.printStackTrace();
}catch (InterruptedException e){
e.printStackTrace();
}
if(count < 0) {
closeConnection(c);
c = null;
}
}
public void startAdd() {
adding = true;
readSelector.wakeup();
}
public synchronized void finishAdd() {
adding = false;
this.notify();
}
public synchronized SelectionKey registerChannel(SocketChannel channel) throws ClosedChannelException {
System.out.println(">>>register reader on channel:"+ this.toString() + "......");
return channel.register(readSelector, SelectionKey.OP_READ);
}
}
@Override
public void run() {
while (running) {
SelectionKey key = null;
try {
getSelector().select();
Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
if (key.isValid() && key.isAcceptable()) {
doAccept(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
synchronized (this) {
try {
acceptChannel.close();
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
selector = null;
acceptChannel = null;
while(!connectionList.isEmpty()){
closeConnection(connectionList.remove(0));
}
}
}
void doAccept(SelectionKey key) throws IOException {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader();
reader.startAdd();
System.out.println(">>>start add reader" + reader.toString() + "...");
SelectionKey readKey = reader.registerChannel(channel);
System.out.println(">>>create connection...");
c = new Connection(readKey, channel);
readKey.attach(c);
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
reader.finishAdd();
}
}
Reader getReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}
synchronized Selector getSelector() {
return selector;
}
}
  2、Connection类

public class Connection {
private SocketChannel channel;
private ByteBuffer dataLengthBuffer;
private ByteBuffer data;
private int dataLength;
private LinkedList<Call> responseQueue;
public Connection(SelectionKey key, SocketChannel channel) {
this.channel = channel;
this.dataLengthBuffer = ByteBuffer.allocate(4);
this.data = null;
this.responseQueue = new LinkedList<Call>();
}
public int readAndProcess() throws IOException, InterruptedException {
int count = -1;
if(dataLengthBuffer.remaining() > 0){
System.out.println(">>>read the data length from the channel:" + channel.toString() + ".......");
count = channelRead(channel, dataLengthBuffer);
if(count < 0 || dataLengthBuffer.remaining() > 0){
return count;
}
}
System.out.println(">>>read the data from the channel:" + channel.toString() + ".......");
if(data == null){
dataLengthBuffer.flip();
dataLength = dataLengthBuffer.getInt();
data = ByteBuffer.allocate(dataLength);
}
count = channelRead(channel, data);
System.out.println(">>>finished reading the data from the channel and prepare to process the rpc.......");
if(data.remaining() == 0){
dataLengthBuffer.clear();
data.flip();
processOneRpc(data.array());
data = null;
}
return count;
}
private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
int id = 0;
Writable invocation = null;
try {
invocation = new Invocation(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{});
id = dis.readInt();
invocation.readFields(dis);
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
System.out.println(">>> create the call according to the data: id#" + id + ":" + invocation.toString());
Call call = new Call(id, invocation, this);
callQueue.put(call);
}
public void close(){
}
}
  3、Call类

public static class Call {
private final int callId;             //标识调用的id,在客户端处理返回结果时用到
private final Writable rpcRequest;    //封装请求
private final Connection connection;   //连接中包含channel信息
private ByteBuffer rpcResponse;         //返回结果
public Call(int id, Writable param, Connection connection) {
this.callId = id;
this.rpcRequest = param;
this.connection = connection;
}
public void setResponse(ByteBuffer response){
this.rpcResponse = response;
}
}
  4、Handler类

private class Handler extends Thread{
public Handler(int instanceNumber){
this.setDaemon(true);
this.setName("IPC Server handler " + instanceNumber + "on port" + port);
}
@Override
public void run(){
ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
while(running){
Writable value = null;
try {
final Call call = callQueue.take();
System.out.println(">>>call the service on the server...");
value = call(call);
synchronized (call.connection.responseQueue){
System.out.println(">>>prepare to respond the call...");
setupResponse(buf, call, value);
responder.doRespond(call);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
  5、Responder类

private class Responder extends Thread{
private final Selector writeSelector;
private int pending;
final static int PURGE_INTERVAL = 900000; // 15mins
Responder() throws IOException {
this.setName("IPC Server Responder");
this.setDaemon(true);
writeSelector = Selector.open();
pending = 0;
}
@Override
public void run(){
while(running){
try {
waitPending();
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
if(key.isValid() && key.isWritable()){
doAsyncWrite(key);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
try {
writeSelector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
void doRespond(Call call){
synchronized (call.connection.responseQueue){
call.connection.responseQueue.addLast(call);
System.out.println(">>>only one response then directly respond the call......");
if(call.connection.responseQueue.size() == 1){
processResponse(call.connection.responseQueue, true);
}
}
}
private void doAsyncWrite(SelectionKey key) throws IOException {
Call call = (Call) key.attachment();
if(call == null){
return;
}
if(key.channel() != call.connection.channel){
throw new IOException("doAsyncWrite: bad channel");
}
synchronized (call.connection.responseQueue){
System.out.println(">>>doAsyncwrite...........");
processResponse(call.connection.responseQueue, false);
}
}
private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler){
boolean done  = false;
Call call = null;
int numElements = 0;
synchronized (responseQueue){
if((numElements = responseQueue.size()) == 0){
return true;
}
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
try {
int numBytes = channelWrite(channel, call.rpcResponse);
if(numBytes < 0){
return true;
}
if(!call.rpcResponse.hasRemaining()){
System.out.println(">>>data writing is finished.....");
call.rpcResponse = null;
if(numElements == 1){
done = true;
}else{
done = false;
}
}else{
System.out.println(">>>data writing is not finished and register writeselector on the channel.....");
call.connection.responseQueue.addFirst(call);
if(inHandler){
incPending();
try {
writeSelector.wakeup();
channel.register(writeSelector, SelectionKey.OP_WRITE, call);
}catch (ClosedChannelException e){
done = true;
}finally {
decPending();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
return done;
}
private synchronized void incPending(){
pending++;
}
private synchronized void decPending(){
pending--;
notify();
}
private synchronized void waitPending() throws InterruptedException {
while(pending > 0){
wait();
}
}
}
  6、Server类的成员

volatile private boolean running = true;
private String bindAddress;
private int port;
private BlockingDeque<Call> callQueue;
private int handlerCount;
private Handler[] handlers = null;
private Responder responder = null;
private List<Connection> connectionList =
Collections.synchronizedList(new LinkedList<Connection>());
private Listener listener = null;
private int numConnections = 0;
private int readThreads;
private final boolean tcpNoDelay;
private static int NIO_BUFFER_LIMIT = 8 * 1024;
  Server类的方法

protected Server(String bindAddress, int port, int numReader) throws IOException {
this.tcpNoDelay = false;
this.bindAddress = bindAddress;
this.port = port;
this.readThreads = numReader;
this.callQueue = new LinkedBlockingDeque<Call>();
listener = new Listener();
responder = new Responder();
handlerCount = 1;
}
public synchronized void start(){
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for(int i = 0; i < handlerCount; i++){
handlers = new Handler(i);
handlers.start();
}
}
public synchronized void stop(){
running = false;
running = false;
if (handlers != null) {
for (int i = 0; i < handlerCount; i++) {
if (handlers != null) {
handlers.interrupt();
}
}
}
listener.interrupt();
responder.interrupt();
notifyAll();
}
public static void bind(ServerSocket socket, InetSocketAddress address) throws IOException {
socket.bind(address);
if (!socket.isBound()) {
throw new BindException("could not find a free port...");
}
}
private void closeConnection(Connection connection){
synchronized (connectionList){
if(connectionList.remove(connection))
numConnections--;
}
connection.close();
}
private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.read(buffer) : channelIO(channel, null, buffer);
return count;
}
private static int channelIO(ReadableByteChannel readCh,
WritableByteChannel writeCh,
ByteBuffer buf) throws IOException {
int originalLimit = buf.limit();
int initialRemaining = buf.remaining();
int ret = 0;
while(buf.remaining() > 0){
try {
int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
buf.limit(buf.position() + ioSize);
ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
} finally {
buf.limit(originalLimit);
}
}
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}
private int channelWrite(SocketChannel channel, ByteBuffer rpcResponse) throws IOException {
int count = (rpcResponse.remaining() <= NIO_BUFFER_LIMIT)?
channel.write(rpcResponse):channelIO(null, channel, rpcResponse);
return count;
}
private void setupResponse(ByteArrayOutputStream responseBuf, Call call, Writable rv){
responseBuf.reset();
DataOutputStream out = new DataOutputStream(responseBuf);
try {
final DataOutputBuffer buf = new DataOutputBuffer();
rv.write(buf);
byte[] data = buf.getData();
int fullLength = buf.getLength();
//            out.writeInt(fullLength);
out.writeInt(call.callId);
out.write(data, 0, buf.getLength());
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(">>>set response of the call#" + call.callId + "........");
call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
}
public Writable call(Call call){
return call.rpcRequest;
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312633-1-1.html 上篇帖子: Hadoop In Action 第四章(2) 下篇帖子: Hadoop中MapReduce多种join实现实例分析
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表