设为首页 收藏本站
查看: 1075|回复: 0

[经验分享] Hadoop日记Day17---计数器、map规约、分区学习

[复制链接]
累计签到:18 天
连续签到:1 天
发表于 2015-7-13 08:09:03 | 显示全部楼层 |阅读模式
一、Hadoop计数器

1.1 什么是Hadoop计数器
  Haoop是处理大数据的,不适合处理小数据,有些大数据问题是小数据程序是处理不了的,他是一个高延迟的任务,有时处理一个大数据需要花费好几个小时这都是正常的。下面我们说一下Hadoop计数器,Hadoop计数器就相当于我们的日志,而日志可以让我们查看程序运行时的很多状态,而计数器也有这方面的作用。那么就研究一下Hadoop自身的计数器。计数器的程序如代码1.1所示,下面代码还是以内容为“hello you;hell0 me”的单词统计为例。


DSC0000.gif DSC0001.gif


1 package counter;
2
3 import java.net.URI;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.fs.FileSystem;
7 import org.apache.hadoop.fs.Path;
8 import org.apache.hadoop.io.LongWritable;
9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/input";
22     static final String OUT_PATH = "hdfs://hadoop:9000/output";
23     
24     public static void main(String[] args) throws Exception {
25         
26         Configuration conf = new Configuration();
27         
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30         
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }        
34         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
35         
36         //1.1指定读取的文件位于哪里
37         FileInputFormat.setInputPaths(job, INPUT_PATH);        
38         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
39         
40         //1.2 指定自定义的map类
41         job.setMapperClass(MyMapper.class);
42         job.setMapOutputKeyClass(Text.class);//map输出的类型。
43         job.setMapOutputValueClass(LongWritable.class);//如果的类型与类型一致,则可以省略
44         
45         //1.3 分区
46         job.setPartitionerClass(HashPartitioner.class);        
47         job.setNumReduceTasks(1);//有一个reduce任务运行               
48         
49         //2.2 指定自定义reduce类
50         job.setReducerClass(MyReducer.class);
51         
52         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
53         job.setOutputValueClass(LongWritable.class);
54         
55         //2.3 指定写出到哪里
56         FileOutputFormat.setOutputPath(job, outPath);        
57         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
58                 
59         job.waitForCompletion(true);//把job提交给JobTracker运行
60     }
61     
62     /**
63      * KEYIN    即k1        表示行的偏移量
64      * VALUEIN    即v1        表示行文本内容
65      * KEYOUT    即k2        表示行中出现的单词
66      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
67      */
68     static class MyMapper extends Mapper{
69         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {            
70             final String line = v1.toString();        
71             final String[] splited = line.split("\t");
72             for (String word : splited) {
73                 context.write(new Text(word), new LongWritable(1));
74             }
75         };
76     }
77     
78     /**
79      * KEYIN    即k2        表示行中出现的单词
80      * VALUEIN    即v2        表示行中出现的单词的次数
81      * KEYOUT    即k3        表示文本中出现的不同单词
82      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
83      *
84      */
85     static class MyReducer extends Reducer{
86         protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException {
87             long times = 0L;
88             for (LongWritable count : v2s) {
89                 times += count.get();
90             }
91             ctx.write(k2, new LongWritable(times));
92         };
93     }
94         
95 }
View Code  代码 1.1
  运行结果如下图1.1所示。



Counters: 19//Counter表示计数器,19表示有19个计数器(下面一共4计数器组)
File Output Format Counters //文件输出格式化计数器组
Bytes Written=19       //reduce输出到hdfs的字节数,一共19个字节
FileSystemCounters//文件系统计数器组
FILE_BYTES_READ=481
HDFS_BYTES_READ=38
FILE_BYTES_WRITTEN=81316
HDFS_BYTES_WRITTEN=19
File Input Format Counters
//文件输入格式化计数器组
Bytes Read=19     //map从hdfs读取的字节数
Map-Reduce Framework//MapReduce框架
Map output materialized bytes=49
Map input records=2       //map读入的记录行数,读取两行记录,”hello you”,”hello me”
Reduce shuffle bytes=0//规约分区的字节数
Spilled Records=8
Map output bytes=35
Total committed heap usage (bytes)=266469376
SPLIT_RAW_BYTES=105
Combine
input records=0//合并输入的记录数
Reduce input records=4     //reduce从map端接收的记录行数
Reduce input groups=3     //reduce函数接收的key数量,即归并后的k2数量
Combine output records=0//合并输出的记录数
Reduce output records=3    //reduce输出的记录行数。,,
Map output records=4     //map输出的记录行数,输出4行记录
  图 1.1
  通过上面我们对计数器的分析,可以知道,我们可以通过计数器来分析MapReduece程序的运行状态。

