本《hadoop学习笔记》系列是在《hadoop: the definitive guide 3th》的基础上通过网上额外搜集资料和查看hadoop的API再加上自己的实践方面 的理解编写而成的,主要针对hadoop的特性和功能学习以及Hadoop生态圈中的其他工具(如Pig,Hive,Hbase,Avro等等)。另外设计到hadoop编程方面的请查阅另一个笔记系列:《Hadoop编程笔记》。如果有同学同时也在研究这本书,欢迎沟通交流,在下能力有限,还望各路大神看到有不对的地方加以指正~~(本系列学习笔记还正在整理中,以后会陆续发布)。
本书第二章以一个很浅显的例子为大家提供了hadoop入门,那就是从气象站中保存的历史气象信息里面提取出每年的最高气温来。其大致过程如下(注意在不同阶段的数据形式):
注意:shuffle阶段是把map的输出传输到reduce任务的阶段,其中包括了键值对的排序和分组。
1.Map阶段
Map任务是十分简单的,我们只需要从输入文件中提取出年份和对应的温度值即可,同时过滤掉坏的记录。这里我们选择Text输入格式(默认的),其中数据集的每一行作为map任务输入中的key-value pair中的value值,key值是对应行在输入文件中的位移(以字节为单位),但我们并不需要key的值,所以忽略之。下面我们来看一下map任务的java表示:
1 import java.io.IOException;
2 import org.apache.hadoop.io.IntWritable;
3 import org.apache.hadoop.io.LongWritable;
4 import org.apache.hadoop.io.Text;
5 import org.apache.hadoop.mapreduce.Mapper;
6
7 public class MaxTemperatureMapper
8 extends Mapper { //注1
9 private static final int MISSING = 9999;
10 @Override
11 public void map(LongWritable key, Text value, Context context)
12 throws IOException, InterruptedException {
13 String line = value.toString();
14 String year = line.substring(15, 19);
15 int airTemperature;
16 if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
17 airTemperature = Integer.parseInt(line.substring(88, 92));
18 } else {
19 airTemperature = Integer.parseInt(line.substring(87, 92));
20 }
21 String quality = line.substring(92, 93);
22 if (airTemperature != MISSING && quality.matches("[01459]")) {
23 context.write(new Text(year), new IntWritable(airTemperature));
24 }
25 }
26 }
注1:Mapper类是一个泛型,其中的四个类型参数(LongWritable, Text, Text, IntWritable)指明了Mapper任务的输入(键,值)类型和输出(键,值)类型。其中LongWritable相当于java中的long类型,类似的,Text~String,IntWritable~int,只不过前者在网络传输时序列化操作方面做了优化。
2. Reduce阶段
同样的,Reducer类的四个类型参数也指明了Reducer任务的输入(键,值)类型和输出(键,值)类型。其输入类型必须和Mapper任务的输出类型匹配(在这个例子中为(Text,IntWritable))。
1 import java.io.IOException;
2 import org.apache.hadoop.io.IntWritable;
3 import org.apache.hadoop.io.Text;
4 import org.apache.hadoop.mapreduce.Reducer;
5
6 public class MaxTemperatureReducer
7 extends Reducer {
8 @Override
9 public void reduce(Text key, Iterable values,
10 Context context)
11 throws IOException, InterruptedException {
12 int maxValue = Integer.MIN_VALUE;
13 for (IntWritable value : values) {
14 maxValue = Math.max(maxValue, value.get());
15 }
16 context.write(key, new IntWritable(maxValue));
17 }
18 }
3. 下面我们总体查看一下作业的运行代码:
1 import org.apache.hadoop.fs.Path;
2 import org.apache.hadoop.io.IntWritable;
3 import org.apache.hadoop.io.Text;
4 import org.apache.hadoop.mapreduce.Job;
5 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
6 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
7
8 public class MaxTemperature {
9 public static void main(String[] args) throws Exception {
10 if (args.length != 2) {
11 System.err.println("Usage: MaxTemperature ");
12 System.exit(-1);
13 }
14 Job job = new Job();
15 job.setJarByClass(MaxTemperature.class);
16 job.setJobName("Max temperature");
17
18 FileInputFormat.addInputPath(job, new Path(args[0]));
19 FileOutputFormat.setOutputPath(job, new Path(args[1]));
20
21 job.setMapperClass(MaxTemperatureMapper.class);
22 job.setReducerClass(MaxTemperatureReducer.class);
23
24 job.setOutputKeyClass(Text.class); //注1
25 job.setOutputValueClass(IntWritable.class);
26
27 System.exit(job.waitForCompletion(true) ? 0 : 1);
28 }
29 }
注1:setOutputKeyClass()和setOutputValueClass()控制map任务和reduce任务的输出(两者输出类型相同的情况下),如果他们不一样,那么map的输出就要通过setMapOutputKeyClass()和setMapOutputValueClass()来设定了(reduce任务的输出设定之手前两者的影响)。
附1:map的输入格式由Job的静态方法 public void setInputFormatClass (Class
运维网声明
1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网 享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com