设为首页 收藏本站
查看: 1042|回复: 0

[经验分享] Hadoop中Partition深度解析

[复制链接]

尚未签到

发表于 2015-7-11 07:35:35 | 显示全部楼层 |阅读模式
  本文地址:http://www.iyunv.com/archimedes/p/hadoop-partitioner.html,转载请注明源地址。

旧版 API 的 Partitioner 解析
  Partitioner 的作用是对 Mapper 产生的中间结果进行分片,以便将同一分组的数据交给同一个 Reducer 处理,它直接影响 Reduce 阶段的负载均衡。旧版 API 中 Partitioner 的类图如图所示。它继承了JobConfigurable,可通过 configure 方法初始化。它本身只包含一个待实现的方法 getPartition。 该方法包含三个参数, 均由框架自动传入,前面两个参数是key/value,第三个参数 numPartitions 表示每个 Mapper 的分片数,也就是 Reducer 的个数。
DSC0000.jpg
  MapReduce 提供了两个Partitioner 实 现:HashPartitioner和TotalOrderPartitioner。其中 HashPartitioner 是默认实现,它实现了一种基于哈希值的分片方法,代码如下:



public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
  TotalOrderPartitioner 提供了一种基于区间的分片方法,通常用在数据全排序中。在MapReduce 环境中,容易想到的全排序方案是归并排序,即在 Map 阶段,每个 Map Task进行局部排序;在 Reduce 阶段,启动一个 Reduce Task 进行全局排序。由于作业只能有一个 Reduce Task,因而 Reduce 阶段会成为作业的瓶颈。为了提高全局排序的性能和扩展性,MapReduce 提供了 TotalOrderPartitioner。它能够按照大小将数据分成若干个区间(分片),并保证后一个区间的所有数据均大于前一个区间数据,这使得全排序的步骤如下:
步骤1:数据采样。在 Client 端通过采样获取分片的分割点。Hadoop 自带了几个采样算法,如 IntercalSampler、 RandomSampler、 SplitSampler 等(具体见org.apache.hadoop.mapred.lib 包中的 InputSampler 类)。 下面举例说明。
采样数据为: b, abc, abd, bcd, abcd, efg, hii, afd, rrr, mnk
经排序后得到: abc, abcd, abd, afd, b, bcd, efg, hii, mnk, rrr
如果 Reduce Task 个数为 4,则采样数据的四等分点为 abd、 bcd、 mnk,将这 3 个字符串作为分割点。
步骤2:Map 阶段。本阶段涉及两个组件,分别是 Mapper 和 Partitioner。其中,Mapper 可采用 IdentityMapper,直接将输入数据输出,但 Partitioner 必须选用TotalOrderPartitioner,它将步骤 1 中获取的分割点保存到 trie 树中以便快速定位任意一个记录所在的区间,这样,每个 Map Task 产生 R(Reduce Task 个数)个区间,且区间之间有序。TotalOrderPartitioner 通过 trie 树查找每条记录所对应的 Reduce Task 编号。 如图所示, 我们将分割点 保存在深度为 2 的 trie 树中, 假设输入数据中 有两个字符串“ abg”和“ mnz”, 则字符串“ abg” 对应 partition1, 即第 2 个 Reduce Task, 字符串“ mnz” 对应partition3, 即第 4 个 Reduce Task。
DSC0001.jpg
步骤 3:Reduce 阶段。每个 Reducer 对分配到的区间数据进行局部排序,最终得到全排序数据。从以上步骤可以看出,基于 TotalOrderPartitioner 全排序的效率跟 key 分布规律和采样算法有直接关系;key 值分布越均匀且采样越具有代表性,则 Reduce Task 负载越均衡,全排序效率越高。TotalOrderPartitioner 有两个典型的应用实例: TeraSort 和 HBase 批量数据导入。 其中,TeraSort 是 Hadoop 自 带的一个应用程序实例。 它曾在 TB 级数据排序基准评估中 赢得第一名,而 TotalOrderPartitioner正是从该实例中提炼出来的。HBase 是一个构建在 Hadoop之上的 NoSQL 数据仓库。它以 Region为单位划分数据,Region 内部数据有序(按 key 排序),Region 之间也有序。很明显,一个 MapReduce 全排序作业的 R 个输出文件正好可对应 HBase 的 R 个 Region。

新版 API 的 Partitioner 解析
  新版 API 中的Partitioner类图如图所示。它不再实现JobConfigurable 接口。当用户需要让 Partitioner通过某个JobConf 对象初始化时,可自行实现Configurable 接口,如:



public class TotalOrderPartitioner extends Partitioner implements Configurable
DSC0002.jpg

Partition所处的位置
DSC0003.jpg
  Partition主要作用就是将map的结果发送到相应的reduce。这就对partition有两个要求:
  1)均衡负载,尽量的将工作均匀的分配给不同的reduce。
  2)效率,分配速度一定要快。

Mapreduce提供的Partitioner
DSC0004.jpg



patition类结构
  1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。源代码如下:



