设为首页 收藏本站
查看: 1123|回复: 0

[经验分享] Hadoop源码学习笔记(4) ——Socket到RPC调用

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-7-12 11:56:39 | 显示全部楼层 |阅读模式
  Hadoop源码学习笔记(4)
   ——Socket到RPC调用
  Hadoop是一个分布式程序,分布在多台机器上运行,事必会涉及到网络编程。那这里如何让网络编程变得简单、透明的呢?
  网络编程中,首先我们要学的就是Socket编程,这是网络编程中最底层的程序接口,分为服务器端和客户端,服务器负责监听某个端口,客户端负责连接服务器上的某个端口,一旦连接通过后,服务器和客户端就可以双向通讯了,我们看下示例代码:


  • ServerSocket server = new ServerSocket(8111);

  • Socket socket = server.accept();


  • //由Socket对象得到输入流,并构造相应的BufferedReader对象

  • BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));

  • //由Socket对象得到输出流,并构造PrintWriter对象

  • PrintWriter os = new PrintWriter(socket.getOutputStream());


  • while(true){

  •    String inline = is.readLine();

  •    System.out.println(" 收到信息:" + inline);

  •    //服务器反回

  •    os.println("serverSend:" + inline);

  •    os.flush();

  •    if (inline == "bye")

  • break;

  • }

  • os.close();

  • is.close();

  • socket.close();

  • server.close();

  • System.out.println("服务器退出");
  


  • Socket socket = new Socket("127.0.0.1",8111);


  • //由Socket对象得到输入流,并构造相应的BufferedReader对象

  • BufferedReader is = new BufferedReader(new InputStreamReader(socket.getInputStream()));

  • //由Socket对象得到输出流,并构造PrintWriter对象

  • PrintWriter os = new PrintWriter(socket.getOutputStream());

  • BufferedReader sin=new BufferedReader(new InputStreamReader(System.in));

  • while(true){

  •    System.out.println("请输入:");

  •    String line = sin.readLine();

  •    os.println(line);

  •    os.flush();

  •    String inline = is.readLine();

  •    System.out.println("服务器获取值:" + inline);

  •    if (line=="bye")

  •       break;

  • }

  • os.close();

  • is.close();

  • socket.close();

  • System.out.println("客户端退出");
  
  这两段代码分别帖入两个类中,分开执行,先执行服务器端,再执行客户端,就可以互发消息了。
  观察下代码,发现代码中下面4~20行逻辑是一至的,都是通过流来通讯,所以Socket中不同的是开始地方,服务器是通过server.accept()来获取Socket,而客户端是通过直接创建Socket对象的。
  这段代码,其本运行是没问题的,但存在一个问题,就是当客户端接入时服务器端的accept函数才走下去,不然的话,会一直处于卡死等待状态。包括getInputStream函数,也会等待双方接通后,才往下走。除非等到客户端接入,或中断。当然有人会说,可以引入多线程啊,没错,是可以,但是想一下,是不是每个客户接入都得有一个线程? 否则少一个线程,就会有一堆的卡着。所以这种方式不适合在大最客户端接入的情况。
  
  在JDK1.4引入了非阻塞的通信方式,这样使得服务器端只需要一个线程就能处理所有客户端socket的请求。
  下面是几个需要用到的核心类:


  • ServerSocketChannel: ServerSocket 的替代类, 支持阻塞通信与非阻塞通信.

  • SocketChannel: Socket 的替代类, 支持阻塞通信与非阻塞通信.

  • Selector: 为ServerSocketChannel 监控接收客户端连接就绪事件, 为 SocketChannel 监控连接服务器就绪, 读就绪和写就绪事件.

  • SelectionKey: 代表 ServerSocketChannel 及 SocketChannel 向 Selector 注册事件的句柄. 当一个 SelectionKey 对象位于Selector 对象的 selected-keys 集合中时, 就表示与这个 SelectionKey 对象相关的事件发生了.在SelectionKey 类中有几个静态常量


    • SelectionKey.OP_ACCEPT->客户端连接就绪事件 等于监听serversocket.accept()返回一个socket

    • SelectionKey.OP_CONNECT->准备连接服务器就绪跟上面类似,只不过是对于socket的相当于监听了socket.connect()

    • SelectionKey.OP_READ->读就绪事件, 表示输入流中已经有了可读数据, 可以执行读操作了

    • SelectionKey.OP_WRITE->写就绪事件

  
  所以服务器端代码就可以升一下级了,变成如下:


  • public class SocketChannelTest implements Runnable {


  •    @Override

  •    public void run() {

  •       while (true) {

  •          try {

  •             selector.select();

  •             Set keys = selector.selectedKeys();

  •             Iterator iter = keys.iterator();

  •             SocketChannel sc;

  •             while (iter.hasNext()) {

  •                SelectionKey key = iter.next();

  •                if (key.isAcceptable())

  •                   ; // 新的连接

  •                else if (key.isReadable())

  •                   ;// 可读

  •                iter.remove(); // 处理完事件的要从keys中删去

  •             }

  •          } catch (Exception e) {

  •             e.printStackTrace();

  •          }

  •       }

  •    }

  •    static Selector selector;


  •    public static void main(String[] args) throws IOException,

  •          InterruptedException {

  •       selector = Selector.open(); // 静态方法 实例化selector

  •       ServerSocketChannel serverChannel = ServerSocketChannel.open();

  •       serverChannel.configureBlocking(false); // 设置为非阻塞方式,如果为true 那么就为传统的阻塞方式

  •       serverChannel.socket().bind(new InetSocketAddress(8001)); // 绑定IP 及 端口

  •       serverChannel.register(selector, SelectionKey.OP_ACCEPT); // 注册

  •                                                    // OP_ACCEPT事件

  •       Thread thd = new Thread(new SocketChannelTest());

  •       thd.start();// 开启线程 处理请求

  •       thd.join();

  •    }

  • }
  
  好,这样通讯代码简化了。但继续想,我们通讯的目的是什么?客户端发一个指令,服务器执行一些内容,然后把结果返回给客户端。这不就像调用一下函数么,调用函数名、传入参数、返回值。
  这个就称之为远程方法调用(RPC Remote Procedure Call Protocol),毫无疑问,这个RPC实现肯定是基于上面的这个Socket的。至于具体如何实现呢,我们看下面的分解。
  在看实现之前,我们先看一下,这个RPC是如何用的,如何做到调用透明的:
  我们在src下新建一个RPCTest的包,定义一个功能接口IRPCTestEntity.java:


  • package RPCTest;

  • import org.apache.hadoop.ipc.VersionedProtocol;

  • public interface IRPCTestEntity  extends VersionedProtocol {

  •     int Calc(int x,int y);

  • }
  该接口中有一个Calc的函数。
  定义一个实现类RPCTestEntity.java:


  • package RPCTest;

  • import java.io.IOException;

  • public class RPCTestEntity implements IRPCTestEntity{

  •    @Override

  •    public long getProtocolVersion(String protocol, long clientVersion)  throws IOException {

  •       return 0;

  •    }


  •    public int Calc(int x,int y){

  •       int z =0 ;

  •       z = x + y;

  •       return z;

  •    }


  • }
  这个类中实现了Calc函数,执行内容为将x,y相加,将和返回。
  我们再定义一个服务器类(RPCTestSvr.java),将该实现类注册成RPC服务:


  • package RPCTest;

  • import java.io.IOException;


  • public class RPCTestSvr {

  •    public static void main(String[] args) throws IOException, InterruptedException {

  •       RPCTestEntity obj = new RPCTestEntity();

  •       Configuration conf = new Configuration();

  •       Server server = RPC.getServer(obj, "", 9001, conf);

  •       server.start();

  •       server.join();

  •    }

  • }
  代码比较简单,定义了一个RPCTestEntity的实体,然后RPC创建一个Server,传入实体对象,然后这个服务就调用join卡住,用于不断接收请求。 创建完后,就可把这个"服务器"启动起来了。
  再创建一个客户端(RPCTestClient.java):


  • package RPCTest;


  • import java.io.IOException;

  • import java.net.InetSocketAddress;


  • import org.apache.hadoop.conf.Configuration;

  • import org.apache.hadoop.ipc.RPC;

  • import org.apache.hadoop.ipc.VersionedProtocol;


  • public class RPCTestClient {

  •    public static void main(String[] args) throws IOException {

  •       InetSocketAddress addr = new InetSocketAddress("127.0.0.1",9001);

  •       Configuration conf = new Configuration();

  •       VersionedProtocol obj = RPC.getProxy(IRPCTestEntity.class, 0, addr, conf);

  •       IRPCTestEntity ent = (IRPCTestEntity)obj;

  •       int x = ent.Calc(5, 6);

  •       System.out.println(x);

  •    }

  • }
  这里,我们通过RPC.getProxy函数获了一个IRPCTestEntity的接口实例,然后就可以直接调用了。
  运行后,发现这个值马上返回了过来,同时在"服务器"端也会收到一定的请求信息。说明两者之间通了。
  仔细看,这个客户端中,整个过程就没有涉及到RPCTestEntity这个实现的实体,换句话说,客户端产生的是一个虚拟的实现类,然后调用起来了。
  
  OK,示例程序跑起来了,也带给我们几个问题,1、这个客户端中的obj是什么对象?2、为什么我们调用obj对象中的函数(Calc)会跑到服务器上运行,如何实现的?
  
  底层的通讯,我们是知道的,肯定用socket,用它能够传递各种数据。如何与函数关联呢? 我们进入getProxy函数,
