设为首页 收藏本站
查看: 1174|回复: 0

[经验分享] Hadoop Mapreduce 连接(Join)之二:复制连接(Replication join)

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2014-3-6 11:08:42 | 显示全部楼层 |阅读模式
本帖最后由 4rrr 于 2014-3-6 11:13 编辑

4.1.2 复制连接(Replication join)
复制连接是map端的连接。复制连接得名于它的具体实现:连接中最小的数据集将会被复制到所有的map主机节点。复制连接有一个假设前提:在被连接的数据集中,有一个数据集足够小到可以缓存在内存中。
如图4.5所示,MapReduce复制连接工作原理如下:
  • 我们将使用分布式缓存(Districubted cache)将这个小数据集复制到所有运行map任务的节点。
  • 用各个map任务初始化方法将这个小数据集装载到一个哈希表(hashtable)中。
  • 逐条用大数据集中的记录遍历这个哈希表,逐个判断是否符合连接条件。
  • 输出符合连接条件的结果。

032044210022764.jpg

复制连接的实现非常直接明了。更具体的内容可以参考《Hadoop in Action》。附录D提供了一个通用的框架来实现复制连接。这个框架支持任意类型的InputFormat和OutputFormat的数据。(我们将在下一个技术中使用这个框架。)复制连接框架根据内存足迹的大小从分布式缓存的内容和输入块(input split)两者中动态地决定需要缓存的对象。
如果所有的输入数据集都不能够小到可以放到缓存中,那有没有办法来优化map端连接呢?那就到了看半连接(semi-join)的时间了。
附录D.2 一个复制连接框架
复制连接是map端连接,得名于它的具体实现:连接中最小的数据集将会被复制到所有的map主机节点。复制连接的实现非常直接明了。更具体的内容可以参考Chunk Lam的《Hadoop in Action》。
这个部分的目标是:创建一个可以支持任意类型的数据集的通用的复制连接框架。这个框架中提供了一个优化的小功能:动态监测分布式缓存内容和输入块的大小,并判断哪个更大。如果输入块较小,那么你就需要将map的输入块放到内存缓冲中,然后在mapper的cleanup方法中执行连接操作了。
图D.4是这个框架的类图,这里我提供了连接类( GenericReplicatedJoin)的具体实现,而不仅仅是一个抽象类。在这个框架外,这个类将和 KeyValueTextInputFormat 及 TextOutputFormat 协作。这里有一个假设前提:每个数据文件的第一个标记是连接键。此外,连接类也可以被继承扩展来支持任意类型的输入和输出。

032112335749651.jpg

图D.5是连接框架的算法。Mapper的setup方法判断在map的输入块和分布式缓存的内容中哪个大。如果分布式缓存的内容比较小,那么它将被装载到内存缓存中。Map函数开始连接操作。如果输入块比较小,map函数将输入块的键\值对装载到内存缓存中。Map的cleanup方法将从分布式缓存中读取记录,逐条记录和在内存缓存中的键\值对进行连接操作。

032124553098388.jpg

以下代码的 GenericReplicatedJoin 中的setup方法是在map的初始化阶段调用的。这个方法判断分布式缓存中的文件和输入块哪个大。如果文件比较小,则将文件装载到HashMap中。

@Override
protected void setup(Context context)
    throws IOException, InterruptedException {

    distributedCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    int distCacheSizes = 0;

    for (Path distFile : distributedCacheFiles) {
        File distributedCacheFile = new File(distFile.toString());
        distCacheSizes += distributedCacheFile.length();
    }

    if(context.getInputSplit() instanceof FileSplit) {
        FileSplit split = (FileSplit) context.getInputSplit();
        long inputSplitSize = split.getLength();
        distributedCacheIsSmaller = (distCacheSizes < inputSplitSize);
    } else {
        distributedCacheIsSmaller = true;
    }

    if (distributedCacheIsSmaller) {
        for (Path distFile : distributedCacheFiles) {
            File distributedCacheFile = new File(distFile.toString());
            DistributedCacheFileReader reader = getDistributedCacheReader();
            reader.init(distributedCacheFile);

            for (Pair p : (Iterable<Pair>) reader) {
                addToCache(p);
            }

            reader.close();
        }
    }
}


