|
3,剖析MapReduce程序
<1>hadoop的数据类型
实现Writable接口额类可以是值,而实现WritableComparable接口的类既可以是键也可以是值。
以下这些是常用的数据类型,均用于实现WritableComparable借口:
BooleanWritable
ByteWritable
DoubleWritable
FloatWritable
IntWritable
LongWritable
Text使用UTF8格式的文本封装
NullWritable无键值时的占位符
也可以自定义数据类型,但要实现Writable接口或者WritableComparable接口。
<2>Mapper
一个类要作为mapper,需要继承MapReduceBase基类和实现Mapper借口。
mapper和reducer的基类均为MapReduceBase,它包含类的构造和解构方法:
void configure(JobConf job);该函数提取XML配置文件或者应用程序主类中的参数,在数据处理之前调用该函。
void close();作为map任务结束前的最后一个操作,该函数完成所有的结尾工作,如关闭数据库、打开文件等。
Mapper借口负责数据处理阶段,采用的形式是Mapper<K1,V1,K2,V2>java泛型,键类和值类分别实现WritableComparable和Writable借口。
Mapper只有一个方法map用于处理一个单独的键值对。
void map(K1 key,V1 value,OutputCollector<K2,V2> output,Reporter reporter)
该函数处理一个给定的键值对(K1,V1),生成一个键值对(K2,V2)的列表。
OutputCollector接收这个映射过程的输入。
Reporter 可提供对Mapper相关附加信息的记录,形成任务进度。
Hadoop预定义的Mapper的实现:
IdentityMapper<K,V>
| 实现Mapper<K,V,K,V>就输入直接映射到输出
| InverseMapper<K,V>
| 实现Mapper<K,V,K,V>发转键值对
| RegexMapper<K>
| 实现Mapper<K,Text,Text,LongWritable>,为每个常规表达式的匹配项生成一个(mathc,1)对
| TokenCountMapper<K>
| 实现Mapper<K,Text,Text,LongWritable>,当输入的值为分词时,生成一个(token,1)对
| <3>Reducer
reducer的实现也必须继承MapReduceBase类和实现Reducer接口。
Reducer接口只有一个方法,及时reduce:
Void reduce (K2,key,Iterator<V2> values,OutputCollector<K3,V3> output,Reporter r)
Reducer任务接收来自各个mapper的输出时,按照键对输入数据进行排序,并将相同的键的值归并,然后调用reduce函数,并通过迭代处理那些与指定键相关联的值,生成一个列表<K3,V3>
OutputCollector接收reduce阶段的输出,并写入输出文件。
Reporter 可提供对Mapper相关附加信息的记录,形成任务进度。
Hadoop预定义的Reducer的实现:
IdentityReducer<K,V>
| 实现Reducer<K,V,K,V>,将输入直接映射到输出
| LongSumReducer<K>
| 实现<K,LongWritable,K,LongWritable>,计算与给定键相对应的所有值的和
| <4>Partitioner:重定向Mapper输出
其实在mapper和reducer之间有一个非常重要的环节,就是将mapper的结果输出给不同的reducer,这就是Partitioner的工作:(此工作常被称为:洗牌)
一般reducer是多个,那么mapper应该将键值的输出给谁呢?Hadoop默认的机制是对键进行散列来确定reducer,Hadoop通过HashPartitioner类强制执行此策略。但是有的时候HashPartitioner会使得程序出错,即他的分发策略不符合实际的要求,那么此时就需要我们定制自己的Partitioner:(例如针对自定义的数据类型Edge)
public class EdgePartitioner implements Partitioner<Edge,Writable>
{
@override
public int getPartition(Edge key,Writable value,int numPartitions)
{
return key.getDepartureNode().hashCode() % numPartitions;
}
@override
public void configure(JobConf conf)
{
}
}
| 自己定制的Partitioner只需要实现getPartition方法和configure方法即可。前者将Hadoop对作业的配置应用在Partitioner上,而后者返回一个介于0和reducer任务数之间的整数,指向键值对要发送到的reducer。
Partitioner的形象的表达:(如下图)
<5>Combiner:本地reduce
在分发mapper之前先做一下“本地reduce”,也被成为合并,后面详细讲述。
<6>预定义Mapper和Reducer
使用预定义Mapper和Reducer改写前面的统计单词数量的程序:WordCount2.java
public class WordCount2 {
public static void main(String args[])
{
JobClient client = new JobClient();
JobConf conf = new JobConf(WordCount2.class);
FileInputFormat.addInputPath(conf,new Path(args[0]));
FileOutputFormat.setOutputPath(conf,new Path(args[1]));
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setMapperClass(TokenCountMapper.class);//hadoop自己的TokenCountMapper
conf.setCombinerClass(LongSumReducer.class);//hadoop自己的LongSumReducer
conf.setReducerClass(LongSumReducer.class);
client.setConf(conf);
try
{
JobClient.runJob(conf);
}catch(Exception e)
{
e.printStackTrace();
}
}
}
|
|
|