1.2 自定义计数器
  通过上面的分析,我们了解了计数器的作用,那么我们可以自定义一个计数器,来实现我们自己想要的功能。定义一个记录敏感词的计数器,记录敏感词在一行所出现的次数,如代码2.1所示。我们处理文件内容为“hello you”,“hello me”。





1 Counters: 19//Counter表示计数器,19表示有19个计数器(下面一共4计数器组)
2    File Output Format Counters //文件输出格式化计数器组
3      Bytes Written=19       //reduce输出到hdfs的字节数,一共19个字节
4    FileSystemCounters//文件系统计数器组
5      FILE_BYTES_READ=481
6      HDFS_BYTES_READ=38
7      FILE_BYTES_WRITTEN=81316
8      HDFS_BYTES_WRITTEN=19
9    File Input Format Counters //文件输入格式化计数器组
10      Bytes Read=19     //map从hdfs读取的字节数
11    Map-Reduce Framework//MapReduce框架
12      Map output materialized bytes=49
13      Map input records=2       //map读入的记录行数,读取两行记录,”hello you”,”hello me”
14      Reduce shuffle bytes=0//规约分区的字节数
15      Spilled Records=8
16      Map output bytes=35
17      Total committed heap usage (bytes)=266469376
18      SPLIT_RAW_BYTES=105
19      Combine input records=0//合并输入的记录数
20      Reduce input records=4     //reduce从map端接收的记录行数
21      Reduce input groups=3     //reduce函数接收的key数量,即归并后的k2数量
22      Combine output records=0//合并输出的记录数
23      Reduce output records=3    //reduce输出的记录行数。,,
24      Map output records=4     //map输出的记录行数,输出4行记录
View Code  代码2.1
  运行结果如下图2.1所示。



Counters: 20
Sensitive Words
hello=2
File Output Format Counters

Bytes Written=21
FileSystemCounters

FILE_BYTES_READ=359
HDFS_BYTES_READ=42
FILE_BYTES_WRITTEN=129080
HDFS_BYTES_WRITTEN=21
File Input Format Counters

Bytes Read=21
Map-
Reduce Framework
Map output materialized bytes=67
Map input records=2
Reduce shuffle bytes=0
Spilled Records=8
Map output bytes=53
Total committed heap usage (bytes)=391774208
SPLIT_RAW_BYTES=95
Combine input records
=0
Reduce input records=4
Reduce input groups=3
Combine output records
=0
Reduce output records=3
Map output records=4
  图 2.1

二、Combiners编程

2.1 什么是Combiners
  从上面程序运行的结果我们可以发现,在Map-Reduce Framework即MapReduce框架的输出中,Combine input records这个字段为零, 那么combine怎么使用呢?其实这是MapReduce程序中Mapper任务中第五步,这是可选的一步,使用方法非常简单,以上面单词统计为例,只需添加下面一行代码即可,如下: job.setCombinerClass(MyReducer.class);
  combine操作是一个可选的操作,使用时需要我们自己设定,我们用MyReducer类来设置Combiners,表示Combiners与Reduce功能相同,带有combine功能的MapRduce程序如代码3.1所示。





  1 package combine;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Partitioner;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.jasper.tagplugins.jstl.core.If;
