Reduce side join is the simplest join strategy. The idea is as follows. In the map phase, all sources are read together; here, both person.txt and address.txt are read. The map output key is the value of the join field, and the output value holds the remaining fields of the record. To distinguish the data sources, each value additionally carries a tag: for example, records from person.txt are tagged "0" and records from address.txt are tagged "1". Compared with an ordinary MapReduce job, the only difference on the map side is this extra tag in the output value. In the reduce phase, the reduce function receives the list of values for each key, groups the values by their source tag, and finally emits the cartesian product of the groups; that is, the actual join happens in the reduce phase.
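Before looking at the Hadoop classes, the tag / group-by-tag / cartesian-product idea can be illustrated with a small in-memory sketch (plain Java, no Hadoop; the sample rows and their column layout are made up for illustration):

```java
import java.util.*;

public class ReduceSideJoinSketch {

    // Tag, "shuffle" and join two tables of tab-separated rows.
    // Assumed layout: person rows are name\tage\taddrId, address rows are addrId\tcity.
    static List<String> join(String[] persons, String[] addresses) {
        // "Map" phase: emit (joinKey, [tag, record]); tag "0" = person, "1" = address.
        // The TreeMap stands in for the sort/shuffle that groups values by key.
        Map<String, List<String[]>> shuffled = new TreeMap<>();
        for (String line : persons) {
            shuffled.computeIfAbsent(line.split("\t")[2], k -> new ArrayList<>())
                    .add(new String[] { "0", line });
        }
        for (String line : addresses) {
            shuffled.computeIfAbsent(line.split("\t")[0], k -> new ArrayList<>())
                    .add(new String[] { "1", line });
        }
        // "Reduce" phase: per key, split the value list by tag,
        // then emit the cartesian product of the two groups.
        List<String> out = new ArrayList<>();
        for (List<String[]> tagged : shuffled.values()) {
            List<String> fromPerson = new ArrayList<>(), fromAddress = new ArrayList<>();
            for (String[] t : tagged) {
                (t[0].equals("0") ? fromPerson : fromAddress).add(t[1]);
            }
            for (String p : fromPerson) {
                for (String a : fromAddress) {
                    String[] pt = p.split("\t"), at = a.split("\t");
                    out.add(pt[0] + "\t" + pt[1] + "\t" + at[1]);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] persons = { "alice\t30\tA1", "bob\t25\tA2" };
        String[] addresses = { "A1\tBeijing", "A2\tShanghai" };
        for (String row : join(persons, addresses)) {
            System.out.println(row);
        }
    }
}
```

Keys with records from only one source produce an empty cartesian product, which is exactly how the inner-join semantics fall out of this scheme.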
Hadoop's contrib/data_join package (org.apache.hadoop.contrib.utils.join) already ships an implementation of this join strategy. A join built on top of that package looks like this:
SampleTaggedMapOutput.java:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class SampleTaggedMapOutput extends TaggedMapOutput {

    private Text data;

    public SampleTaggedMapOutput() {
        this.data = new Text("");
    }

    public SampleTaggedMapOutput(Text data) {
        this.data = data;
    }

    public Writable getData() {
        return data;
    }

    public void write(DataOutput out) throws IOException {
        this.tag.write(out);   // serialize the source tag first ...
        this.data.write(out);  // ... then the record payload
    }

    public void readFields(DataInput in) throws IOException {
        this.tag.readFields(in);
        this.data.readFields(in);
    }
}
SampleDataJoinMapper.java:
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class SampleDataJoinMapper extends DataJoinMapperBase {

    @Override
    protected Text generateInputTag(String filename) {
        // Tag records from person.txt with "0" and records from address.txt with "1"
        if (filename.contains("person")) {
            return new Text("0");
        } else {
            return new Text("1");
        }
    }

    @Override
    protected Text generateGroupKey(TaggedMapOutput aRecord) {
        // The join key: the third column of person.txt, the first column of address.txt
        String line = ((Text) aRecord.getData()).toString();
        String[] tokens = line.split("\\t");
        String groupKey;
        if (this.inputTag.toString().equals("0")) {
            groupKey = tokens[2];
        } else {
            groupKey = tokens[0];
        }
        return new Text(groupKey);
    }

    @Override
    protected TaggedMapOutput generateTaggedMapOutput(Object value) {
        TaggedMapOutput retv = new SampleTaggedMapOutput((Text) value);
        retv.setTag(new Text(this.inputTag));
        return retv;
    }
}
SampleDataJoinReducer.java:
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.io.Text;

public class SampleDataJoinReducer extends DataJoinReducerBase {

    /**
     * @param tags   a list of source tags
     * @param values a value per source
     * @return combined value derived from values of the sources
     */
    protected TaggedMapOutput combine(Object[] tags, Object[] values) {
        // Eliminate rows that did not match in one of the two tables (for INNER JOIN)
        if (tags.length < 2) {
            return null;
        }
        String[] p, a;
        // The two sources may arrive in either order, so check which tag comes first
        if (((Text) tags[0]).toString().equals("0")) {
            p = ((Text) ((TaggedMapOutput) values[0]).getData()).toString().split("\t");
            a = ((Text) ((TaggedMapOutput) values[1]).getData()).toString().split("\t");
        } else {
            p = ((Text) ((TaggedMapOutput) values[1]).getData()).toString().split("\t");
            a = ((Text) ((TaggedMapOutput) values[0]).getData()).toString().split("\t");
        }
        String joinedStr = p[0] + "\t" + p[1] + "\t" + a[1];
        TaggedMapOutput retv = new SampleTaggedMapOutput(new Text(joinedStr));
        retv.setTag((Text) tags[0]);
        return retv;
    }
}
Summary: the reduce-side join technique is flexible, but in most cases it is extremely inefficient. Because the join does not start until the reduce() phase, all of the data is shuffled across the network (copied, sorted, and so on), yet in most cases the join then discards the bulk of what was transferred. Two lessons follow: (1) if a reduce-side join is unavoidable, filter out irrelevant data on the map side as early as possible, e.g. apply projection/filtering specific to each input file there, rather than waiting until the data has already reached the reduce nodes; (2) can the join be performed entirely on the map side? Yes, it can.
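The map-side alternative can be sketched as follows. This is a minimal in-memory illustration, assuming the smaller table (address.txt) fits in memory; in a real Hadoop job the small table would be shipped to every mapper (e.g. via DistributedCache) and the loop over persons would be the map() calls. The sample data and method names are hypothetical:

```java
import java.util.*;

public class MapSideJoinSketch {

    // Build a hash table from the small table once, then join the large table
    // against it record by record -- no shuffle and no reduce phase at all.
    static List<String> mapSideJoin(String[] persons, String[] addresses) {
        // Small table loaded into memory, keyed by the join field (addrId -> city).
        Map<String, String> addrById = new HashMap<>();
        for (String line : addresses) {
            String[] t = line.split("\t");
            addrById.put(t[0], t[1]);
        }
        // Stream the large table; each record is joined or dropped immediately,
        // so non-matching rows are filtered before any data would cross the network.
        List<String> out = new ArrayList<>();
        for (String line : persons) {
            String[] p = line.split("\t");
            String city = addrById.get(p[2]);
            if (city != null) {                 // INNER JOIN: drop non-matches here
                out.add(p[0] + "\t" + p[1] + "\t" + city);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] persons = { "alice\t30\tA1", "bob\t25\tA9" };  // A9 has no address
        String[] addresses = { "A1\tBeijing", "A2\tShanghai" };
        for (String row : mapSideJoin(persons, addresses)) {
            System.out.println(row);
        }
    }
}
```

The trade-off is memory: this only works when one side of the join is small enough to replicate to every mapper, which is exactly the scenario the next section turns to.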