设为首页 收藏本站
查看: 677|回复: 0

[经验分享] Hadoop IPC源码分析(一)

[复制链接]

尚未签到

发表于 2016-12-9 07:03:51 | 显示全部楼层 |阅读模式
Hadoop IPC源码分析(一)
 
RPC
 
RPC包含了一系列的静态类和静态方法。

使用PROTOCOL_ENGINES = new HashMap<Class,RpcEngine>();

作为实现的接口—>RPCEngine的一个缓存

 

使用PROXY_ENGINES = new HashMap<Class,RpcEngine>();

作为实现的接口代理->PPCEngine的一个缓存

 

接口代理使用JDK的Proxy.getProxyClass获取对应接口的代理类。

默认的RpcEngine的实现为  WritableRpcEngine.class ;

 

waitForProxy方法是在时间限制内周期的调用getProxy方法,而getProxy是从缓存中获取接口所对应的RpcEngine实例,具体的Proxy获取由RpcEgine接口实现类实现。

 

RPC类管理一系列的PpcEngine,getProxy getServer call stopProxy方法的实际工作是由RpcEgine实现的。

 

接口RpcEngine

 

接口定义了getProxy getServer call stopProxy几个方法。

 

getProxy用于获取客户端代理,getProxy方法包含了一个InetSocketAddress addr参数,用于指定PPC服务器的地址和一个SocketFactory factory用于创建客户端到服务器之间的连接。

 

getSever用于构建PRC服务器端,getServer有一个Object instance参数,此参数代表接口实现的类。PPC服务器将请求转发给instance实例处理。

 

call方法参数包含一个请求参数列表数组Object[][] params和一组PPC服务器地址InetSocketAddress[] addrs和需要执行的方法Method method。一次call请求可以同时请求多个服务器。

 

RpcEgine有两种实现AvroRpcEngine和WritableRpcEngine。

 

WritableRpcEngine

 

WritableRpcEngine使用Invocation类表示一次方法调用,一次调用包含调用的函数名、参数类型和参数值信息。

 

Map<SocketFactory, Client> clients = new HashMap<SocketFactory, Client>();

每一个client和一个SocktFactory是对应的,一个节点上建立了几个SocketFactory对象就有几个client。这里说明client不是每次方法调用的时候都新建一个客户端,而是一个进程类所有的调用都用同一个客户端。

 

使用JDK RPC框架实现Proxy,Proxy将接口任务转交给Invocker处理,Invocker类实现了InvocationHandler接口。在Invocker类的实现中,由将方法调用转发给client类。

 

每次调用接口方法是,都会先获取一个client,增加client引用计数,调用完毕后,减少client引用计数,当client引用计数为0的时候,将client从缓存中移除。

 

Call方法的实现中,没有用到JDK的动态代理,直接将需要调用的方法给client实例处理了。

 

Server复写了Rpc.Server的call方法,Rpc的call方法实现的是一次方法的调用,与RpcEngine中的call方法不同的,在此类中的call方法中添加了一些统计功能。

 

这个类命名为WritablePpcEngine主要体现在两点:

1.    代理中的invoke方法中调用了clinet的call方法,call方法的参数Invocation是一个Writable实现类;
2.       server的实现中,server的call方法接收的是一个Writable的类。
 

总之,调用过程中传递参数是通过Writable的实现类Invocation包装的。

 

下面分析两个真正做事的类Client类和Server类。

 

Client

 

首先分析Client的call方法。Client的call方法接收Invocation参数,将参数打包为一个Call对象,每一个Call对象都有一个唯一的id,Call包含了此次方法调用是否成功,返回值的信息。

 

Call对象通过一个Connection对象发送出去,然后调用Object.wait()方法循环等待,如果中间线程被中断,忽略中断继续等待,当call对象被notify的时候返回,而后取出Call对象的value的值。

 

那么Call对象的value值是什么时候被设置的呢?Connection有一个receiveResponse方法,当收到相应Call的id是,将此id对应的call的value值设置,然后notify等待中的call。

 

Connection用一个Hashtable<Integer, Call> calls维护在此连接上的方法调用。

 

下面看一次方法调用时,怎样获得连接对象的。一个client使用Hashtable<ConnectionId, Connection> connections来维护在此client上的连接池。

 

一个Connection是有ConnectionId标识的,相同的远端地址,接口和用户有相同的ConnectionId。一个Call获取Connection的时候,获取到的Connection也将此Call加入到其维护的calls中。此后,connection会进行一些IO流的设置操作。

 

下面看connection在设置IO流的过程中做了些什么事情。

 

如果是第一次使用connection,有两件事情要做:

1.    向服务器写RPC请求头;
2.    启动此连接上等待RPC回应的线程。
 

如果不是第一次使用此connection,直接返回。

 

PRC请求头中开始几个字母为hrpc,向服务器表明这是一个RPC连接,然后是版本号和验证方式。

 

然后写入与请求的接口服务和请求用户身份信息。

 

此连接上的线程循环判断是否有工作要做,如果calls不为空,就调用receiveResponse方法接收服务器的响应。

 

响应由CallId、方法调用状态和调用返回值组成,如果方法调用状态成功,则将对应的Call从calls中移除,通知等待结果的调用方法。

 

ParallelCall和ParallelResult不分析。

 

Server

 

Server的实现比Client要复杂。

首先分析类Server的构造函数和启动过程。在Server的构造函数中设置了监听端口,处理队列大小,元素为Call对象的阻塞队列,初始化Listener和Responder。

 

在server的start方法中,做了以下一些工作。

1.    启动Responder线程
2.    启动Listener线程
3.    启动handlerCount个Handler线程
 

然后分别看看这几个线程是做什么用,怎么工作的。

 

Listener

 

这个类的实现用到了concurrent并发包和nio的一些东西。

 

在Listener的构造函数中,打开了一个ServerSocketChannel,并为之绑定监听地址。每个Listner有一个selector,将服务器通道的OP_ACCEPT时间在selector上注册为感兴趣事件。建立一个固定大小的线程池,用于执行多个Reader线程,多个Reader线程以同一个readSelector为构造参数。

 

下面看看Reader到底是干什么的。

Reader线程阻塞在readSelector的select方法上,等待有感兴趣的事件发生或者有线程调用wakeup方法。当有感兴趣的事情发生时,遍历对应的SelectionKey进行读取操作。读取操作在doRead方法中完成。

 

每个SelectionKey关联了一个Server.Connection对象。得到了Connection对象后,就可以进行读取和处理Connection对象数据上的事情了。

 

现在来看看Connection对象怎么注册到readSelector上的。原来是在Listner线程中发生了selector感兴趣的OP_ACCEPT事件后。当有OP_ACCEPT事件发生时,说明客户端数据已经准备好了,调用accept后就得到服务器与客户端的套接字通道。

 

得到套接字通道后,以round-robin的方式选择一个Reader,将channel的OP_READ事件注册在readSelector上。并以当前的客户端服务器channel新建一个Connection,附在注册感兴趣事件返回的SelectionKey上面。最后将得到的connection加入到connectionList中进行维护。

 

现在回过头来看看Listner主线程在干什么,Listner循环阻塞在selector.select方法上面,获取感兴趣的OP_ACCEPT事件。

 

了解完了Listner类是干什么的,上面有提到处理数据是在count = c.readAndProcess();中完成的,下面分析Connection类。

 

Connection

 

主要分析Connection的readAndProcess方法。

(待续)

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311550-1-1.html 上篇帖子: hadoop datanode 磁盘坏掉之后的解决办法 下篇帖子: Hadoop 优化总结(一)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表