18
19 public class WordCountApp2 {
20     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
21     static final String OUT_PATH = "hdfs://hadoop:9000/out";
22     
23     public static void main(String[] args) throws Exception {
24         Configuration conf = new Configuration();
25         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
26         final Path outPath = new Path(OUT_PATH);
27         if(fileSystem.exists(outPath)){
28             fileSystem.delete(outPath, true);
29         }
30         final Job job = new Job(conf , WordCountApp2.class.getSimpleName());
31         job.setJarByClass(WordCountApp2.class);
32         
33         //1.1指定读取的文件位于哪里
34         FileInputFormat.setInputPaths(job, INPUT_PATH);        
35         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
36         
37         //1.2 指定自定义的map类
38         job.setMapperClass(MyMapper.class);
39         job.setMapOutputKeyClass(Text.class);//map输出的类型。
40         job.setMapOutputValueClass(LongWritable.class);//如果的类型与类型一致,则可以省略
41         
42         //1.3 分区
43         job.setPartitionerClass(MyPartitioner.class);
44         //有几个reduce任务运行
45         job.setNumReduceTasks(2);
46         
47         //1.4 TODO 排序、分组
48         
49         //1.5 规约
50         job.setCombinerClass(MyCombiner.class);
51         
52         //2.2 指定自定义reduce类
53         job.setReducerClass(MyReducer.class);
54         //指定reduce的输出类型
55         job.setOutputKeyClass(Text.class);
56         job.setOutputValueClass(LongWritable.class);
57         
58         //2.3 指定写出到哪里
59         FileOutputFormat.setOutputPath(job, outPath);
60         //指定输出文件的格式化类
61         //job.setOutputFormatClass(TextOutputFormat.class);
62         
63         //把job提交给JobTracker运行
64         job.waitForCompletion(true);
65     }
66     
67     static class MyPartitioner extends Partitioner{
68         @Override
69         public int getPartition(Text key, LongWritable value, int numReduceTasks) {
70             return (key.toString().equals("hello"))?0:1;
71         }
72     }
73     
74     /**
75      * KEYIN    即k1        表示行的偏移量
76      * VALUEIN    即v1        表示行文本内容
77      * KEYOUT    即k2        表示行中出现的单词
78      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
79      */
80     static class MyMapper extends Mapper{
81         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
82             final String[] splited = v1.toString().split("\t");
83             for (String word : splited) {
84                 context.write(new Text(word), new LongWritable(1));
85                 System.out.println("Mapper输出");
86             }
87         };
88     }
89     
90     /**
91      * KEYIN    即k2        表示行中出现的单词
92      * VALUEIN    即v2        表示行中出现的单词的次数
93      * KEYOUT    即k3        表示文本中出现的不同单词
94      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
95      *
96      */
97     static class MyReducer extends Reducer{
98         protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException {
99             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
100             System.out.println("MyReducer输入分组");
101             long times = 0L;
102             for (LongWritable count : v2s) {
103                 times += count.get();
104                 //显示次数表示输入的k2,v2的键值对数量
105                 System.out.println("MyReducer输入键值对");
106             }
107             ctx.write(k2, new LongWritable(times));
108         };
109     }
110     
111     
112     static class MyCombiner extends Reducer{
113         protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException {
114             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
115             System.out.println("Combiner输入分组");
116             long times = 0L;
117             for (LongWritable count : v2s) {
118                 times += count.get();
119                 //显示次数表示输入的k2,v2的键值对数量
120                 System.out.println("Combiner输入键值对");
121             }
122            
123             ctx.write(k2, new LongWritable(times));
124             //显示次数表示输出的k2,v2的键值对数量
125             System.out.println("Combiner输出键值对");
126         };
127     }
128 }
View Code  代码 3.1
  运行结果如下图3.1所示。



Counters: 20
Sensitive Words

hello=2
File Output Format Counters

Bytes Written=21
FileSystemCounters

FILE_BYTES_READ=359
HDFS_BYTES_READ=42
FILE_BYTES_WRITTEN=129080
HDFS_BYTES_WRITTEN=21
File Input Format Counters