package org.apache.hadoop.mapred;
/**
* Partitions the key space.
*
* Partitioner controls the partitioning of the keys of the
* intermediate map-outputs. The key (or a subset of the key) is used to derive
* the partition, typically by a hash function. The total number of partitions
* is the same as the number of reduce tasks for the job. Hence this controls
* which of the m reduce tasks the intermediate key (and hence the
* record) is sent for reduction.
*
* @see Reducer
* @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
*/
@Deprecated
public interface Partitioner extends JobConfigurable {
/**
* Get the paritition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*   
* Typically a hash function on a all or a subset of the key.
*
* @param key the key to be paritioned.
* @param value the entry value.
* @param numPartitions the total number of partitions.
* @return the partition number for the key.
*/
int getPartition(K2 key, V2 value, int numPartitions);
}
  2. HashPartitioner是mapreduce的默认partitioner。源代码如下:



package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner extends Partitioner {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
  3. BinaryPatitioner继承于Partitioner,是Partitioner的偏特化子类。该类提供leftOffset和rightOffset,在计算which reducer时仅对键值K的[rightOffset,leftOffset]这个区间取hash。



reducer=(hash & Integer.MAX_VALUE) % numReduceTasks
  4. KeyFieldBasedPartitioner也是基于hash的个partitioner。和BinaryPatitioner不同,它提供了多个区间用于计算hash。当区间数为0时KeyFieldBasedPartitioner退化成HashPartitioner。 源代码如下:



package org.apache.hadoop.mapred.lib;
import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
/**   
*  Defines a way to partition keys based on certain key fields (also see
*  {@link KeyFieldBasedComparator}.
*  The key specification supported is of the form -k pos1[,pos2], where,
*  pos is of the form f[.c][opts], where f is the number
*  of the key field to use, and c is the number of the first character from
*  the beginning of the field. Fields and character posns are numbered
*  starting with 1; a character position of zero in pos2 indicates the
*  field's last character. If '.c' is omitted from pos1, it defaults to 1
*  (the beginning of the field); if omitted from pos2, it defaults to 0
*  (the end of the field).
*
*/
public class KeyFieldBasedPartitioner implements Partitioner {
private static final Log LOG = LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
private int numOfPartitionFields;
private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
public void configure(JobConf job) {
String keyFieldSeparator = job.get("map.output.key.field.separator", "\t");
keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
if (job.get("num.key.fields.for.partition") != null) {
LOG.warn("Using deprecated num.key.fields.for.partition. " +
"Use mapred.text.key.partitioner.options instead");
this.numOfPartitionFields = job.getInt("num.key.fields.for.partition",0);
keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
} else {
String option = job.getKeyFieldPartitionerOption();
keyFieldHelper.parseOption(option);
}
}
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
byte[] keyBytes;
List  allKeySpecs = keyFieldHelper.keySpecs();
if (allKeySpecs.size() == 0) {
return getPartition(key.toString().hashCode(), numReduceTasks);
}
try {
keyBytes = key.toString().getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("The current system does not " +
"support UTF-8 encoding!", e);
}
// return 0 if the key is empty
if (keyBytes.length == 0) {
return 0;
}
int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
keyBytes.length);
int currentHash = 0;
for (KeyDescription keySpec : allKeySpecs) {
int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length,
lengthIndicesFirst, keySpec);
// no key found! continue
if (startChar < 0) {
continue;
}
int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
lengthIndicesFirst, keySpec);
currentHash = hashCode(keyBytes, startChar, endChar,
currentHash);
}
return getPartition(currentHash, numReduceTasks);
}
protected int hashCode(byte[] b, int start, int end, int currentHash) {
for (int i = start; i = k && comparator.compare(samples[last], samples[k]) == 0) {
++k;
}
writer.append(samples[k], nullValue);
last = k;
}
writer.close();
}
/**
* Driver for InputSampler from the command line.
* Configures a JobConf instance and calls {@link #writePartitionFile}.
*/
public int run(String[] args) throws Exception {
JobConf job = (JobConf) getConf();
ArrayList otherArgs = new ArrayList();
Sampler sampler = null;
for(int i=0; i < args.length; ++i) {
try {
if ("-r".equals(args)) {
job.setNumReduceTasks(Integer.parseInt(args[++i]));
} else if ("-inFormat".equals(args)) {
job.setInputFormat(
Class.forName(args[++i]).asSubclass(InputFormat.class));
} else if ("-keyClass".equals(args)) {
job.setMapOutputKeyClass(
Class.forName(args[++i]).asSubclass(WritableComparable.class));
} else if ("-splitSample".equals(args)) {
int numSamples = Integer.parseInt(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new SplitSampler(numSamples, maxSplits);
} else if ("-splitRandom".equals(args)) {
double pcnt = Double.parseDouble(args[++i]);
int numSamples = Integer.parseInt(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new RandomSampler(pcnt, numSamples, maxSplits);
} else if ("-splitInterval".equals(args)) {
double pcnt = Double.parseDouble(args[++i]);
int maxSplits = Integer.parseInt(args[++i]);
if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
sampler = new IntervalSampler(pcnt, maxSplits);
} else {
otherArgs.add(args);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
if (job.getNumReduceTasks()

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-85302-1-1.html 上篇帖子: 通过ambari安装hadoop集群(一) 下篇帖子: Hadoop开发第3期---Hadoop的伪分布式安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表