|
本博客属原创文章,转载请注明出处:http://guoyunsky.iyunv.com/blog/1235952
请先阅读:
1.Hadoop MapReduce 学习笔记(一) 序言和准备
2.Hadoop MapReduce 学习笔记(二) 序言和准备 2
3.Hadoop MapReduce 学习笔记(八) MapReduce实现类似SQL的order by/排序
4.Hadoop MapReduce 学习笔记(九) MapReduce实现类似SQL的order by/排序 正确写法
下一篇: Hadoop MapReduce 学习笔记(十一) MapReduce实现类似SQL的order by/排序3 改进
Hadoop MapReduce 学习笔记(九) MapReduce实现类似SQL的order by/排序 正确写法 只实现了对单个字段的排序,如 SELECT * FROM TABLE ORDER BY ID ASC。但如果想对多个字段排序呢,如 SELECT * FROM TABLE ORDER BY ID ASC, NAME DESC?下面的代码实现了对两个字段的升序排序(ORDER BY COL1 ASC, COL2 ASC),具体请查看代码:
package com.guoyun.hadoop.mapreduce.study;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * MapReduce job that mimics SQL's
 * {@code SELECT * FROM TABLE ORDER BY COL1 ASC, COL2 ASC}, i.e. sorting on
 * multiple columns.
 *
 * <p>To sort on more columns, or in DESC order, implement your own
 * {@code WritableComparable}. For the improved version of this job see
 * {@code OrderByMultiMapReduceImproveTest}.
 */
public class OrderByMultiMapReduceTest extends MyMapReduceMultiColumnTest {
    public static final Logger log = LoggerFactory.getLogger(OrderByMultiMapReduceTest.class);

    /**
     * Delegates to the superclass constructor taking a data length.
     *
     * @param dataLength passed through to the superclass
     * @throws Exception if the superclass constructor fails
     */
    public OrderByMultiMapReduceTest(long dataLength) throws Exception {
        super(dataLength);
    }

    /**
     * Delegates to the superclass constructor taking an output path.
     *
     * @param outputPath passed through to the superclass
     * @throws Exception if the superclass constructor fails
     */
    public OrderByMultiMapReduceTest(String outputPath) throws Exception {
        super(outputPath);
    }

    /**
     * Delegates to the superclass constructor taking input and output paths.
     *
     * @param inputPath  passed through to the superclass
     * @param outputPath passed through to the superclass
     */
    public OrderByMultiMapReduceTest(String inputPath, String outputPath) {
        super(inputPath, outputPath);
    }

    /**
     * Delegates to the superclass constructor taking a data length plus
     * input and output paths.
     *
     * @param dataLength passed through to the superclass
     * @param inputPath  passed through to the superclass
     * @param outputPath passed through to the superclass
     * @throws Exception if the superclass constructor fails
     */
    public OrderByMultiMapReduceTest(long dataLength, String inputPath,
            String outputPath) throws Exception {
        super(dataLength, inputPath, outputPath);
    }

    /**
     * Map output key that orders records by framework name first and by
     * number second, both ascending.
     */
    public static class OrderMultiColumnWritable extends MultiColumnWritable {

        /**
         * Compares by framework name, then by number.
         *
         * @param obj the object to compare with
         * @return a negative value, zero, or a positive value; {@code -1}
         *         when {@code obj} is not an {@code OrderMultiColumnWritable}
         */
        @Override
        public int compareTo(Object obj) {
            if (!(obj instanceof OrderMultiColumnWritable)) {
                return -1;
            }
            OrderMultiColumnWritable other = (OrderMultiColumnWritable) obj;

            // First sort column: compare the names once and reuse the result.
            int byName = this.getFrameworkName().compareTo(other.getFrameworkName());
            if (byName != 0) {
                return byName;
            }

            // Second sort column: explicit comparison, no subtraction, so it
            // cannot overflow.
            if (this.getNumber() != other.getNumber()) {
                return this.getNumber() < other.getNumber() ? -1 : 1;
            }
            return 0;
        }

        /**
         * Sets both columns of this key.
         *
         * @param frameworkName value for the first sort column
         * @param number        value for the second sort column
         */
        public void set(String frameworkName, long number) {
            this.setFrameworkName(frameworkName);
            this.setNumber(number);
        }

        /** Renders the key as {@code frameworkName<TAB>number}. */
        @Override
        public String toString() {
            return this.getFrameworkName() + "\t" + this.getNumber();
        }
    }