Map方法将会根据setup方法是否将分布式缓存的内容装载到内存的缓存中来选择行为。如果分布式缓存中的内容被装载到内存中,那么map方法就将输入块的记录和内存中的缓存做连接操作。如果分布式缓存中的内容没有被装载到内存中,那么map方法就将输入块的记录装载到内存中,然后在cleanup方法中使用。

@Override
protected void map(Object key, Object value, Context context)
    throws IOException, InterruptedException {
    Pair pair = readFromInputFormat(key, value);

    if (distributedCacheIsSmaller) {
        joinAndCollect(pair, context);
    } else {
        addToCache(pair);
    }
}

public void joinAndCollect(Pair p, Context context)
    throws IOException, InterruptedException {
    List<Pair> cached = cachedRecords.get(p.getKey());

    if (cached != null) {
        for (Pair cp : cached) {
            Pair result;

            if (distributedCacheIsSmaller) {
                result = join(p, cp);
            } else {
                result = join(cp, p);
            }

            if (result != null) {
                context.write(result.getKey(), result.getData());
            }
        }
    }
}

public Pair join(Pair inputSplitPair, Pair distCachePair) {
    StringBuilder sb = new StringBuilder();

    if (inputSplitPair.getData() != null) {
        sb.append(inputSplitPair.getData());
    }

    sb.append("\t");

    if (distCachePair.getData() != null) {
        sb.append(distCachePair.getData());
    }

    return new Pair<Text, Text>(
                new Text(inputSplitPair.getKey().toString()),
                new Text(sb.toString()));
}


当所有的记录都被传输给map方法后,MapReduce将会调用cleanup方法。如果分布式缓存中的内容比输入块大,连接将会在cleanup中进行。连接的对象是map函数的缓存中的输入块的记录和分布式缓存中的记录。

@Override
protected void cleanup(Context context)
    throws IOException, InterruptedException {

    if (!distributedCacheIsSmaller) {

        for (Path distFile : distributedCacheFiles) {
            File distributedCacheFile = new File(distFile.toString());
            DistributedCacheFileReader reader = getDistributedCacheReader();
            reader.init(distributedCacheFile);

            for (Pair p : (Iterable<Pair>) reader) {
                joinAndCollect(p, context);
            }

            reader.close();
        }
    }
}


最后,作业的驱动代码必须指定需要装载到分布式缓存中的文件。以下的代码可以处理一个文件,也可以处理MapReduce输入结果的一个目录。

Configuration conf = new Configuration();

FileSystem fs = smallFilePath.getFileSystem(conf);
FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath);

if(smallFilePathStatus.isDir()) {
    for(FileStatus f: fs.listStatus(smallFilePath)) {
        if(f.getPath().getName().startsWith("part")) {
            DistributedCache.addCacheFile(f.getPath().toUri(), conf);
        }
    }
} else {
    DistributedCache.addCacheFile(smallFilePath.toUri(), conf);
}

这个框架假设分布式缓存中的内容和输入块的内容都可以被装载到内存中。这个框架的优点在于它只会将两者之中较小的装载到内存中。
在论文《A Comparison of Join Algorithms for Log Processing in MapReduce》中,你可以看到这个方法对于分布式缓存中的内容较大时的场景的更进一步的优化。在他们的优化中,他们将分布式缓存分成N个分区,并将输入块放入N个哈希表。这样在cleanup方法中的优化就更加高效。
在map端的复制连接的问题在于,map任务必须在启动时读取分布式缓存。上述论文提到的另一个优化方案是重载FileInputFormat的splitting。将存在于同一个主机上的输入块合并成一个块。然后就可以减少需要装载分布式缓存的map任务的个数了。
最后一个说明,Hadoop在org.apache.hadoop.mapred.join包中自带了map端的连接。但是它需要有序的待连接的数据集的输入文件,并要求将其分发到相同的分区中。这样就造成了繁重的预处理工作。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-15506-1-1.html 上篇帖子: Hadoop Mapreduce 连接(Join)之一:重分区连接(Repartition join) 下篇帖子: Hadoop Mapreduce 连接(Join)之三:半连接(Semi-join)
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表