package org.apache.hadoop.mapred;
/**
* Partitions the key space.
*
* Partitioner controls the partitioning of the keys of the
* intermediate map-outputs. The key (or a subset of the key) is used to derive
* the partition, typically by a hash function. The total number of partitions
* is the same as the number of reduce tasks for the job. Hence this controls
* which of the m reduce tasks the intermediate key (and hence the
* record) is sent for reduction.
*
* @see Reducer
* @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
*/
@Deprecated
public interface Partitioner extends JobConfigurable {
/**
* Get the paritition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* Typically a hash function on a all or a subset of the key.
*
* @param key the key to be paritioned.
* @param value the entry value.
* @param numPartitions the total number of partitions.
* @return the partition number for the key.
*/
int getPartition(K2 key, V2 value, int numPartitions);
}
2. HashPartitioner是mapreduce的默认partitioner。源代码如下:
package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner extends Partitioner {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
3. BinaryPartitioner继承于Partitioner,是Partitioner的偏特化子类。该类提供leftOffset和rightOffset,在计算which reducer时仅对键值K的[leftOffset,rightOffset]这个区间取hash。
package org.apache.hadoop.mapred.lib;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
/**
* Defines a way to partition keys based on certain key fields (also see
* {@link KeyFieldBasedComparator}).
* The key specification supported is of the form -k pos1[,pos2], where,
* pos is of the form f[.c][opts], where f is the number
* of the key field to use, and c is the number of the first character from
* the beginning of the field. Fields and character posns are numbered
* starting with 1; a character position of zero in pos2 indicates the
* field's last character. If '.c' is omitted from pos1, it defaults to 1
* (the beginning of the field); if omitted from pos2, it defaults to 0
* (the end of the field).
*
*/
public class KeyFieldBasedPartitioner implements Partitioner {
private static final Log LOG = LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
private int numOfPartitionFields;
private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
/**
 * Configures this partitioner from the job settings.
 *
 * <p>The key field separator is read from
 * <code>map.output.key.field.separator</code> (a tab when unset). If the
 * deprecated <code>num.key.fields.for.partition</code> property is present, a
 * warning is logged and its value is used as the end of a key spec starting at
 * field 1; otherwise the job's key-field partitioner option string is parsed.
 *
 * @param job the job configuration to read the settings from.
 */
public void configure(JobConf job) {
  keyFieldHelper.setKeyFieldSeparator(
      job.get("map.output.key.field.separator", "\t"));

  final boolean legacyFieldCountPresent =
      job.get("num.key.fields.for.partition") != null;
  if (!legacyFieldCountPresent) {
    // Current style: the key spec comes from the partitioner option string.
    keyFieldHelper.parseOption(job.getKeyFieldPartitionerOption());
    return;
  }

  // Legacy style: only a field count is given.
  LOG.warn("Using deprecated num.key.fields.for.partition. " +
      "Use mapred.text.key.partitioner.options instead");
  this.numOfPartitionFields = job.getInt("num.key.fields.for.partition", 0);
  keyFieldHelper.setKeyFieldSpec(1, numOfPartitionFields);
}
/**
 * Computes the partition for a record from the configured key fields.
 *
 * <p>When no key specs are configured, the hash code of the key's whole
 * string form is used. Otherwise the key's string form is encoded as UTF-8
 * and, for every key spec that resolves to a start offset in those bytes, the
 * selected byte range is folded into a running hash through
 * {@code hashCode(byte[], int, int, int)}. Specs with no match are skipped.
 * An empty key always maps to partition 0.
 *
 * @param key the key to partition; its {@code toString()} form is examined.
 * @param value the entry value (unused).
 * @param numReduceTasks the total number of partitions.
 * @return the partition number for the key.
 * @throws RuntimeException if the platform does not support UTF-8.
 */
public int getPartition(K2 key, V2 value,
    int numReduceTasks) {
  byte[] keyBytes;
  // Typed list: iterating a raw List as KeyDescription does not compile.
  List<KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
  if (allKeySpecs.size() == 0) {
    // No field selection configured: hash the entire key.
    return getPartition(key.toString().hashCode(), numReduceTasks);
  }
  try {
    keyBytes = key.toString().getBytes("UTF-8");
  } catch (UnsupportedEncodingException e) {
    throw new RuntimeException("The current system does not " +
        "support UTF-8 encoding!", e);
  }
  // return 0 if the key is empty
  if (keyBytes.length == 0) {
    return 0;
  }
  // Computed once and handed to every offset lookup below.
  int[] lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
      keyBytes.length);
  int currentHash = 0;
  for (KeyDescription keySpec : allKeySpecs) {
    int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length,
        lengthIndicesFirst, keySpec);
    // no key found! continue
    if (startChar < 0) {
      continue;
    }
    int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
        lengthIndicesFirst, keySpec);
    // Chain the hash so every matched field contributes to the result.
    currentHash = hashCode(keyBytes, startChar, endChar,
        currentHash);
  }
  return getPartition(currentHash, numReduceTasks);
}
protected int hashCode(byte[] b, int start, int end, int currentHash) {
for (int i = start; i = k && comparator.compare(samples[last], samples[k]) == 0) {
++k;
}
writer.append(samples[k], nullValue);
last = k;
}
writer.close();
}
/**
* Driver for InputSampler from the command line.
* Configures a JobConf instance and calls {@link #writePartitionFile}.
*/
public int run(String[] args) throws Exception {
JobConf job = (JobConf) getConf();
ArrayList otherArgs = new ArrayList();
Sampler sampler = null;
for(int i=0; i < args.length; ++i) {
try {
if ("-r".equals(args)) {
job.setNumReduceTasks(Integer.parseInt(args[++i]));
} else if ("-inFormat".equals(args)) {
job.setInputFormat(
Class.forName(args[++i]).asSubclass(InputFormat.class));
} else if ("-keyClass".equals(args)) {
job.setMapOutputKeyClass(
Class.forName(args[++i]).asSubclass(WritableComparable.class));
} else if ("-splitSample".equals(args)) {
int numSamples = Integer.parseInt(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new SplitSampler(numSamples, maxSplits);
} else if ("-splitRandom".equals(args)) {
double pcnt = Double.parseDouble(args[++i]);
int numSamples = Integer.parseInt(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new RandomSampler(pcnt, numSamples, maxSplits);
} else if ("-splitInterval".equals(args)) {
double pcnt = Double.parseDouble(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new IntervalSampler(pcnt, maxSplits);
} else {
otherArgs.add(args);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
if (job.getNumReduceTasks()