[Experience Sharing] Hadoop MapReduce Programming API Primer Series: FOF (Friend of Friend) (Part 23)

Without further ado, straight to the code. The example computes friend-of-friend (FOF) recommendations with two chained MapReduce jobs: the first counts the common friends of every candidate pair (excluding pairs that are already direct friends), and the second sorts and groups each user's recommendation list.
The code:
package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.Text;

// A friend pair used as a map output key. The pair is stored in canonical
// (lexicographic) order so that (a, b) and (b, a) produce the same key.
public class Fof extends Text {

    public Fof() { // no-arg constructor
        super();
    }

    public Fof(String a, String b) { // two-arg constructor
        super(getFof(a, b));
    }

    // Normalize the pair: the lexicographically smaller name comes first.
    public static String getFof(String a, String b) {
        int r = a.compareTo(b);
        if (r < 0) {
            return a + "\t" + b;
        } else {
            return b + "\t" + a;
        }
    }
}
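The point of getFof is that both orderings of a pair collapse to one key, so the counts emitted for (a, b) and (b, a) accumulate together in the reducer. A minimal check (a hypothetical driver with made-up names, assuming the Fof class above is on the classpath):

package zhouls.bigdata.myMapReduce.friend;

public class FofDemo { // hypothetical quick check, not part of the job
    public static void main(String[] args) {
        Fof x = new Fof("tom", "amy");
        Fof y = new Fof("amy", "tom");
        System.out.println(x);           // prints: amy	tom
        System.out.println(x.equals(y)); // prints: true -- both orderings collapse to one key
    }
}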
package zhouls.bigdata.myMapReduce.friend;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Implementing WritableComparable means implementing several extra methods:
// readFields deserializes the fields, write serializes them.
public class User implements WritableComparable<User> {

    private String uname;
    private int friendsCount;

    public String getUname() {
        return uname;
    }

    public void setUname(String uname) {
        this.uname = uname;
    }

    public int getFriendsCount() {
        return friendsCount;
    }

    public void setFriendsCount(int friendsCount) {
        this.friendsCount = friendsCount;
    } // This whole block of getters and setters can be auto-generated (right-click > Source > Generate Getters and Setters).

    public User() { // no-arg constructor
    }

    public User(String uname, int friendsCount) { // two-arg constructor
        this.uname = uname;
        this.friendsCount = friendsCount;
    }

    public void write(DataOutput out) throws IOException { // serialize
        out.writeUTF(uname);
        out.writeInt(friendsCount);
    }

    public void readFields(DataInput in) throws IOException { // deserialize
        this.uname = in.readUTF();
        this.friendsCount = in.readInt();
    }

    public int compareTo(User o) { // the core comparison
        int result = this.uname.compareTo(o.getUname());
        if (result == 0) {
            return Integer.compare(this.friendsCount, o.getFriendsCount());
        }
        return result;
    }
}
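write() and readFields() must mirror each other field for field, in the same order. A quick round trip through an in-memory buffer (a hypothetical test class with made-up values, not part of the job) shows the contract:

package zhouls.bigdata.myMapReduce.friend;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class UserRoundTrip {
    public static void main(String[] args) throws IOException {
        User original = new User("tom", 3); // hypothetical values
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf)); // serialize: writeUTF then writeInt

        User copy = new User();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy.getUname() + ":" + copy.getFriendsCount()); // prints: tom:3
    }
}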
package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator for the second job: uname ascending, then friendsCount descending.
public class FoFSort extends WritableComparator {

    public FoFSort() { // register the custom User type
        super(User.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) { // the sorting core
        User u1 = (User) a;
        User u2 = (User) b;
        int result = u1.getUname().compareTo(u2.getUname());
        if (result == 0) {
            // negated so that larger common-friend counts sort first
            return -Integer.compare(u1.getFriendsCount(), u2.getFriendsCount());
        }
        return result;
    }
}
package zhouls.bigdata.myMapReduce.friend;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator for the second job: group by uname only, so all records
// for one user reach a single reduce() call.
public class FoFGroup extends WritableComparator {

    public FoFGroup() { // register the custom User type
        super(User.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) { // the grouping core
        User u1 = (User) a;
        User u2 = (User) b;
        return u1.getUname().compareTo(u2.getUname());
    }
}
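Together the two comparators implement Hadoop's classic secondary-sort pattern: FoFSort controls the order in which values arrive at the reducer, while FoFGroup controls which map outputs share one reduce() call. A minimal sketch of the difference (hypothetical name and counts):

package zhouls.bigdata.myMapReduce.friend;

public class ComparatorDemo { // hypothetical driver, not part of the job
    public static void main(String[] args) {
        FoFSort sort = new FoFSort();
        FoFGroup group = new FoFGroup();
        User a = new User("tom", 3); // hypothetical user with 3 common friends
        User b = new User("tom", 1);
        System.out.println(sort.compare(a, b) < 0);   // true: higher count sorts first
        System.out.println(group.compare(a, b) == 0); // true: same uname, one reduce group
    }
}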
package zhouls.bigdata.myMapReduce.friend;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;

public class RunJob {

    // Sample input (tab-separated): a user followed by that user's friends.
    // 小明    老王    如花    林志玲
    // 老王    小明    凤姐                         <- sorting is handled in FoFSort.java
    // 如花    小明    李刚    凤姐
    // 林志玲  小明    李刚    凤姐    郭美美        <- grouping is handled in FoFGroup.java
    // 李刚    如花    凤姐    林志玲
    // 郭美美  凤姐    林志玲
    // 凤姐    如花    老王    林志玲  郭美美
    public static void main(String[] args) {
        Configuration config = new Configuration();
        //config.set("fs.defaultFS", "hdfs://HadoopMaster:9000");
        //config.set("yarn.resourcemanager.hostname", "HadoopMaster");
        //config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
        //config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");// the default key/value separator is the tab "\t"; it can be overridden here, e.g. with ","
        if (run1(config)) {
            run2(config); // two run methods, i.e. two chained MR jobs
        }
    }
    public static void run2(Configuration config) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("fof2");
            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);
            job.setSortComparatorClass(FoFSort.class);
            job.setGroupingComparatorClass(FoFGroup.class);
            job.setMapOutputKeyClass(User.class);
            job.setMapOutputValueClass(User.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            ////input path for the job on HDFS
            //FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/f1"));
            ////output directory for the results; it must not already exist
            //Path outputPath = new Path("hdfs://HadoopMaster:9000/out/f2");
            //input path for the job (the output of job 1)
            FileInputFormat.addInputPath(job, new Path("./out/f1"));
            //output directory for the results; it must not already exist
            Path outputPath = new Path("./out/f2");
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);
            boolean f = job.waitForCompletion(true);
            if (f) {
                System.out.println("job executed successfully");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static boolean run1(Configuration config) {
        try {
            FileSystem fs = FileSystem.get(config);
            Job job = Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            job.setJobName("friend");
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReducer.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            //FileInputFormat.addInputPath(job, new Path("hdfs://HadoopMaster:9000/friend/friend.txt"));// the directory contains friend.txt
            //Path outpath = new Path("hdfs://HadoopMaster:9000/out/f1");
            FileInputFormat.addInputPath(job, new Path("./data/friend/friend.txt"));// the local data directory contains friend.txt
            Path outpath = new Path("./out/f1");
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            boolean f = job.waitForCompletion(true);
            return f;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {
        protected void map(Text key, Text value,
                Context context)
                throws IOException, InterruptedException {
            String user = key.toString();
            String[] friends = StringUtils.split(value.toString(), '\t');
            for (int i = 0; i < friends.length; i++) {
                String f1 = friends[i];
                // direct friendship: marked with 0 so the pair can be excluded later
                Fof ofof = new Fof(user, f1);
                context.write(ofof, new IntWritable(0));
                for (int j = i + 1; j < friends.length; j++) {
                    String f2 = friends[j];
                    // f1 and f2 share "user" as a common friend: a candidate pair, marked with 1
                    Fof fof = new Fof(f1, f2);
                    context.write(fof, new IntWritable(1));
                }
            }
        }
    }
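To make the marking scheme concrete: for 小明's input line (friends 老王, 如花, 林志玲), the loops above emit (小明,老王):0, (小明,如花):0 and (小明,林志玲):0 for the direct friendships, plus (老王,如花):1, (老王,林志玲):1 and (如花,林志玲):1 for the pairs that share 小明 as a common friend, with each Fof key already order-normalized.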

    static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable> {
        protected void reduce(Fof arg0, Iterable<IntWritable> arg1,
                Context arg2)
                throws IOException, InterruptedException {
            int sum = 0;
            boolean f = true;
            for (IntWritable i : arg1) {
                if (i.get() == 0) {
                    // a 0 means the pair are already direct friends: drop the pair
                    f = false;
                    break;
                } else {
                    sum = sum + i.get(); // count common friends
                }
            }
            if (f) {
                arg2.write(arg0, new IntWritable(sum));
            }
        }
    }

    static class SortMapper extends Mapper<Text, Text, User, User> {
        protected void map(Text key, Text value,
                Context context)
                throws IOException, InterruptedException {
            // input line from job 1: userA \t userB \t commonFriendCount
            String[] args = StringUtils.split(value.toString(), '\t');
            String other = args[0];
            int friendsCount = Integer.parseInt(args[1]);
            // emit the relation in both directions so each user sees all candidates
            context.write(new User(key.toString(), friendsCount), new User(other, friendsCount));
            context.write(new User(other, friendsCount), new User(key.toString(), friendsCount));
        }
    }

    static class SortReducer extends Reducer<User, User, Text, Text> {
        protected void reduce(User arg0, Iterable<User> arg1,
                Context arg2)
                throws IOException, InterruptedException {
            String user = arg0.getUname();
            StringBuffer sb = new StringBuffer();
            for (User u : arg1) {
                sb.append(u.getUname() + ":" + u.getFriendsCount());
                sb.append(",");
            }
            arg2.write(new Text(user), new Text(sb.toString()));
        }
    }
}
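Job 1 writes one line per surviving pair in the form userA\tuserB\tcommonFriendCount, and job 2 regroups that into one line per user, user\tfriend:count,friend:count,..., with candidates arriving in descending count order thanks to FoFSort. As a spot check against the sample data in the comments above: 如花 and 老王 are not direct friends but share 小明 and 凤姐, so job 1's output should contain a line like

如花	老王	2

Note that the active input and output paths are relative (./data/friend/friend.txt, ./out/f1, ./out/f2), so the code as written runs against the local filesystem; uncommenting the fs.defaultFS and yarn.resourcemanager.hostname lines in main() switches it to the HDFS paths.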