Bytes Read=21
Map-
Reduce Framework
Map output materialized bytes=67
Map input records=2
Reduce shuffle bytes=0
Spilled Records=8
Map output bytes=53
Total committed heap usage (bytes)=391774208
SPLIT_RAW_BYTES=95
Combine input records=4
Reduce input records=3
Reduce input groups=3
Combine output records=3
Reduce output records=3
Map output records=4
  图 3.1
  从上面的运行结果我们可以发现,此时Combine input records=4,Combine output records=3,Reduce input records=3,因为Combine阶段在Ma pper结束与Reducer开始之间,Combiners处理的数据,就是在不设置Combiners时,Reduce所应该接受的数据,所以为4,然后再将Combiners的输出作为Re duce端的输入,所以Reduce input records这个字段由4变成了3。注意,combine操作是一个可选的操作,使用时需要我们自己设定,在本代码中我们用MyRed ucer类来设置Combiners,Combine方法的使用的是Reduce的方法,这说明归约的方法是通用的,Reducer阶段的方法也可以用到Mapper阶段。

2.1 自定义Combiners
  为了能够更加清晰的理解Combiners的工作原理,我们自定义一个Combiners类,不再使用MyReduce做为Combiners的类,如代码3.2所示。





  1 package combine;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Partitioner;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.jasper.tagplugins.jstl.core.If;
18
19 /**
20  * 问:为什么使用Combiner?
21  * 答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
22  *
23  * 问:为什么Combiner不作为MR运行的标配,而是可选步骤哪?
24  * 答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
25  *
26  * 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作哪?
27  * 答:combiner操作发生在map端的,处理一个任务所接收的文件中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。
28  *
29  */
30 public class WordCountApp2 {
31     static final String INPUT_PATH = "hdfs://hadoop:9000/hello";
32     static final String OUT_PATH = "hdfs://hadoop:9000/out";
33     
34     public static void main(String[] args) throws Exception {
35         Configuration conf = new Configuration();
36         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
37         final Path outPath = new Path(OUT_PATH);
38         if(fileSystem.exists(outPath)){
39             fileSystem.delete(outPath, true);
40         }
41         final Job job = new Job(conf , WordCountApp2.class.getSimpleName());
42         job.setJarByClass(WordCountApp2.class);
43         
44         //1.1指定读取的文件位于哪里
45         FileInputFormat.setInputPaths(job, INPUT_PATH);        
46         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
47         
48         //1.2 指定自定义的map类
49         job.setMapperClass(MyMapper.class);
50         job.setMapOutputKeyClass(Text.class);//map输出的类型。
51         job.setMapOutputValueClass(LongWritable.class);//如果的类型与类型一致,则可以省略
52         
53         //1.3 分区
54         job.setPartitionerClass(MyPartitioner.class);
55         //有几个reduce任务运行
56         job.setNumReduceTasks(2);
57         
58         //1.4 TODO 排序、分组
59         
60         //1.5 规约
61         job.setCombinerClass(MyCombiner.class);
62         
63         //2.2 指定自定义reduce类
64         job.setReducerClass(MyReducer.class);
65         //指定reduce的输出类型
66         job.setOutputKeyClass(Text.class);
67         job.setOutputValueClass(LongWritable.class);
68         
69         //2.3 指定写出到哪里
70         FileOutputFormat.setOutputPath(job, outPath);
71         //指定输出文件的格式化类
72         //job.setOutputFormatClass(TextOutputFormat.class);
73         
74         //把job提交给JobTracker运行
75         job.waitForCompletion(true);
76     }
77     
78     static class MyPartitioner extends Partitioner{
79         @Override
80         public int getPartition(Text key, LongWritable value, int numReduceTasks) {
81             return (key.toString().equals("hello"))?0:1;
82         }
83     }
84     
85     /**
86      * KEYIN    即k1        表示行的偏移量
87      * VALUEIN    即v1        表示行文本内容
88      * KEYOUT    即k2        表示行中出现的单词
89      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
90      */
91     static class MyMapper extends Mapper{
92         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
93             final String[] splited = v1.toString().split("\t");
94             for (String word : splited) {
95                 context.write(new Text(word), new LongWritable(1));
96                 System.out.println("Mapper输出");
97             }
98         };
99     }
100     
101     /**
102      * KEYIN    即k2        表示行中出现的单词
103      * VALUEIN    即v2        表示行中出现的单词的次数
104      * KEYOUT    即k3        表示文本中出现的不同单词
105      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
106      *
107      */
108     static class MyReducer extends Reducer{
109         protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException {
110             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
111             System.out.println("MyReducer输入分组");
112             long times = 0L;
113             for (LongWritable count : v2s) {
114                 times += count.get();
115                 //显示次数表示输入的k2,v2的键值对数量
116                 System.out.println("MyReducer输入键值对");
117             }
118             ctx.write(k2, new LongWritable(times));
119         };
120     }
121     
122     
123     static class MyCombiner extends Reducer{
124         protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException {
125             //显示次数表示redcue函数被调用了多少次,表示k2有多少个分组
126             System.out.println("Combiner输入分组");
127             long times = 0L;
128             for (LongWritable count : v2s) {
129                 times += count.get();
130                 //显示次数表示输入的k2,v2的键值对数量
131                 System.out.println("Combiner输入键值对");
132             }
133            
134             ctx.write(k2, new LongWritable(times));
135             //显示次数表示输出的k2,v2的键值对数量
136             System.out.println("Combiner输出键值对");
137         };
138     }
139 }
View Code  代码 3.2
  运行结果如图3.2所示。



