设为首页 收藏本站
查看: 1062|回复: 0

[经验分享] 2018-07-23期 Hadoop RPC模拟NameNode

[复制链接]

尚未签到

发表于 2018-10-28 11:22:31 | 显示全部楼层 |阅读模式
  一、服务端代码实现
  1、定义接口
  package cn.sjq.rpc.java;
  import org.apache.hadoop.ipc.VersionedProtocol;
  /**
  * 定义接口IMyNameNode并继承org.apache.hadoop.ipc.VersionedProtocol接口
  * 本接口主要模拟定义自定义的Hadoop RPC通信,模拟Namenode节点在HDFS创建元数据(创建目录)、浏览元数据(浏览目录、文件)
  * @author songjq
  *
  */
  public interface IMyNameNode extends VersionedProtocol {
  /*
  * 定义ID号 定义一个签名,通过这个ID,就能区分在客户端调用的时候,具体调用哪个实现 要求:名称必须叫versionID
  */
  public static long versionID = 1l;
  /*
  * 创建目录
  */
  public String createForder(String dir) throws Exception;
  /*
  * 浏览目录,包括子目录
  */
  public String listForder(String dir)  throws Exception;
  }
  2、实现接口
  package cn.sjq.rpc.java;
  import java.io.IOException;
  import java.net.URI;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.LocatedFileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.RemoteIterator;
  import org.apache.hadoop.fs.permission.FsAction;
  import org.apache.hadoop.fs.permission.FsPermission;
  import org.apache.hadoop.ipc.ProtocolSignature;
  /**
  * IMyNameNode的实现类
  * 主要对IMyNameNode定义的方法进行实现
  * @author songjq
  *
  */

  public>  /*
  * 通过IMyNameNode.versionID构造一个签名
  * (non-Javadoc)
  * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolSignature(java.lang.String, long, int)
  */
  @Override
  public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {
  return new ProtocolSignature(IMyNameNode.versionID, null);
  }
  /* 直接返回IMyNameNode.versionID
  * (non-Javadoc)
  * @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
  */
  @Override
  public long getProtocolVersion(String arg0, long arg1) throws IOException {
  return IMyNameNode.versionID;
  }
  /*
  * 在HDFS上创建目录
  * (non-Javadoc)
  * @see cn.sjq.rpc.java.IMyNameNode#createForder(java.lang.String)
  */
  @Override
  public String createForder(String dir) throws Exception{
  //获得HDFS客户端连接
  Configuration conf = new Configuration();
  conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  FileSystem client = FileSystem.get(new URI("hdfs://hadoop-server01:9000"), conf, "root");
  //创建目录
  boolean mkdirs = client.mkdirs(new Path(dir));
  client.close();
  //返回创建结果
  return "Direcotory ->\t\t"+dir+"\t\t"+(mkdirs?"successfull created!":"created failed!");
  }
  /*
  * 在HDFS浏览目录
  * (non-Javadoc)
  * @see cn.sjq.rpc.java.IMyNameNode#listForder(java.lang.String)
  */
  @Override
  public String listForder(String dir) throws Exception {
  // 获得HDFS客户端连接
  Configuration conf = new Configuration();
  conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  FileSystem client = FileSystem.get(new URI("hdfs://hadoop-server01:9000"), conf, "root");
  RemoteIterator listFiles = client.listFiles(new Path(dir), true);
  // 定义一个stringbuffer对象接收处理结果
  StringBuffer filebuffer = new StringBuffer("Onwer \t\t UserPri \t\t BlockSize \t\t Path \t\t\n");
  // 迭代listFiles
  while (listFiles.hasNext()) {
  LocatedFileStatus file = listFiles.next();
  String fname = file.getPath().getName();
  String path = file.getPath().toString();
  String owner = file.getOwner();
  long blockSize = file.getBlockSize();
  FsPermission permission = file.getPermission();
  FsAction userAction = permission.getUserAction();
  FsAction groupAction = permission.getGroupAction();
  FsAction otherAction = permission.getOtherAction();
  userAction.toString();
  filebuffer.append(owner).append(" \t\t").append(" ").append(userAction.toString()).append(" \t\t ")
  .append(blockSize).append(" \t\t ").append(path).append(" \t\t").append("\n");
  }
  client.close();
  return filebuffer.toString();
  }
  }
  3、构建RPC通信服务
  package cn.sjq.rpc.java;
  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.ipc.RPC;
  import org.apache.hadoop.ipc.RPC.Builder;
  import org.apache.hadoop.ipc.RPC.Server;
  /**
  * 构造RPC通信程序,并将业务类注册到RPC通信服务中
  * @author songjq
  *
  */

  public>  public static void main(String[] args) throws Exception, IOException {
  //创建hadoop RPC通信builder
  Builder builder = new RPC.Builder(new Configuration());
  //设置RPC通信地址
  builder.setBindAddress("hadoop-server01");
  //设置RPC通信端口
  builder.setPort(9090);
  //将程序IMyNameNode部署到RPC server上
  builder.setProtocol(IMyNameNode.class);
  //将IMyNameNode接口实现也部署到RPC server上
  builder.setInstance(new MyNameNodeImpl());
  //构建一个RPC server
  Server server = builder.build();
  //启动RPC通信服务
  server.start();
  System.out.println("******* RPC Server has been started...  *********");
  }
  }
  二、客户端代码实现
  1、定义接口
  package cn.sjq.rpc.java;
  import org.apache.hadoop.ipc.VersionedProtocol;
  /**
  * RPC客户端定义接口,该接口需要和服务端定义的IMyNameNode完全一致
  */
  public interface IMyNameNode extends VersionedProtocol {
  /*
  * 定义ID号 定义一个签名,通过这个ID,就能区分在客户端调用的时候,具体调用哪个实现 要求:名称必须叫versionID
  */
  public static long versionID = 1l;
  /*
  * 创建目录
  */
  public String createForder(String dir) throws Exception;
  /*
  * 浏览目录,包括子目录
  */
  public String listForder(String dir)  throws Exception;
  }
  2、客户端代理调用
  package cn.sjq.rpc.java;
  import java.io.IOException;
  import java.net.InetSocketAddress;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.ipc.RPC;
  /**
  * RPC客户端调用,这个使用的RPC的动态代理实现对RPC服务端相关方法的访问
  *
  * @author songjq
  *
  */

  public>  /**
  * 通过RPC调用Server端的功能,拿到是一个代理对象 protocol 服务端部署的接口 clientVersion 服务端部署的版本号ID addr
  * 服务端RPC监听通信地址及端口 conf Hdfs的一个configuration实例对象
  *
  * @param args
  * @throws Exception
  */
  public static void main(String[] args) throws Exception {
  IMyNameNode proxy = RPC.getProxy(IMyNameNode.class,
  1l,
  new InetSocketAddress("hadoop-server01", 9090),
  new Configuration());
  String createForder = proxy.createForder("/rpc/20180720");
  System.out.println("****************************创建目录**********************************");
  System.out.println(createForder);
  String listForder = proxy.listForder("/user");
  System.out.println("****************************浏览目录**********************************");
  System.out.println(listForder);
  }
  }


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-627447-1-1.html 上篇帖子: 2018-07-21期 Hadoop Yarm体系结构剖析 下篇帖子: unable kill namenode hadoop3.0.3 解决到放弃解决的过程
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表