设为首页 收藏本站
查看: 2336|回复: 0

[经验分享] Hadoop源码分析之一(RPC机制之Server)

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-6-6 12:09:41 | 显示全部楼层 |阅读模式

想了解Hadoop是如何工作的,首先需要了解Hadoop RPC原理。Hadoop提供了一个统一的RPC机制来处理client-namenode, namenode-dataname,client-dataname之间的通信。整个机制中主要运用了如下技术:


  • Java dynamic proxy:主要用于处理client的代理来统一向Server端发送信息;
  • Google Protocal Buffer:主要用于把Request和Response序列化和反序列化成byte进行通信;
  • Java NIO:主要用于RPC Client与Server之间的通信;
下面主要介绍RPC Server主要的实现逻辑:
QQ截图20140606121006.png
如上图Server端主要Listener, Responer Thread和Handler, Reader Thread Pool。
  • Listener Thread:Server端会启一个Listener线程主要用于监听Client发送过来的Request,Listener主要完成创建一个Connection Object并启动一组Reader Thread Pool,并把Connection通过NIO的SelectionKey传递给Reader,这样就解决了Listener单线程带来的可能的性能瓶颈,因为Listener只作了一层转发;
  • Reader Thread Pool:主要用于读取Listener传过来的Connection,并调用Connection的readAndProcess方法来读取Request,并封装成一个Call放到Call Queue中;
  • Hanlder Thread Pool:Server会启动一组线程组来处理Call Queue中Call,并把处理的Respone中放到response queue中,Hanlder怎么处理Call会在后续介绍;
  • Responder Thread:主要处理response queue中的response,并把response发送给client,如果当前response queue为空,在加了新的response时会马上发送给client端,不会通过responer thread来发送。
  • 下面主要结合实际代码来介绍一下Hadoop RPC Server端整个的处理过程:(整个核心逻辑都在org.apache.hadoop.ipc.Server这个类中实现)

    首先在Server的start方法中启用相关的线程,start方法会在相关的Server中调用,如NameNode, DataNode等

      public synchronized void start() {
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
       
        for (int i = 0; i < handlerCount; i++) {
          handlers = new Handler(i);
          handlers.start();
        }
      }

    如上可以看到启动了listener,responer和handler threand pool,可以通过ipc.server.handler.queue.size来配置handler的个数,默认为100个。
    在Server中有一个Listener的内部类,Listener的创造函数中会启动相应的Reader Thread Pool,可以通过ipc.server.read.threadpool.size来配置Reader的线程数,默认为1个。Listener会监听OP_ACCEPT事件,并在doAccpet中创建一个Connection传递给Reader,如下:

    while ((channel = server.accept()) != null) {
            channel.configureBlocking(false);
            channel.socket().setTcpNoDelay(tcpNoDelay);
            //get one reader instance for reader pool
            Reader reader = getReader();
            try {
          //lock this reader
              reader.startAdd();
              SelectionKey readKey = reader.registerChannel(channel);
              //build Connection instance
              c = new Connection(readKey, channel, Time.now());
              readKey.attach(c);
              synchronized (connectionList) {
                connectionList.add(numConnections, c);
                numConnections++;
              }   
            } finally {
              //unlock this reader
              reader.finishAdd();
            }
          }
        }

    在Reader中主要通用Connection来读取Requeset并封装成Call,读取Request分为2种处理,一种为权限读取,另一种为普通读取,具体的实现可以查看hadoop源码,这里不粘出。但是这里会介绍一下整个request在整个传送过程中的结构,request由head和conent组成。

    Head的组成如下:
    * +----------------------------------+
    * | "hrpc" 4 bytes |
    * +----------------------------------+
    * | Version (1 bytes) |
    * +----------------------------------+
    * | Authmethod (1 byte) |
    * +----------------------------------+
    * | IpcSerializationType (1 byte) |
    * +----------------------------------+

    conent主要有4 bytes的是date length和具体的data byte array组成。
    Handler怎么处理具体的Call会在后续具体介绍,这里不做具体展开。
    Responder主要处理Call产生的Response并通过NIO传输给具体的client, 使用的socket channel与request为同一个,Responder会做相应的check,如下:

        //check select key channel is same as call.connection.channel
        if (key.channel() != call.connection.channel) {
            throw new IOException("doAsyncWrite: bad channel");
        }


        // Extract the first call
        call = responseQueue.removeFirst();
        SocketChannel channel = call.connection.channel;
        // Using call channel to send as much data as we can in the non-blocking fashion
        int numBytes = channelWrite(channel, call.rpcResponse);


    整个Hadoop RPC Server处理的机制就介绍到这里,后续会介绍call怎么处理到具体的处理及Client如何向Server发送信息。



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-20287-1-1.html 上篇帖子: Hadoop中DataNode与NameNode之间的心跳机制 下篇帖子: MapReduce错误任务失败处理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表