    /**
     * Mapper: parses each input line of the form {@code name<TAB>number} and
     * emits an ({@link OrderMultiColumnWritable}, {@link NullWritable}) pair.
     */
    public static class MyMapper
            extends Mapper<LongWritable, Text, OrderMultiColumnWritable, NullWritable> {
        // Reused across map() calls; it must only be written out after a
        // successful set(), otherwise the previous record would be emitted again.
        private final OrderMultiColumnWritable writeKey = new OrderMultiColumnWritable();
        private final NullWritable writeValue = NullWritable.get();

        /**
         * Parses one line and emits it as a key. Lines that do not consist of
         * exactly two tab-separated columns, or whose second column is not a
         * valid long, are logged and skipped.
         *
         * @param key     the input key supplied by the framework (unused)
         * @param value   one line of input text
         * @param context the context the output pair is written to
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            log.debug("begin to map");

            String[] columns = value.toString().split("\\t");
            if (columns.length != 2) {
                log.warn("map skip malformed line (expected 2 tab-separated columns): " + value);
                return;
            }

            long number;
            try {
                number = Long.parseLong(columns[1].trim());
            } catch (NumberFormatException e) {
                // Skip the record; writing the reused key here would emit stale data.
                log.error("map error:" + e.getMessage(), e);
                return;
            }

            writeKey.set(columns[0], number);
            context.write(writeKey, writeValue);
        }
    }

    /**
     * Reducer: only used to output the result. Keys arrive already sorted
     * from the shuffle phase.
     */
    public static class MyReducer
            extends Reducer<OrderMultiColumnWritable, NullWritable, OrderMultiColumnWritable, NullWritable> {
        private final NullWritable writeValue = NullWritable.get();

        /**
         * Writes the key once per received value, so duplicate input rows are
         * preserved in the output.
         *
         * @param key     the current key
         * @param values  one {@link NullWritable} per occurrence of the key
         * @param context the context the output pair is written to
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if writing to the context is interrupted
         */
        @Override
        protected void reduce(OrderMultiColumnWritable key,
                Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // The value itself carries no data; iterating only counts occurrences.
            for (NullWritable ignored : values) {
                context.write(key, writeValue);
            }
        }
    }

    /**
     * Configures and runs the job, then prints the elapsed time.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String input = "testDatas/mapreduce/MRInput_Multi_OrderBy";
        String output = "testDatas/mapreduce/MROutput_Multi_OrderBy";

        try {
            MyMapReduceTest mapReduceTest = new OrderByMultiMapReduceTest(1000, input, output);
            Path inputPath = new Path(mapReduceTest.getInputPath());
            Path outputPath = new Path(mapReduceTest.getOutputPath());

            Configuration conf = new Configuration();
            Job job = new Job(conf, "OrderBy");

            // Remove any previous output directory before the job runs.
            FileSystem fs = FileSystem.getLocal(conf);
            if (fs.exists(outputPath)) {
                if (!fs.delete(outputPath, true)) {
                    System.err.println("Delete output file:" + mapReduceTest.getOutputPath() + " failed!");
                    return;
                }
            }

            job.setJarByClass(OrderByMultiMapReduceTest.class);
            job.setMapOutputKeyClass(OrderMultiColumnWritable.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputKeyClass(OrderMultiColumnWritable.class);
            job.setOutputValueClass(NullWritable.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // NOTE(review): no partitioner is configured here, so with 2 reducers
            // each output part file is sorted on its own; a global order across
            // part files is presumably handled by the improved version - confirm.
            job.setNumReduceTasks(2);
            FileInputFormat.addInputPath(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputPath);

            long begin = System.currentTimeMillis();
            boolean succeeded = job.waitForCompletion(true);

            System.out.println("===================================================");
            if (!succeeded) {
                System.err.println("Job OrderBy failed!");
            }
            if (mapReduceTest.isGenerateDatas()) {
                System.out.println("The maxValue is:" + mapReduceTest.getMaxValue());
                System.out.println("The minValue is:" + mapReduceTest.getMinValue());
            }
            System.out.println("Spend time:" + (System.currentTimeMillis() - begin));
            // Spend time:1235
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe it.
            Thread.currentThread().interrupt();
            log.error("OrderBy job was interrupted", e);
        } catch (Exception e) {
            log.error("OrderBy job failed", e);
        }
    }
}
更多文章、感悟、分享、勾搭,请用微信扫描:
|
|
|