设为首页 收藏本站
查看: 1181|回复: 0

[经验分享] [hadoop源码阅读][6]-org.apache.hadoop.ipc-ipc.server

[复制链接]

尚未签到

发表于 2015-7-13 07:36:41 | 显示全部楼层 |阅读模式
1.      nio的reactor模式
DSC0000.jpg
具体的处理方式:

·     1.一个线程来处理所有连接(使用一个Selector)

·     2.一组线程来读取已经建立连接的数据(多个Selector,这里的线程数一般和cpu的核数相当);

·     3.一个线程池(这个线程池大小可以根据业务需求进行设置)

·     4.一个线程处理所有的连接的数据的写操作(一个Selector)

  
2.      简明流程图
DSC0001.jpg

3.      RPC Server主要流程
RPC Server作为服务提供者由两个部分组成:接收Call调用和处理Call调用。
DSC0002.jpg
接收Call调用负责接收来自RPC Client的调用请求,编码成Call对象后放入到Call队列中。这一过程由Listener线程完成。具体步骤:
l      Listener线程监视RPC Client发送过来的数据。
l      当有数据可以接收时,调用Connection的readAndProcess方法。
l      Connection边接收边对数据进行处理,如果接收到一个完整的Call包,则构建一个Call对象PUSH到Call队列中,由Handler线程来处理Call队列中的所有Call。

处理Call调用负责处理Call队列中的每个调用请求,由Handler线程完成:
l      Handler线程监听Call队列,如果Call队列非空,按FIFO规则从Call队列取出Call。
l      将Call交给RPC.Server处理。
l      借助JDK提供的Method,完成对目标方法的调用,目标方法由具体的业务逻辑实现。
l      返回响应。Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,则交由Server.Responder来完成。

4. server类的结构
0)抽象类

这里的Server类是个抽象类,唯一抽象的地方,就是

public abstract Writable call(Writable param, long receiveTime) throws IOException;
由RPC.server来实现

1)Call

用以存储客户端发来的请求,这个请求会放入一个BlockQueue中;

2)Listener

监听类,用以监听客户端发来的请求。同时Listener下面还有一个静态类,Listener.Reader,当监听器监听到用户请求,便用让Reader读取用户请求。

Listener主要负责Socket的监听以及Connection的建立,同时监控ClientSocket的数据可读事件,通知Connection进行processData,收到完成请求包以后,封装为一个Call对象(包含Connection对象,从网络流中读取的参数信息,调用方法信息),将其放入队列

3)Responder

响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。

它不断地检查响应队列中是否有调用信息,如果有的话,就把调用的结果返回给客户端

4)Connection

连接类,真正的客户端请求读取逻辑在这个类中。

Connection,代表与Client端的连接,读取客户端的call并放到一个阻塞队列中,Handler负责从这个队列中读取数据并处理

5)Handler

请求(blockQueueCall)处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

真正做事的实体。它从调用队列中获取调用信息,然后反射调用真正的对象,得到结果,然后再把此次调用放到响应队列(response queue)里  


5.      Server的启动,运行
5.1.Namenode的getserver实例使用


private void initialize(Configuration conf) throws IOException
{
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager());
serviceRpcServer.start();
}




  
5.2.Start服务启动




public synchronized void start()
{
responder.start();
listener.start();
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++)
{
handlers = new Handler(i);
handlers.start();
}
}


responder、listener、handlers三个对象的线程均阻塞了,前两个阻塞在selector.select()方法上,handler阻塞在callQueue.take()方法,都在等待客户端请求。Responder设置了超时时间,为15分钟。而listener还开启了Reader线程,该线程也阻塞了。



5.3. Listener线程做的工作
Listener监听到请求,获得所有请求的SelectionKey,执行doAccept(key)方法,该方法将所有的连接对象放入list中,并将connection对象与key绑定,以供reader使用。初始化玩所有的conne对象之后,就可以激活Reader线程了.
Reader的run方法和Listener基本一致,也是获得所有的SelectionKey,再执行doRead(key)方法。该方法获得key中绑定的connection,并执行conection的readAndProcess()方法
简明调用函数过程:
Listener:: run-> Listener:: doAccept( 激活Reader线程)->>Reader:: doRead->>connection:: readAndProcess->> connection::processOneRpc->>connection:: processData



  


private void processData(byte[] buf) throws  IOException, InterruptedException
{
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt();      // 尝试读取id
    Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
    param.readFields(dis);//这个就是client传递过来的Invocation,包含了函数名和参数
    Call call = new Call(id, param, this);  //封装成call
    callQueue.put(call);   // 将call存入callQueue
    incRpcCount();  // 增加rpc请求的计数
}


5.4. Handler线程做的工作
Handler线程的run函数



while (running)
{
try
{
final Call call = callQueue.take(); //弹出call,可能会阻塞
//调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中
        value = call(call.connection.protocol, call.param, call.timestamp);
synchronized (call.connection.responseQueue)
{
setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);
//给客户端响应请求
            responder.doRespond(call);
}
}
}

关于call函数的调用,稍后分析

5.5.Responder线程做的工作



void doRespond(Call call) throws IOException
{
synchronized (call.connection.responseQueue)
{
call.connection.responseQueue.addLast(call);//放到队列里面去
        if (call.connection.responseQueue.size() == 1)
{
processResponse(call.connection.responseQueue, true);
}
}
}


简明调用结构为:

Responder::run->>doAsyncWrite->>processResponse


5.6.最后来看server.call函数是怎么执行的



  


public Writable call(Class protocol, Writable param, long receivedTime) throws IOException
{
try
{
Invocation call = (Invocation) param;
Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());//获取client端调用的函数
        Object value = method.invoke(instance, call.getParameters());//instance即启动服务的对象,也即实现protocol的对象
        return new ObjectWritable(method.getReturnType(), value);//将结果序列化
    } catch (InvocationTargetException e)
{
} catch (Throwable e)
{
}
}

6.      用户可以做的操作


1. Reader数量
      正常情况下,一个客户端关联一个Reader,如果有很多客户端(client或DataNode),那么就可以相应增加这个配置
      参数:ipc.server.read.threadpool.size,默认是1,需要注意的是,这个配置参数是0.21版本的,不同版本的参数可能不一样
2. Handler数量
      对于这种做事的线程,不好把握度,到底多少才是合适。
      参数:dfs.namenode.handler.count, 这里是以NameNode举例
3. 客户端重试次数
      客户端在调用时发生异常,重试是无可厚非。但如果对实时性有要求,那么这里的重试就有考量。Fackbook在做的Realtime分析就有提到RPC的重试是需要修改的
      参数:ipc.client.connect.max.retries,默认是10
4. tcp no delay
      不建议对它有什么设置。如果我们对整个调用的过程中数据量大小及网络环境不清楚的话,就是设置了也不知道它是否有作用。
      参数:ipc.client.tcpnodelay,默认是false
  

  7. 时序图
DSC0003.jpg
  
  8. 类图
DSC0004.jpg
  
  9.参考
  http://blog.iyunv.com/sxf_824/article/details/4842153
  http://www.wikieno.com/2012/02/hadoop-ipc-server/
  http://caibinbupt.iteye.com/blog/281281
  http://www.tbdata.org/archives/1413  http://blog.iyunv.com/shirdrn/article/details/4598295
  http://lidejiasw.wordpress.com/2011/05/07/hadoop-rpc%E5%88%86%E6%9E%90/

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85893-1-1.html 上篇帖子: Hadoop的调度器总结(转) 下篇帖子: Hadoop 部署
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表