public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
重写改方法即可:
private static class MyPartitioner extends Partitioner<Text,IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numReduceTasks) {
if (key.toString().compareTo("aa") >= 0 && key.toString().compareTo("kk") <= 0) {
return 0;
} else {
return 1;
}
}
}
设定conf和job参数: