设为首页 收藏本站
查看: 1125|回复: 0

[经验分享] 分别使用Hadoop和Spark实现二次排序

[复制链接]

尚未签到

发表于 2017-12-17 07:25:43 | 显示全部楼层 |阅读模式
零、序(注意本部分与标题无太大关系,可直接翻到第一部分)
  既然没用为啥会有序?原因不想再开一篇文章,来抒发点什么感想或者计划了,就在这里写点好了:
  前些日子买了几本书,打算学习和研究大数据方面的知识,一直因为实习、考试、毕业设计等问题搞得没有时间,现在进入了寒假,可以安心的学点有用的知识了。
  这篇博客里的算法部分的内容来自《数据算法:Hadoop/Spark大数据处理技巧》一书,不过书中的代码虽然思路正确,但是代码不完整,并且只有java部分的编程,我在它的基础上又加入scala部分,当然是在使用Spark的时候写的scala。
  废话不多说,进入正题。

一、输入、期望输出、思路。
  输入为SecondarySort.txt,内容为:
  

2000,12,04,10  
2000,11,01,20
  
2000,12,02,-20
  
2000,11,07,30
  
2000,11,24,-40
  
2012,12,21,30
  
2012,12,22,-20
  
2012,12,23,60
  
2012,12,24,70
  
2012,12,25,10
  
2013,01,23,90
  
2013,01,24,70
  
2013,01,20,-10
  

  意义为:
  年,月,日,温度
  期望输出:
  

2013-01 90,70,-10  
2012-12 70,60,30,10,-20
  
2000-12 10,-20
  
2000-11 30,20,-40
  

  意义为:
  年-月 温度1,温度2,温度3,……
  年-月从上之下降序排列,
  温度从左到右降序排列
  思路:
  抛弃不需要的代表日的哪一行数据
  将年月作为组合键(key),比较大小,降序排列
  将对应年月(key)的温度的值(value)进行降序排列和拼接

二、使用Java编写MapReduce程序实现二次排序
  代码要实现的类有:
DSC0000.png

  除了常见的SecondarySortingMapper,SecondarySortingReducer,和SecondarySortDriver以外
  这里还多出了两个个插件类(DateTemperatureGroupingComparator和DateTemperaturePartioner)和一个自定义类型(DateTemperaturePair)
  以下是实现的代码(注意以下每个文件的代码段我去掉了包名,所以要使用的话自己加上吧):
  SecondarySortDriver.java
  

import org.apache.hadoop.conf.Configuration;  

import org.apache.hadoop.conf.Configured;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.mapreduce.Job;  

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  

import org.apache.hadoop.util.Tool;  

import org.apache.hadoop.util.ToolRunner;  

  

public>public int run(String[] args) throws Exception {  Configuration configuration
= getConf();  Job job
= Job.getInstance(configuration, "SecondarySort");  job.setJarByClass(SecondarySortDriver.
class);  job.setJobName(
"SecondarySort");  

  Path inputPath
= new Path(args[0]);  Path outputPath
= new Path(args[1]);  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);
  

// 设置map输出key value格式  job.setMapOutputKeyClass(DateTemperaturePair.class);
  job.setMapOutputValueClass(IntWritable.class);
  // 设置reduce输出key value格式
  job.setOutputKeyClass(DateTemperaturePair.class);
  job.setOutputValueClass(IntWritable.class);
  

  job.setMapperClass(SecondarySortingMapper.class);
  job.setReducerClass(SecondarySortingReducer.class);
  job.setPartitionerClass(DateTemperaturePartitioner.class);
  job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
  

  boolean status = job.waitForCompletion(true);
  return status ? 0 : 1;
  }
  

  public static void main(String[] args) throws Exception {
  if (args.length != 2) {
  throw new IllegalArgumentException(
  "!!!!!!!!!!!!!! Usage!!!!!!!!!!!!!!: SecondarySortDriver"
  + "<input-path> <output-path>");
  }
  int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
  System.exit(returnStatus);
  }
  
}
  

  DateTemperaturePair.java
  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.io.Writable;  

import org.apache.hadoop.io.WritableComparable;  

  

import java.io.DataInput;  

import java.io.DataOutput;  

import java.io.IOException;  

  

