设为首页 收藏本站
查看: 757|回复: 0

[经验分享] Hadoop学习八:Hadoop-Hdfs RPC源码 Client

[复制链接]

尚未签到

发表于 2016-12-9 10:09:00 | 显示全部楼层 |阅读模式
一.Client类图
DSC0000.jpg

二.详细描述


  •  ConnectionId:This class holds the address and the user ticket. The client connections to servers are uniquely identified by <remoteAddress, protocol, ticket>。一个connection由一个ConnectionId唯一标识;所以要重写ConnectionId的equals和hashcode方法。
  • ConnectionHeader:The IPC connection header sent by the client to the server on connection establishment.
  • Connection:继承Thread。代表client到server的一个连接。我在文中将Connection对象称为“连接”。
      /** Thread that reads responses and notifies callers.  Each connection owns a
    * socket connected to a remote address.  Calls are multiplexed through this
    * socket: responses may be delivered out of order. */
    private class Connection extends Thread {
    //一个连接的基本信息
    private InetSocketAddress server;             // server ip:port,注意是服务端
    private ConnectionHeader header;              // connection header
    private final ConnectionId remoteId;                // connection id
    private Socket socket = null;                 // connected socket
    private DataInputStream in;
    private DataOutputStream out;
    //所有的调用
    private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
    //1.构造函数只初始化一些基本信息
    public Connection(ConnectionId remoteId) throws IOException {
    this.remoteId = remoteId;
    this.server = remoteId.getAddress();
    header = new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket, authMethod);
    this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
    remoteId.getAddress().toString() +" from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
    this.setDaemon(true);
    }
    //2.与IPC server建立socket连接
    private synchronized void setupConnection() throws IOException {
    while (true) {
    try {
    this.socket = socketFactory.createSocket();
    }
    }
    /** Connect to the server and set up the I/O streams. It then sends
    * a header to the server and starts
    * the connection thread that waits for responses.
    */
    private synchronized void setupIOstreams() throws InterruptedException {
    try {
    while (true) {
    //2.与IPC server建立socket连接
    setupConnection();
    //3.创建流
    InputStream inStream = NetUtils.getInputStream(socket);
    OutputStream outStream = NetUtils.getOutputStream(socket);
    //4.发送RPC报头
    writeRpcHeader(outStream);
    this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(inStream)));
    this.out = new DataOutputStream(new BufferedOutputStream(outStream));
    //5.发送connection header到server
    writeHeader();
    //6.启动自己(线程),接受response
    start();
    return;
    }
    } catch (Throwable t) {
    if (t instanceof IOException) {
    markClosed((IOException)t);
    } else {
    markClosed(new IOException("Couldn't set up IO streams", t));
    }
    close();
    }
    }
    //4.发送RPC报头
    private void writeRpcHeader(OutputStream outStream) throws IOException {
    DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
    //public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
    //public static final byte CURRENT_VERSION = 4;
    out.write(Server.HEADER.array());
    out.write(Server.CURRENT_VERSION);
    authMethod.write(out);
    out.flush();
    }
    //5.发送connection header到server
    private void writeHeader() throws IOException {
    DataOutputBuffer buf = new DataOutputBuffer();
    header.write(buf);
    int bufLen = buf.getLength();
    out.writeInt(bufLen);
    out.write(buf.getData(), 0, bufLen);
    }
    //6.启动自己(线程),等待接受response,接受完后关闭此连接
    public void run() {
    while (waitForWork()) {//Return true if it is time to read a response; false otherwise.
    receiveResponse();
    }
    //关闭此连接
    close();
    }
    //7.发送请求:长度+内容
    public void sendParam(Call call) {
    DataOutputBuffer d=null;
    synchronized (this.out) {
    d = new DataOutputBuffer();
    d.writeInt(call.id);
    call.param.write(d);
    byte[] data = d.getData();
    int dataLength = d.getLength();
    out.writeInt(dataLength);      //first put the data length
    out.write(data, 0, dataLength);//write the data
    out.flush();
    }
    }  
    //6.接受response,把结果赋值给Call对象
    private void receiveResponse() {
    int id = in.readInt();                    // try to read an id
    Call call = calls.get(id);
    int state = in.readInt();     // read call status
    if (state == Status.SUCCESS.state) {
    Writable value = ReflectionUtils.newInstance(valueClass, conf);
    value.readFields(in);                 // read value
    call.setValue(value);
    calls.remove(id);
    }
    }
    }
  • Call对象:RPC是基于反射的,每次方法调用都对应一个Call对象,我在文中将Call对象称为“调用”。
        private class Call {
    int id;                                       // call id,唯一标示一个Call
    Writable param;                        // parameter,创建Call对象时赋值
    Writable value;                          // value, Connecion线程接受到server的response后赋值给value
    boolean done;                           // true when call is done
    protected Call(Writable param) {
    this.param = param;
    synchronized (Client.this) {
    this.id = counter++;
    }
    }
    }
  • ParallelCall:继承Call,还是一个Call对象,只是这些Call对象共享一个ParallelResults。
            private class ParallelCall extends Call {
    private ParallelResults results; //多个Call共享一个ParallelResults
    private int index;
    public ParallelCall(Writable param, ParallelResults results, int index) {
    super(param);
    this.results = results;
    this.index = index;
    }
    /** Deliver result to result collector. */
    protected void callComplete() {
    results.callComplete(this);
    }
    }
     
  • ParallelResults:一组Call对象的返回结果。
    private static class ParallelResults {
    private Writable[] values;
    private int size;   //一共有多少个Call要返回
    private int count;  //实际已经返回几个
    public ParallelResults(int size) {
    this.values = new Writable[size];
    this.size = size;
    }
    /** Collect a result. */
    public synchronized void callComplete(ParallelCall call) {
    values[call.index] = call.value;            // store the value
    count++;                                    // count it
    if (count == size)                          // if all values are in
    notify();                                 // then notify waiting caller
    }

  • Client:IPC client端。调用client的call方法,传入Writable作为参数,返回一个Writable作为结果。
    /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
    * parameter, and return a {@link Writable} as their value.  A service runs on
    * a port and is defined by a parameter class and a value class.
    *
    * @see Server
    */
    public class Client {
    //缓存client到server的所有连接
    private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
    private Class<? extends Writable> valueClass;   //Call对象value的类型
    private int counter;                            //创建一个Call的时候,用counter++作为Call的id
    final private Configuration conf;
    private SocketFactory socketFactory;           //服务器端的ip+port创建的socketFactory
    //1.初始化Client
    public Client(Class<? extends Writable> valueClass, Configuration conf,
    SocketFactory factory) {
    this.valueClass = valueClass;
    this.conf = conf;
    this.socketFactory = factory; //初始化client时传入
    }
    //2.用client发送一个请求
    public Writable call(Writable param, ConnectionId remoteId)  
    throws InterruptedException, IOException {
    //创建Call对象
    Call call = new Call(param);
    //创建Connection对象
    Connection connection = getConnection(remoteId, call);
    //发送请求,参考Connection类代码7
    connection.sendParam(call);                 
    ...
    return call.value;
    }
    //2.用client一次发送多个请求
    public Writable[] call(Writable[] params, InetSocketAddress[] addresses,
    Class<?> protocol, UserGroupInformation ticket, Configuration conf)
    throws IOException, InterruptedException {
    //创建一个结果集
    ParallelResults results = new ParallelResults(params.length);
    synchronized (results) {
    for (int i = 0; i < params.length; i++) {
    //创建每个Call
    ParallelCall call = new ParallelCall(params, results, i);
    try {
    //创建每个Connection对象,不同的Call存放到不同的连接上(Each parameter is sent to the corresponding address.)。
    ConnectionId remoteId = ConnectionId.getConnectionId(addresses,
    protocol, ticket, 0, conf);
    Connection connection = getConnection(remoteId, call);
    //发送请求,参考Connection类代码7
    connection.sendParam(call);            
    } catch (IOException e) {
    }
    }
    while (results.count != results.size) {
    try {
    results.wait();                    // wait for all results
    } catch (InterruptedException e) {}
    }
    //放回所有结果
    return results.values;
    }
    }
    /** Get a connection from the pool, or create a new one and add it to the
    * pool.  Connections to a given ConnectionId are reused. */
    //获得一个连接,首先从缓存中去;取不到,创建一个,并放到缓存中。
    private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException {
    Connection connection;
    do {
    synchronized (connections) {
    connection = connections.get(remoteId);
    if (connection == null) {
    //参考Connection类代码1
    connection = new Connection(remoteId);
    connections.put(remoteId, connection);
    }
    }
    } while (!connection.addCall(call));//往创建的连接里加入call
    //参考Connection类代码23456
    connection.setupIOstreams();
    return connection;
    }
    }
     

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-311831-1-1.html 上篇帖子: Hadoop学习笔记——Hadoop 读写文件过程剖析 下篇帖子: Hadoop学习九:Hadoop-hdfs RPC源码 Server
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表