本节我们来学习MapReduce编程框架中的Partitioner接口和其他相关的信息。
Partitioner的作用就是对Mapper产生的中间数据进行分片,以便将同一分片的数据交给同一个Reducer处理,该过程是MapReduce的shuffle过程,特别是Map端的shuffle的一部分。 Partitioner它直接影响Reduce阶段的负责均衡。在老版中,Partitioner是一个接口,继承了JobConfigurable接口,代码如下:
/**
* Partitions the key space.
*
* <p><code>Partitioner</code> controls the partitioning of the keys of the
* intermediate map-outputs. The key (or a subset of the key) is used to derive
* the partition, typically by a hash function. The total number of partitions
* is the same as the number of reduce tasks for the job. Hence this controls
* which of the <code>m</code> reduce tasks the intermediate key (and hence the
* record) is sent for reduction.</p>
*
* @see Reducer
*/
public interface Partitioner<K2, V2> extends JobConfigurable {
/**
* Get the paritition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* <p>Typically a hash function on a all or a subset of the key.</p>
*
* @param key 基于该key对数据分组儿.
* @param value the entry value.
* @param numPartitions 每个Mapper的分片数.
* @return the partition number for the <code>key</code>.
*/
int getPartition(K2 key, V2 value, int numPartitions);
}
MapReduce提供了两个Partitioner实现类:HasdPartitioner和TotalOrderPartitioner。
默认情况下,MapReduce分布式计算模型使用的是HasdPartitioner,它实现了一种基于Hash值的分片算法:
/** Partition keys by their {@link Object#hashCode()}.
*/
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}