设为首页 收藏本站
查看: 820|回复: 0

[经验分享] hadoop的mapreduce的join操作原理

[复制链接]

尚未签到

发表于 2016-12-11 07:34:56 | 显示全部楼层 |阅读模式
 


1. 概述


如果我们有如下的两个文件:
person.txt(字段是id nameaddressId):
1       tom     100
2       jme     101
3       kite    102
4       jack    100
5       tim     101
address.txt(字段是idname)
100     Beijing
101     Shanghai
102     Guangzhou
103     Shenzhen
最后需要输出person所在的位置信息,形式如下:
100         1     tom  Beijing
100         4     jack Beijing
101         2     jme  Shanghai
101         5     tim   Shanghai
102         3     kite  Guangzhou
 


2. 实现的原理


上述两个文件,若看成两张表,则连接字段是person.addressId= address.id,此处讨论的实现方式INNER JOIN


2.1. reduce side join的方式


 reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,统一读取所有源的数据,例如,这里读取person.txtaddress.txtmap的输出的keyjoin的字段对应的值,输出的value是每条记录的其他字段值,为了对数据来源做区分,每个value值的基础上还需要增加一个标记值(tag),例如,来自person.txttag设置为0,来自address.txttag设置为1。相比普通的mapreduce任务,join操作的map输出的value值增加了一个tag标记值。在reduce阶段,reduce函数获取key对应的value列表,将value列表根据来源的tag进行分组,最后执行笛卡尔积进行输出,即reduce阶段进行实际的连接操作。
hadoopcontrib/data_join org.apache.hadoop.contrib.utils.join中已经提供了此种join方式的实现,基于上述包完成的join操作的代码如下:
SampleTaggedMapOutput.java
 

public class SampleTaggedMapOutput extends TaggedMapOutput {
private Text data;
public SampleTaggedMapOutput() {
this.data = new Text("");
}
public SampleTaggedMapOutput(Text data) {
this.data = data;
}
public Writable getData() {
return data;
}
public void write(DataOutput out) throws IOException {
this.tag.write(out);
this.data.write(out);
}
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
this.data.readFields(in);
}
}
 
SampleDataJoinMapper.java
 

public class SampleDataJoinMapper extends DataJoinMapperBase {

@Override
protected Text generateInputTag(String filename) {
if (filename.contains("person")) {
return new Text("0");
} else {
return new Text("1");
}
}
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String groupKey = "";
String[] tokens = line.split("\\t");
if (this.inputTag.toString().equals("0")) {
groupKey = tokens[2];
} else {
groupKey = tokens[0];
}
return new Text(groupKey);
}
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedMapOutput retv = new SampleTaggedMapOutput((Text) value);
retv.setTag(new Text(this.inputTag));
return retv;
}
}

 
SampleDataJoinReducer.java:
 

public class SampleDataJoinReducer extends DataJoinReducerBase {
/**
*
* @param tags
*          a list of source tags
* @param values
*          a value per source
* @return combined value derived from values of the sources
*/
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
// eliminate rows which didnot match in one of the two tables (for INNER JOIN)
if (tags.length < 2)
return null;
String joinedStr = "";
String [] p = null, a = null;
Text t = (Text) tags[0];
if (t.toString().equals("0")) {
p =  ((Text) (((TaggedMapOutput) values[0]).getData())).toString().split("\t");
a = ((Text) (((TaggedMapOutput) values[1]).getData())).toString().split("\t");
} else {
p =  ((Text) (((TaggedMapOutput) values[1]).getData())).toString().split("\t");
a = ((Text) (((TaggedMapOutput) values[0]).getData())).toString().split("\t");
}
joinedStr = p[0] + "\t" + p[1] + "\t" + a[1];
TaggedMapOutput retv = new SampleTaggedMapOutput(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}
 
小结:reduce side join技术是灵活的,但是大部分情况它会变得效率极低。由于join直到reduce()阶段才会开始,我们将会在网络中传递shuffle所有数据(执行copysort等动作),然而在大多数情况下,join阶段会丢掉大多数传递的数据。因此,得到的启示有两点:
1)如果确定是reduce-sidejoin,那么参与join的文件在map端尽可能先过滤掉无关的数据,例如针对特定的文件的projection/filtering,而不是传递到reduce节点后,在join时才做
2)是否可以直接在map端就完成join操作,答案是肯定的。


2.2 map side join的方式


