设为首页 收藏本站
查看: 2393|回复: 0

[经验分享] Hadoop学习九:Hadoop-hdfs RPC源码 Server

[复制链接]

尚未签到

发表于 2016-12-9 10:09:58 | 显示全部楼层 |阅读模式
一.Server类图
  DSC0000.jpg
二.详细描述


  • Call:server端的Call对象,对应着client的一个Call对象,两者id相同。同client Call一样,server Call封装了每次方法调用的参数信息和调用结果。
      //server端的Call对象,对应着client的一个Call对象,两者id相同
    private static class Call {
    private int id;                               // cleint Call的id
    private Writable param;                // client Call传过来的参数,实际上就是client Call的param,实际上也是RPC.Invocation
    private Connection connection;      //server到client的连接
    private ByteBuffer response;         //server Call的结果,类似client Call的value
    public Call(int id, Writable param, Connection connection) {
    this.id = id;
    this.param = param;
    this.connection = connection;
    this.response = null;
    }
    public void setResponse(ByteBuffer response) {
    this.response = response;
    }
    }
  • Connection:client向server发送消息时,server端nio接受后会创建一个到client的连接,用来向client发送消息。
      public class Connection {
    private boolean rpcHeaderRead = false; // if initial rpc header is read
    private boolean headerRead = false;  //if the connection header that
    private SocketChannel channel;
    private ByteBuffer data;
    private LinkedList<Call> responseQueue;//需要发往client端的Call
    private Socket socket;
    //初始化server Connction
    //channel  client的SocketChannel
    public Connection(SelectionKey key, SocketChannel channel,
    this.channel = channel;
    this.socket = channel.socket();
    this.responseQueue = new LinkedList<Call>();
    }   
     
  • Listener:监听client发送过来的消息,分发给Reader线程处理,最终目的是把消息封装成Server call对象,放入callQueue。
    //监听线程,NIO Reactor模式,读取client发送过来的消息
    private class Listener extends Thread {
    private ServerSocketChannel acceptChannel = null; //the accept channel
    private Selector selector = null; //the selector that we use for the server
    private Reader[] readers = null;
    private InetSocketAddress address; //the address we bind at
    private ExecutorService readPool;
    public Listener() throws IOException {
    address = new InetSocketAddress(bindAddress, port);
    acceptChannel = ServerSocketChannel.open();
    acceptChannel.configureBlocking(false);
    // bindserver socket到本机ip+指定port 50020
    bind(acceptChannel.socket(), address, backlogLength);
    // create a selector;
    selector= Selector.open();
    //线程池执行所有消息
    readers = new Reader[readThreads];
    readPool = Executors.newFixedThreadPool(readThreads);
    for (int i = 0; i < readThreads; i++) {
    Selector readSelector = Selector.open();
    Reader reader = new Reader(readSelector);
    readers = reader;
    readPool.execute(reader);
    }
    acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
    //3.1.创建Reader线程读取消息,最终目的把消息封装成Server call对象,放入callQueue
    private class Reader implements Runnable {
    private Selector readSelector = null;
    Reader(Selector readSelector) {
    this.readSelector = readSelector;
    }
    public void run() {
    nio方式处理消息{
    doRead(key);
    }
    }

  • Handler:从callQueue里面取Call,调用call方法(实际调用RPC.Server的call方法,真正执行远程命令的地方),远程命令执行完后,决定是否立刻向client发送执行命令的结果(Server call的response)。
      //4.从callQueue里面取Call,调用call方法(实际调用RPC.Server的call方法)
    private class Handler extends Thread {
    @Override
    public void run() {
    while (running) {
    //4.1 从callQueue里面取Call,调用call方法
    final Call call = callQueue.take(); // pop the queue; maybe blocked here
    CurCall.set(call);
    value = call(call.connection.protocol, call.param, call.timestamp);
    CurCall.set(null);
    //4.2 为Server call赋值
    setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR,  value, errorClass, error);
    //4.3处理赋值完后的call
    responder.doRespond(call);
    }
    }
    }
  • Responder:异步向client发送Server call的结果。
    //5.异步发送Server call的response到client端
    private class Responder extends Thread {
    private Selector writeSelector;
    Responder() throws IOException {
    writeSelector = Selector.open(); // create a selector
    }
    @Override
    public void run() {
    while (running) {
    Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
    while (iter.hasNext()) {
    SelectionKey key = iter.next();
    iter.remove();
    //5.异步写
    if (key.isValid() && key.isWritable()) {
    doAsyncWrite(key);
    }
    }
    }
    }
    //4.3将赋值完的server call放入responder队列里
    void doRespond(Call call) throws IOException {
    synchronized (call.connection.responseQueue) {
    call.connection.responseQueue.addLast(call);
    //如果真有一个call,同步发送
    if (call.connection.responseQueue.size() == 1) {
    processResponse(call.connection.responseQueue, true);
    }
    //如果多个call,交给Responder线程异步发送
    }
    }
    //5异步发送
    private void doAsyncWrite(SelectionKey key) throws IOException {
    Call call = (Call)key.attachment();
    processResponse(call.connection.responseQueue, false);
    }
    //向client写消息
    private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
    Call call = responseQueue.removeFirst();
    SocketChannel channel = call.connection.channel;
    channelWrite(channel, call.response);
    }
    }
  •  Server:IPC Server。
    public abstract class Server {
    private BlockingQueue<Call> callQueue; // queued calls
    private Listener listener = null;
    private Responder responder = null;
    private int numConnections = 0;
    private Handler[] handlers = null;
    //1.初始化一个IPC server,指定RPC服务器地址和端口
    protected Server(String bindAddress, int port,   Invocation.class, ...) throws IOException {
    this.bindAddress = bindAddress;
    this.port = port;
    this.paramClass = paramClass;
    //创建listener线程
    listener = new Listener();
    this.port = listener.getAddress().getPort();   
    //创建responder线程
    responder = new Responder();
    }

    //2.启动IPC server
    public synchronized void start() {
    //启动istener线程
    responder.start();
    //启动responder线程
    listener.start();
    //创建Handler线程,启动Handler线程
    handlers = new Handler[handlerCount];
    for (int i = 0; i < handlerCount; i++) {
    handlers = new Handler(i);
    handlers.start();
    }
    }
    //3.client第一次发送消息到server时触发doAccept
    public void run() {
    nio方式接受消息{
    doAccept(key);
    }
    }
    //3.doAccept
    void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
    Connection c = null;
    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    SocketChannel channel;
    while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    //3.1.创建Reader线程读取消息
    Reader reader = getReader();
    reader.startAdd();
    SelectionKey readKey = reader.registerChannel(channel);
    //3.2得根据client的SocketChannel创建一个server Connection,传递给Handler线程
    c = new Connection(readKey, channel, System.currentTimeMillis());
    readKey.attach(c);
    }
    }
    //3.1Reader线程读取消息
    void doRead(SelectionKey key) throws InterruptedException {
    Connection c = (Connection)key.attachment();
    c.readAndProcess();
    }   
    }
    //3.1读取channel
    public int readAndProcess() throws IOException, InterruptedException {
    int count = channelRead(channel, rpcHeaderBuffer);
    count =  channelRead(channel, data);
    processOneRpc(data.array());
    return count;
    }
    private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
    processData(buf);
    }
    //3.1根据client channel内容(方法参数)创建一个server Call,放入callQueue
    private void processData(byte[] buf) throws  IOException, InterruptedException {
    DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
    int id = dis.readInt();                    // try to read an id
    Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
    param.readFields(dis);        
    Call call = new Call(id, param, this);
    callQueue.put(call);              // queue the call; maybe blocked here
    }
    }
    //4.1
    public abstract Writable call(Class<?> protocol,  Writable param, long receiveTime , throws IOException;
    //4.2 为server call赋值(response)
    private void setupResponse(ByteArrayOutputStream response,  Call call, Status status,
    Writable rv, String errorClass, String error)  throws IOException {
    call.setResponse(ByteBuffer.wrap(response.toByteArray()));
    }
    }

     

三.关于Socket和ServerSocket的bind方法


  • Java网络编程从入门到精通(25):创建ServerSocket对象 http://edu.xvna.com/html/68156_3.html
  • 简单分析一下socket中的bind http://www.cnblogs.com/nightwatcher/archive/2011/07/03/2096717.html

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311832-1-1.html 上篇帖子: Hadoop学习八:Hadoop-Hdfs RPC源码 Client 下篇帖子: Hadoop学习十七:Hadoop-Hdfs DataXceiverServer源码写数据
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表