设为首页 收藏本站
查看: 1710|回复: 0

[经验分享] 二次排序问题(分别使用Hadoop和Spark实现)

[复制链接]

尚未签到

发表于 2017-12-17 23:36:19 | 显示全部楼层 |阅读模式
  不多说,直接上干货!
  这篇博客里的算法部分的内容来自《数据算法:Hadoop/Spark大数据处理技巧》一书,不过书中的代码虽然思路正确,但是代码不完整,并且只有java部分的编程,我在它的基础上又加入scala部分,当然是在使用Spark的时候写的scala。

一、输入、期望输出、思路。
  输入为SecondarySort.txt,内容为:
  

2000,12,04,10  
2000,11,01,20
  
2000,12,02,-20
  
2000,11,07,30
  
2000,11,24,-40
  
2012,12,21,30
  
2012,12,22,-20
  
2012,12,23,60
  
2012,12,24,70
  
2012,12,25,10
  
2013,01,23,90
  
2013,01,24,70
  
2013,01,20,-10
  

  意义为:年,月,日,温度
  期望输出:
  

2013-01 90,70,-10  
2012-12 70,60,30,10,-20
  
2000-12 10,-20
  
2000-11 30,20,-40
  

  意义为:
  年-月 温度1,温度2,温度3,……
  年-月从上之下降序排列,
  温度从左到右降序排列
  思路:
  抛弃不需要的代表日的哪一行数据
  将年月作为组合键(key),比较大小,降序排列
  将对应年月(key)的温度的值(value)进行降序排列和拼接

二、使用Java编写MapReduce程序实现二次排序
  代码要实现的类有:
DSC0000.png

  除了常见的SecondarySortingMapper,SecondarySortingReducer,和SecondarySortDriver以外
  这里还多出了两个个插件类(DateTemperatureGroupingComparator和DateTemperaturePartioner)和一个自定义类型(DateTemperaturePair)
  以下是实现的代码(注意以下每个文件的代码段我去掉了包名,所以要使用的话自己加上吧):
  SecondarySortDriver.java
  

import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.conf.Configured;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.IntWritable;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.util.Tool;
  
import org.apache.hadoop.util.ToolRunner;
  

  

public>
public int run(String[] args) throws Exception {  
Configuration configuration
= getConf();  
Job job
= Job.getInstance(configuration, "SecondarySort");  
job.setJarByClass(SecondarySortDriver.
class);  
job.setJobName(
"SecondarySort");  

  
Path inputPath
= new Path(args[0]);  
Path outputPath
= new Path(args[1]);  
FileInputFormat.setInputPaths(job, inputPath);
  
FileOutputFormat.setOutputPath(job, outputPath);
  

  

// 设置map输出key value格式  
job.setMapOutputKeyClass(DateTemperaturePair.class);
  
job.setMapOutputValueClass(IntWritable.class);
  
// 设置reduce输出key value格式
  
job.setOutputKeyClass(DateTemperaturePair.class);
  
job.setOutputValueClass(IntWritable.class);
  

  
job.setMapperClass(SecondarySortingMapper.class);
  
job.setReducerClass(SecondarySortingReducer.class);
  
job.setPartitionerClass(DateTemperaturePartitioner.class);
  
job.setGroupingComparatorClass(DateTemperatureGroupingComparator.class);
  

  
boolean status = job.waitForCompletion(true);
  
return status ? 0 : 1;
  
}
  

  
public static void main(String[] args) throws Exception {
  
if (args.length != 2) {
  
throw new IllegalArgumentException(
  
"!!!!!!!!!!!!!! Usage!!!!!!!!!!!!!!: SecondarySortDriver"
  
+ "<input-path> <output-path>");
  
}
  
int returnStatus = ToolRunner.run(new SecondarySortDriver(), args);
  
System.exit(returnStatus);
  
}
  
}
  

  DateTemperaturePair.java
  

import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.Writable;
  
import org.apache.hadoop.io.WritableComparable;
  

  
import java.io.DataInput;
  
import java.io.DataOutput;
  
import java.io.IOException;
  

  

public>
WritableComparable<DateTemperaturePair> {  

private String yearMonth;  

private String day;  

protected Integer temperature;  

  

public int compareTo(DateTemperaturePair o) {  

int compareValue = this.yearMonth.compareTo(o.getYearMonth());  

if (compareValue == 0) {  
compareValue
= temperature.compareTo(o.getTemperature());  
}
  

return -1 * compareValue;  
}
  

  

public void write(DataOutput dataOutput) throws IOException {  
Text.writeString(dataOutput, yearMonth);
  
dataOutput.writeInt(temperature);
  

  
}
  

  

public void readFields(DataInput dataInput) throws IOException {  

this.yearMonth = Text.readString(dataInput);  

this.temperature = dataInput.readInt();  

  
}
  

  
@Override
  

public String toString() {  

return yearMonth.toString();  
}
  

  

public String getYearMonth() {  

return yearMonth;  
}
  

  

public void setYearMonth(String text) {  

this.yearMonth = text;  
}
  

  

public String getDay() {  

return day;  
}
  

  

public void setDay(String day) {  

this.day = day;  
}
  

  

public Integer getTemperature() {  

return temperature;  
}
  

  

public void setTemperature(Integer temperature) {  

this.temperature = temperature;  
}
  
}
  

  SecondarySortingMapper.java
  

import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Mapper;
  

  
import java.io.IOException;
  

  

public>
Mapper<LongWritable, Text, DateTemperaturePair, IntWritable> {  
@Override
  

protected void map(LongWritable key, Text value, Context context)  
throws IOException, InterruptedException {
  
String[] tokens
= value.toString().split(",");  

// YYYY = tokens[0]  

// MM = tokens[1]  

// DD = tokens[2]  

// temperature = tokens[3]  
String yearMonth = tokens[0] + "-" + tokens[1];
  
String day = tokens[2];
  
int temperature = Integer.parseInt(tokens[3]);
  

  
DateTemperaturePair reduceKey = new DateTemperaturePair();
  
reduceKey.setYearMonth(yearMonth);
  
reduceKey.setDay(day);
  
reduceKey.setTemperature(temperature);
  
context.write(reduceKey, new IntWritable(temperature));
  
}
  
}
  

  DateTemperaturePartioner.java
  

import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Partitioner;
  

  

public>
Partitioner<DateTemperaturePair, Text> {  
@Override
  

public int getPartition(DateTemperaturePair dataTemperaturePair, Text text,  

int i) {  

return Math.abs(dataTemperaturePair.getYearMonth().hashCode() % i);  
}
  
}
  

  DateTemperatureGroupingComparator.java
  

import org.apache.hadoop.io.WritableComparable;  
import org.apache.hadoop.io.WritableComparator;
  

  

public>
  

public DateTemperatureGroupingComparator() {  
super(DateTemperaturePair.
class, true);  
}
  

  
@Override
  

public int compare(WritableComparable a, WritableComparable b) {  
DateTemperaturePair pair1
= (DateTemperaturePair) a;  
DateTemperaturePair pair2
= (DateTemperaturePair) b;  

return pair1.getYearMonth().compareTo(pair2.getYearMonth());  
}
  
}
  

  SecondarySortingReducer.java
  

import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Reducer;
  

  
import java.io.IOException;
  

  

public>
Reducer<DateTemperaturePair, IntWritable, DateTemperaturePair, Text> {  

  
@Override
  

protected void reduce(DateTemperaturePair key,  
Iterable
<IntWritable> values, Context context) throws IOException,  
InterruptedException {
  
StringBuilder sortedTemperatureList
= new StringBuilder();  

for (IntWritable temperature : values) {  
sortedTemperatureList.append(temperature);
  
sortedTemperatureList.append(
",");  
}
  
sortedTemperatureList.deleteCharAt(sortedTemperatureList.length()
-1);  
context.write(key,
new Text(sortedTemperatureList.toString()));  
}
  

  
}
  

  三、使用scala编写Spark程序实现二次排序
  这个代码想必就比较简洁了。如下:
  SecondarySort.scala
  

package spark  
import org.apache.spark.{SparkContext, SparkConf}
  
import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
  
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
  

  

object SecondarySort {  
def main(args: Array[String]) {
  
val conf
= new SparkConf().setAppName(" Secondary Sort ")  
.setMaster(
"local")  

var sc = new SparkContext(conf)  
sc.setLogLevel(
"Warn")  

//val file = sc.textFile("hdfs://localhost:9000/Spark/SecondarySort/Input/SecondarySort2.txt")  
val file = sc.textFile("e:\\SecondarySort.txt")
  
val rdd = file.map(line => line.split(","))
  
.map(x=>((x(0),x(1)),x(3))).groupByKey().sortByKey(false)
  
.map(x => (x._1._1+"-"+x._1._2,x._2.toList.sortWith(_>_)))
  
rdd.foreach(
  
x=>{
  
val buf = new StringBuilder()
  
for(a <- x._2){
  
buf.append(a)
  
buf.append(",")
  
}
  
buf.deleteCharAt(buf.length()-1)
  
println(x._1+" "+buf.toString())
  
})
  
sc.stop()
  
}
  
}
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425200-1-1.html 上篇帖子: Hadoop中ssh+IP、ssh+别名免秘钥登录配置 下篇帖子: 【大数据系列】Hadoop DataNode读写流程
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表