DSC0000.png
  我们看到这个getProxy函数中,返回了VersionedProtocol接口的对象,从字面意思,这个Proxy意为代理, 所以我们得到的obj就是一个代理类。同时也看出,要作为RPC处理对象,这个接口必实现VersionedProtocol(简单地看下里面,只有一个函数,返回版本号,是用于判断双方版本所用,只有版本匹配,才能调用)。
  其创建可以看到,用到了:
  Proxy.newProxyInstance(
   protocol.getClassLoader(), new Class[] { protocol },
   new Invoker(addr, ticket, conf, factory));
  然后这个代理类,就自动实现了伟放的protocol这个接口类型。然后当我们调用代理类中的函数时,这个传入的Invoker类,就会收到通知,通知里包含了调用信息,我们进入Invoker中看一下:
  private static class Invoker implements InvocationHandler
  这是一个写在RPC类中的内部类,且是私有的,意思就是只为这个RPC调用,其实现的规定接口InvocationHandler,那么就要实现规定的函数Invoke咯:


  • public Object invoke(Object proxy, Method method, Object[] args)

  •      throws Throwable {

  •      final boolean logDebug = LOG.isDebugEnabled();

  •      long startTime = 0;

  •      if (logDebug) {

  •        startTime = System.currentTimeMillis();

  •      }


  •      ObjectWritable value = (ObjectWritable)

  •        client.call(new Invocation(method, args), address,

  •                    method.getDeclaringClass(), ticket);

  •      if (logDebug) {

  •        long callTime = System.currentTimeMillis() - startTime;

  •        LOG.debug("Call: " + method.getName() + "" + callTime);

  •      }

  •      return value.get();

  •    }
  这个invoke函数,就是当我们调用代理类中的函数(obj.Calc)时,会收到的请求,看下参数,传入的有,Method(函数),args(参数),一应俱全,有了这些内容后,就可以调用底层的Socket,将这些信息打包起来(放入的Invocation类)中,一并发向服务器中。
  同时,服务器端中,就比较容易了,在收到请求后,就可以解析出要调用的函数和参数,然后通过反射来调用在服务器一开始注册上的对象中的函数,再将返回值通过Socket传回客户端,再由这个invoke函数将值返回。
  
  OK,这个几个点想通了,整个过程就容易理解了。总之:
  服务器端——注册服务:RPC.getServer(obj, "", 9001, conf);
  客户端——取得代理类:obj = RPC.getProxy()
  通过这样的包装后,网络访问就非常透明了。
  
  但这里,还是有很多深层次的问题,比如服务端对象生命状态如何管理,多个客户端并发怎么处理,传送数据中流怎么办? 这一系列的问题,就得深入看一下这个Server Client RPC这几个类才能找到答案了。 深入问题先留着,后面再来看, 目前先把它的架子搞清楚。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85797-1-1.html 上篇帖子: hadoop配置机架感知 下篇帖子: Hadoop使用常见问题以及解决方法(转载)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表