public>WritableComparable<DateTemperaturePair> {private String yearMonth;private String day;protected Integer temperature;  

public int compareTo(DateTemperaturePair o) {int compareValue = this.yearMonth.compareTo(o.getYearMonth());if (compareValue == 0) {  compareValue
= temperature.compareTo(o.getTemperature());  }
return -1 * compareValue;  }
  

public void write(DataOutput dataOutput) throws IOException {  Text.writeString(dataOutput, yearMonth);
  dataOutput.writeInt(temperature);
  

  }
  

public void readFields(DataInput dataInput) throws IOException {this.yearMonth = Text.readString(dataInput);this.temperature = dataInput.readInt();  

  }
  

  @Override
public String toString() {return yearMonth.toString();  }
  

public String getYearMonth() {return yearMonth;  }
  

public void setYearMonth(String text) {this.yearMonth = text;  }
  

public String getDay() {return day;  }
  

public void setDay(String day) {this.day = day;  }
  

public Integer getTemperature() {return temperature;  }
  

public void setTemperature(Integer temperature) {this.temperature = temperature;  }
  
}
  

  SecondarySortingMapper.java
  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.LongWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Mapper;  

  

import java.io.IOException;  

  

public>Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {  @Override
protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {  String[] tokens
= value.toString().split(",");// YYYY = tokens[0]// MM = tokens[1]// DD = tokens[2]// temperature = tokens[3]  String yearMonth = tokens[0] + "-" + tokens[1];
  String day = tokens[2];
  int temperature = Integer.parseInt(tokens[3]);
  

  DateTemperaturePair reduceKey = new DateTemperaturePair();
  reduceKey.setYearMonth(yearMonth);
  reduceKey.setDay(day);
  reduceKey.setTemperature(temperature);
  context.write(reduceKey, new IntWritable(temperature));
  }
  
}
  

  DateTemperaturePartioner.java
  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Partitioner;  

  

public>Partitioner<DateTemperaturePair, Text> {  @Override
public int getPartition(DateTemperaturePair dataTemperaturePair, Text text,int i) {return Math.abs(dataTemperaturePair.getYearMonth().hashCode() % i);  }
  
}
  

  DateTemperatureGroupingComparator.java
  

import org.apache.hadoop.io.WritableComparable;  

import org.apache.hadoop.io.WritableComparator;  

  

public>
public DateTemperatureGroupingComparator() {super(DateTemperaturePair.class, true);  }
  

  @Override
public int compare(WritableComparable a, WritableComparable b) {  DateTemperaturePair pair1
= (DateTemperaturePair) a;  DateTemperaturePair pair2
= (DateTemperaturePair) b;return pair1.getYearMonth().compareTo(pair2.getYearMonth());  }
  
}
  

  SecondarySortingReducer.java
  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapreduce.Reducer;  

  

import java.io.IOException;  

  

public>Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {  

  @Override
protected void reduce(DateTemperaturePair key,  Iterable
<IntWritable> values, Context context) throws IOException,  InterruptedException {
  StringBuilder sortedTemperatureList
= new StringBuilder();for (IntWritable temperature : values) {  sortedTemperatureList.append(temperature);
  sortedTemperatureList.append(
",");  }
  sortedTemperatureList.deleteCharAt(sortedTemperatureList.length()
-1);  context.write(key,
new Text(sortedTemperatureList.toString()));  }
  

  
}
  

  三、使用scala编写Spark程序实现二次排序
  这个代码想必就比较简洁了。如下:
  SecondarySort.scala
  

package spark  

import org.apache.spark.{SparkContext, SparkConf}  

import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions  

import org.apache.spark.rdd.RDD.rddToPairRDDFunctions  

  
object SecondarySort {
  def main(args: Array[String]) {
  val conf
= new SparkConf().setAppName(" Secondary Sort ")  .setMaster(
"local")  var sc = new SparkContext(conf)
  sc.setLogLevel("Warn")
  //val file = sc.textFile("hdfs://localhost:9000/Spark/SecondarySort/Input/SecondarySort2.txt")
  val file = sc.textFile("e:\\SecondarySort.txt")
  val rdd = file.map(line => line.split(","))
  .map(x=>((x(0),x(1)),x(3))).groupByKey().sortByKey(false)
  .map(x => (x._1._1+"-"+x._1._2,x._2.toList.sortWith(_>_)))
  rdd.foreach(
  x=>{
  val buf = new StringBuilder()
  for(a <- x._2){
  buf.append(a)
  buf.append(",")
  }
  buf.deleteCharAt(buf.length()-1)
  println(x._1+" "+buf.toString())
  })
  sc.stop()
  }
  
}
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-424910-1-1.html 上篇帖子: 大数据平台搭建(hadoop+spark) 下篇帖子: Hadoop运行单词统计
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表