14/10/07 18:56:32 INFO mapred.MapTask: record buffer = 262144/327680
Mapper输出
14/10/07 18:56:32 INFO mapred.MapTask: Starting flush of map output
Mapper输出
Mapper输出
Mapper输出
Combiner输入分组
Combiner输入键值对
Combiner输入键值对
Combiner输出键值对
Combiner输入分组
Combiner输入键值对
Combiner输出键值对
Combiner输入分组
Combiner输入键值对
Combiner输出键值对
14/10/07 18:56:32 INFO mapred.MapTask: Finished spill 0
14/10/07 18:56:32 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/10/07 18:56:32 INFO mapred.LocalJobRunner:
14/10/07 18:56:32 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/10/07 18:56:32 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
14/10/07 18:56:32 INFO mapred.LocalJobRunner:
14/10/07 18:56:32 INFO mapred.Merger: Merging 1 sorted segments
14/10/07 18:56:32 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 47 bytes
14/10/07 18:56:32 INFO mapred.LocalJobRunner:
MyReducer输入分组
MyReducer输入键值对
MyReducer输入分组
MyReducer输入键值对
MyReducer输入分组
MyReducer输入键值对
14/10/07 18:56:33 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/10/07 18:56:33 INFO mapred.LocalJobRunner:
14/10/07 18:56:33 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/10/07 18:56:33 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/output
14/10/07 18:56:33 INFO mapred.LocalJobRunner: reduce > reduce
14/10/07 18:56:33 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/10/07 18:56:33 INFO mapred.JobClient:  map 100% reduce 100%
14/10/07 18:56:33 INFO mapred.JobClient: Job complete: job_local_0001
14/10/07 18:56:33 INFO mapred.JobClient: Counters: 19
14/10/07 18:56:33 INFO mapred.JobClient:   File Output Format Counters
14/10/07 18:56:33 INFO mapred.JobClient:     Bytes Written=21
14/10/07 18:56:33 INFO mapred.JobClient:   FileSystemCounters
14/10/07 18:56:33 INFO mapred.JobClient:     FILE_BYTES_READ=343
14/10/07 18:56:33 INFO mapred.JobClient:     HDFS_BYTES_READ=42
14/10/07 18:56:33 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=129572
14/10/07 18:56:33 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=21
14/10/07 18:56:33 INFO mapred.JobClient:   File Input Format Counters
14/10/07 18:56:33 INFO mapred.JobClient:     Bytes Read=21
14/10/07 18:56:33 INFO mapred.JobClient:   Map-Reduce Framework
14/10/07 18:56:33 INFO mapred.JobClient:     Map output materialized bytes=51
14/10/07 18:56:33 INFO mapred.JobClient:     Map input records=2
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/10/07 18:56:33 INFO mapred.JobClient:     Spilled Records=6
14/10/07 18:56:33 INFO mapred.JobClient:     Map output bytes=53
14/10/07 18:56:33 INFO mapred.JobClient:     Total committed heap usage (bytes)=391774208
14/10/07 18:56:33 INFO mapred.JobClient:     SPLIT_RAW_BYTES=95
14/10/07 18:56:33 INFO mapred.JobClient:     Combine input records=4
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce input records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce input groups=3
14/10/07 18:56:33 INFO mapred.JobClient:     Combine output records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Reduce output records=3
14/10/07 18:56:33 INFO mapred.JobClient:     Map output records=4
  图 3.2
  从上面的运行结果我们可以得知,combine具体作用如下:


  • 每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。
  • combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。
  • 如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

   注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那 种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。
  解释一下
  *问:为什么使用Combiner?
   答:Combiner发生在Map端,对数据进行规约处理,数据量变小了,传送到reduce端的数据量变小了,传输时间变短,作业的整体时间变短。