1. 其中一个join的文件比较小,能够完全放进内存(In-Memory Hash Join
其实这个类似数据库中classic hash join的方式,就是一个是build input阶段,一个是probe input阶段,这里都在map端完成。build input阶段,读取小文件的数据到内存,构建一个hashtableprobe input阶段,读取大文件,通过查询hashtable实现join
具体在hadoop中,已有相关的类支持:
1)使用DistributedCache类,原理上就是在集群中的每个DataNode上的LocalFS都帮复制一份需要的小文件,然后你的每个Mapper进程读的就都是本地的那个文件,之后建立hashtable
2)在mapper的类里查询hashtable,同时实现join
针对上面那个例子:mapper类的操作如下:
 

public class MapClass extends MapReduceBase implements
Mapper {
private Hashtable<String, String> joinData = new Hashtable<String, String>();
@Override
public void configure(JobConf conf) {
try {
Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
if (cacheFiles != null && cacheFiles.length > 0) {
String line;
String[] tokens;
BufferedReader joinReader = new BufferedReader(new FileReader(
cacheFiles[0].toString()));
try {
while ((line = joinReader.readLine()) != null) {
tokens = line.split("\\t", 2);
joinData.put(tokens[0], tokens[1]);
}
} finally {
joinReader.close();
}
}
} catch (IOException e) {
System.err.println("Exception reading DistributedCache: " + e);
}
}
public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
Text t = (Text) value;
String [] fields = t.toString().split("\\t");
String address = joinData.get(fields[2]);
if (address != null) {
output.collect(new Text(fields[2]), new Text(fields[0] + "\t" + fields[1] + "\t" + address));
}
}
}
 
提交任务的类可以这么写:
 

DistributedCache.addCacheFile(new Path(“/address.txt”).toUri(), conf);
FileInputFormat.addInputPath(conf, new Path(“/person.txt”));
FileOutputFormat.setOutputPath(conf, new Path(“/join_out”));
conf.setMapperClass(MapClass.class);
conf.setNumReduceTasks(0);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(TextOutputFormat.class);
JobClient.runJob(conf);
 
 
2. join的最小的文件不能完全放进内存,仍然要实现map-sidejoin
题外话:数据库中,对这种情况的处理是使用grace hash join的算法。例如:SQL Server分别将build inputprobe input切分成多个分区部分(partition),每个partition都包括一个独立的、成对匹配的build inputprobe input,这样就将一个大的hash join切分成多个独立、互相不影响的hash join,每一个分区的hash join都能够在内存中完成。SQL Server将切分后的partition文件保存在磁盘上,每次装载一个分区的build inputprobe input到内存中,进行一次hash join
回来正题:
第一步的思考:最小的文件整个数据放不进内存中,那么是否可以考虑可以满足join条件的数据能否放进内存,从而实现In-Memory Hash Join的方式,实现方式很简单:选取小文件(例如名称为small_file),将其参与joinkey的数据抽取出来,保存到文件small_file_temp中,small_file_temp这里保证很小,可以完全放到内存中,因此后续的工作时使用small_file_temp文件和大文件进行In-Memory Hash Join
进一步的思考:
其实保存数据的时候,可以先分区,就是可以根据数据的特征,先将数据划分为多个文件进行保存,实际处理时,只处理其中的一部分数据,这样预先就减少了处理的数据量。
 
如果已经分区了,但最终的小文件是还是放不进内存的,那么还可以怎么做?
方式一:特定的场景下,可以使用bloomfilter
条件1:小表对于大表中的数据而言,仅仅用于判断大表的数据是否满足join条件,不再从小表中获取其他数据
条件2:不要求数据100%准确,因为bloomfilter存在一定的误判。
可以看到,方式一的最终目的仍然是将操作转换为In-Memory Hash Join的方式。
 
方式二:能否参考数据库中的grace hash join的方式?答案是肯定的。实际的做法是,在保存数据的时候,在分区的基础上(partition),再分桶(bucket),小文件和大文件都拆成很多bucket,小文件和大文件之间的桶的个数只要相互之间是倍数关系即可(是倍数关系,即可相互映射),一般情况下,小文件的桶个数是大文件的2的倍数。这里以hive的实现进行说明:在提交任务之前,先在本地执行一个任务,生成每个大表的桶文件对应的小表的桶文件的hashtable,之后将这些hashtable文件分发到map节点,map节点在执行任务的,根据输入的大表的桶文件,加载进来相应的hashtable文件,之后执行join操作。
注意:分发数据本身就存在一定的代价的,因此若分发数据消耗的时间较多,也许此时reduce-side join的会更快。
 
3. sort merge join的实现
上述所讲的操作均没有提及数据是否有序,若数据有序的情况下,完全可以执行类似merge的操作,一边读取数据,一边做join操作。此种join方式不再进一步的探讨。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-312480-1-1.html 上篇帖子: hadoop中4种压缩格式的特征的比较 下篇帖子: ubuntu11.04上cloudera cdh3u0的hadoop和hbase分布式安装
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表