参考:http://eric-gcm.iteye.com/blog/1807468
file1.txt:
2
32
654
32
15
756
65223
file2.txt:
5956
22
650
92
file3.txt:
26
54
6
JAVA代码:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Sort {
// map将输入中的value化成IntWritable类型,作为输出的key
public static class Map extends
Mapper {
private static IntWritable data = new IntWritable();
// 实现map函数
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
data.set(Integer.parseInt(line));
context.write(data, new IntWritable(1));
}
}
// reduce将输入中的key复制到输出数据的key上,
// 然后根据输入的value-list中元素的个数决定key的输出次数
// 用全局linenum来代表key的位次
public static class Reduce extends
Reducer {
private static IntWritable linenum = new IntWritable(1);
// 实现reduce函数
public void reduce(IntWritable key, Iterable values,
Context context)
throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(linenum, key);
linenum = new IntWritable(linenum.get() + 1);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// 这句话很关键
conf.set("mapred.job.tracker", "172.16.11.74:9001");
String[] ioArgs = new String[] { "sort_in", "sort_out" };
String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Data Sort ");
System.exit(2);
}
Job job = new Job(conf, "Data Sort");
job.setJarByClass(Sort.class);
// 设置Map和Reduce处理类
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
// 设置输出类型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入和输出目录
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Sort 运行结果:
1 2
2 6
3 15
4 22
5 26
6 32
7 32
8 54
9 92
10 650
11 654
12 756
13 5956
14 65223
具体打包运行步骤:
参考上一篇博文:http://www.iyunv.com/-wangjiannan/p/3590324.html
知识点:
MapReduce的默认排序规则是按照key值进行排序的。
如果key为封装int的IntWritable类型,那么MapReduce按照数字大小对key排序,
如果key为封装为String的Text类型,那么MapReduce按照字典顺序对字符串排序。
代码理解:
map阶段:
1. String line = value.toString();
实现的map方法中,针对文本的一行(line)处理,遍历每行的代码框架内部实现了
2. context.write(data, new IntWritable(1));
每一行:key是data(强转成IntWritable类型的 line),value是IntWritable类型的 1
3. 所有行默认排序好了,而且是按递增顺序的
若有重复的行,那么data对应的value合并成一个集合{Values}({IntWritable类型的 1+})
reduce阶段:
1. reduce(IntWritable key, Iterable values, Context context)
每一行:key是map阶段后的data,values是data对应的集合{Values}
2. for (IntWritable val : values) { context.write(linenum, key); linenum = new IntWritable(linenum.get() + 1); }
这行代码的作用是输出: 行号 data
同时:行号递增,若有重复的行,则换行输出
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com