* 问:为什么Combiner不作为MR运行的标配,而是可选步骤?
    答:因为不是所有的算法都适合使用Combiner处理,例如求平均数。
* 问:Combiner本身已经执行了reduce操作,为什么在Reducer阶段还要执行reduce操作?
    答:combiner操作发生在map端的,智能处理一个map任务中的数据,不能跨map任务执行;只有reduce可以接收多个map任务处理的数据。

三、Partitioner编程

4.1 什么是分区
  在MapReuce程序中的Mapper任务的第三步就是分区,那么分区到底是干什么的呢?其实,把数据分区是为了更好的利用数据,根据数据的属性不同来分成不同区,再根据不同的分区完成不同的任务。MapReduce程序中他的默认分区是1个分区,我们看一下默认分区的代码,还是以单词统计为例如代码4.1所示。





  1 package counter;
  2
  3 import java.net.URI;
  4
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Counter;
11 import org.apache.hadoop.mapreduce.Job;
12 import org.apache.hadoop.mapreduce.Mapper;
13 import org.apache.hadoop.mapreduce.Reducer;
14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
15 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
19
20 public class WordCountApp {
21     static final String INPUT_PATH = "hdfs://hadoop:9000/input";
22     static final String OUT_PATH = "hdfs://hadoop:9000/output";
23     
24     public static void main(String[] args) throws Exception {
25         
26         Configuration conf = new Configuration();
27         
28         final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
29         final Path outPath = new Path(OUT_PATH);
30         
31         if(fileSystem.exists(outPath)){
32             fileSystem.delete(outPath, true);
33         }        
34         final Job job = new Job(conf , WordCountApp.class.getSimpleName());
35         
36         //1.1指定读取的文件位于哪里
37         FileInputFormat.setInputPaths(job, INPUT_PATH);        
38         job.setInputFormatClass(TextInputFormat.class);//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
39         
40         //1.2 指定自定义的map类
41         job.setMapperClass(MyMapper.class);
42         job.setMapOutputKeyClass(Text.class);//map输出的类型。
43         job.setMapOutputValueClass(LongWritable.class);//如果的类型与类型一致,则可以省略
44         
45         //1.3 分区
46         job.setPartitionerClass(HashPartitioner.class);        
47         job.setNumReduceTasks(1);//有一个reduce任务运行               
48         
49         job.setCombinerClass(MyReducer.class);
50         //2.2 指定自定义reduce类
51         job.setReducerClass(MyReducer.class);
52         
53         job.setOutputKeyClass(Text.class);//指定reduce的输出类型
54         job.setOutputValueClass(LongWritable.class);
55         
56         //2.3 指定写出到哪里
57         FileOutputFormat.setOutputPath(job, outPath);        
58         job.setOutputFormatClass(TextOutputFormat.class);//指定输出文件的格式化类
59                 
60         job.waitForCompletion(true);//把job提交给JobTracker运行
61     }
62     
63     /**
64      * KEYIN    即k1        表示行的偏移量
65      * VALUEIN    即v1        表示行文本内容
66      * KEYOUT    即k2        表示行中出现的单词
67      * VALUEOUT    即v2        表示行中出现的单词的次数,固定值1
68      */
69     static class MyMapper extends Mapper{
70         protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
71             final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
72            
73             final String line = v1.toString();
74             if(line.contains("hello")){
75                 //记录敏感词出现在一行中
76                 helloCounter.increment(1L);
77             }
78             final String[] splited = line.split("\t");
79             for (String word : splited) {
80                 context.write(new Text(word), new LongWritable(1));
81             }
82         };
83     }
84     
85     /**
86      * KEYIN    即k2        表示行中出现的单词
87      * VALUEIN    即v2        表示行中出现的单词的次数
88      * KEYOUT    即k3        表示文本中出现的不同单词
89      * VALUEOUT    即v3        表示文本中出现的不同单词的总次数
90      *
91      */
92     static class MyReducer extends Reducer{
93         protected void reduce(Text k2, java.lang.Iterable v2s, Context ctx) throws java.io.IOException ,InterruptedException {
94             long times = 0L;
95             for (LongWritable count : v2s) {
96                 times += count.get();
97             }
98             ctx.write(k2, new LongWritable(times));
99         };
100     }
101         
102 }
View Code  代码 4.1
  在MapReduce程序中默认的分区方法为HashPartitioner,代码job.setNumReduceTasks(1)表示运行的Reduce任务数,他会将numReduceTask这个变量设为1. HashPartitioner继承自Partitioner,Partitioner是Partitioner的基类,如果需要定制partitioner也需要继承该类。 HashPartitioner计算方法如代码4.2所示。



1 public class HashPartitioner extends Partitioner {
2
3   /** Use {@link Object#hashCode()} to partition. */
4   public int getPartition(K key, V value,
5                           int numReduceTasks) {
6     return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
7   }
8
9 }
  代码 4.2
  在上面的代码中K和V,表示k2和v2,该类中只有一个方法getPartition(),返回值如下”(key.hashCode()& Integer.MAX_VALUE)%numReduceTasks“其中key.hashCode()表示该关键是否属于该类。numReduceTasks的值在上面代码中设置为1,取模后只有一种结果那就是0。getPartition()的意义就是表示划分到不同区域的一个标记,返回0,就是表示划分到第0区,所以我们可以把它理解分区的下标,来代表不同的分区。

4.2 自定义分区
    下面我们尝试自定义一个分区,来处理一下手机的日志数据(在前面学习中用过),手机日志数据如下图4.1所示。
  
  图 4.1
    从图中我们可以发现,在第二列上并不是所有的数据都是手机号,我们任务就是在统计手机流量时,将手机号码和非手机号输出到不同的文件中。我们的分区是按手机和非手机号码来分的,所以我们可以按该字段的长度来划分,如代码4.3所示。
  





  1 package partition;
  2
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.io.Writable;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
20
21 public class KpiApp {
22     static final String INPUT_PATH = "hdfs://hadoop:9000/wlan";
23     static final String OUT_PATH = "hdfs://hadoop:9000/out";
24     public static void main(String[] args) throws Exception{
25         final Job job = new Job(new Configuration(), KpiApp.class.getSimpleName());
26         
27         job.setJarByClass(KpiApp.class);
28         
29         //1.1 指定输入文件路径
30         FileInputFormat.setInputPaths(job, INPUT_PATH);
31         job.setInputFormatClass(TextInputFormat.class);//指定哪个类用来格式化输入文件
32         
33         //1.2指定自定义的Mapper类
34         job.setMapperClass(MyMapper.class);        
35         job.setMapOutputKeyClass(Text.class);//指定输出的类型
36         job.setMapOutputValueClass(KpiWritable.class);
37         
38         //1.3 指定分区类
39         job.setPartitionerClass(KpiPartitioner.class);
40         job.setNumReduceTasks(2);
41                 
42         //2.2 指定自定义的reduce类
43         job.setReducerClass(MyReducer.class);
44         job.setOutputKeyClass(Text.class);//指定输出的类型
45         job.setOutputValueClass(KpiWritable.class);
46         
47         //2.3 指定输出到哪里
48         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
49         job.setOutputFormatClass(TextOutputFormat.class);//设定输出文件的格式化类
50         job.waitForCompletion(true);//把代码提交给JobTracker执行
51     }
52
53     static class MyMapper extends Mapper{
54         protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException ,InterruptedException {
55             final String[] splited = value.toString().split("\t");
56             final String msisdn = splited[1];
57             final Text k2 = new Text(msisdn);
58             final KpiWritable v2 = new KpiWritable(splited[6],splited[7],splited[8],splited[9]);
59             context.write(k2, v2);
60         };
61     }
62     
63     static class MyReducer extends Reducer{
64         /**
65          * @param    k2    表示整个文件中不同的手机号码   
66          * @param    v2s    表示该手机号在不同时段的流量的集合
67          */
68         protected void reduce(Text k2, java.lang.Iterable v2s, org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException ,InterruptedException {
69             long upPackNum = 0L;
70             long downPackNum = 0L;
71             long upPayLoad = 0L;
72             long downPayLoad = 0L;
73            
74             for (KpiWritable kpiWritable : v2s) {
75                 upPackNum += kpiWritable.upPackNum;
76                 downPackNum += kpiWritable.downPackNum;
77                 upPayLoad += kpiWritable.upPayLoad;
78                 downPayLoad += kpiWritable.downPayLoad;
79             }
80            
81             final KpiWritable v3 = new KpiWritable(upPackNum+"", downPackNum+"", upPayLoad+"", downPayLoad+"");
82             context.write(k2, v3);
83         };
84     }
85     
86     static class KpiPartitioner extends HashPartitioner{
87         @Override
88         public int getPartition(Text key, KpiWritable value, int numReduceTasks) {
89             return (key.toString().length()==11)?0:1;
90         }
91     }
92 }
93
94 class KpiWritable implements Writable{
95     long upPackNum;
96     long downPackNum;
97     long upPayLoad;
98     long downPayLoad;
99     
100     public KpiWritable(){}
101     
102     public KpiWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad){
103         this.upPackNum = Long.parseLong(upPackNum);
104         this.downPackNum = Long.parseLong(downPackNum);
105         this.upPayLoad = Long.parseLong(upPayLoad);
106         this.downPayLoad = Long.parseLong(downPayLoad);
107     }
108     
109     
110     @Override
111     public void readFields(DataInput in) throws IOException {
112         this.upPackNum = in.readLong();
113         this.downPackNum = in.readLong();
114         this.upPayLoad = in.readLong();
115         this.downPayLoad = in.readLong();
116     }
117
118     @Override
119     public void write(DataOutput out) throws IOException {
120         out.writeLong(upPackNum);
121         out.writeLong(downPackNum);
122         out.writeLong(upPayLoad);
123         out.writeLong(downPayLoad);
124     }
125     
126     @Override
127     public String toString() {
128         return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
129     }
130 }
View Code  代码 4.3
    注意:分区的例子必须打成jar运行,运行结果如下图4.3,4.4所示,4.3表示手机号码流量,4.4为非手机号流量。
  
  图 4.3
  
  图4.4
  我们知道一个分区对应一个Reducer任务是否是这样呢,我可以通过访问50030MapReduce端口来验证,在浏览器输入”http://hadoop:50030"可以看到MapReduce界面,如图4.5,4.6所示。

  图 4.5

  图4.6
  从图中可以知道,该MapReduce任务有一个Mapper任务,两个Reducer任务,那么我们细看一下Reducer的两个任务到底是什么?如图4.7,4.8,4.9所示。task_201410070239_0002_r_000000表示第一个分区的输出,有20条记录,task_201410070239_0002_r_000001表示第二分区,有一条输出记录。和我们程序运行结果一样。


  图 4.7

  图 4.8 第一分区

  图 4.9 第二分区
  综上一些列分析,分区的用处如下:
    1.根据业务需要,产生多个输出文件
    2.多个reduce任务在并发运行,提高整体job的运行效率
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85921-1-1.html 上篇帖子: [hadoop源码阅读][5]-counter的使用和默认counter的含义 下篇帖子: Hadoop集群配置【六、thrift安装】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表