设为首页 收藏本站
查看: 1131|回复: 0

[经验分享] MongoDB MapReduce 性能提升20倍的优化宝典

[复制链接]

尚未签到

发表于 2018-10-27 07:45:52 | 显示全部楼层 |阅读模式
  转自:http://www.iteye.com/news/28013
  自从MongoDB被越来越多的大型关键项目采用后,数据分析也成为了越来越重要的话题。人们似乎已经厌倦了使用不同的软件来进行分析(这都利用到了Hadoop),因为这些方法往往需要大规模的数据传输,而这些成本相当昂贵。
  MongoDB提供了2种方式来对数据进行分析:Map Reduce(以下简称MR)和聚合框架(Aggregation Framework)。MR非常灵活且易于使用,它可以很好地与分片(sharding)结合使用,并允许大规模输出。尽管在MongoDB v2.4版本中,由于JavaScript引擎从Spider切换到了V8,使得MR的性能有了大幅改进,但是与Agg Framework(使用C++)相比,MR的速度还是显得比较慢。本文就来看看,有哪些方法可以让MR的速度有所提升。
  测试
  首先我们来做个测试,插入1000万文档,这些文档中包含了介于0和100万之间的单一整数值,这意味着,平均每10个文档具有相同的值。
  代码

  •   > for (var i = 0; i < 10000000; ++i){ db.uniques.insert({ dim0: Math.floor(Math.random()*1000000) });}
  •   > db.uniques.findOne()
  •   { "_id" : ObjectId("51d3c386acd412e22c188dec"), "dim0" : 570859 }
  •   > db.uniques.ensureIndex({dim0: 1})
  •   > db.uniques.stats()
  •   {
  •   "ns" : "test.uniques",
  •   "count" : 10000000,
  •   "size" : 360000052,
  •   "avgObjSize" : 36.0000052,
  •   "storageSize" : 582864896,
  •   "numExtents" : 18,
  •   "nindexes" : 2,
  •   "lastExtentSize" : 153874432,
  •   "paddingFactor" : 1,
  •   "systemFlags" : 1,
  •   "userFlags" : 0,
  •   "totalIndexSize" : 576040080,
  •   "indexSizes" : {
  •   "_id_" : 324456384,
  •   "dim0_1" : 251583696
  •   },
  •   "ok" : 1
  •   }
  这里我们想要得到文档中唯一值的计数,可以通过下面的MR任务来轻松完成:
  代码

  •   > db.runCommand(
  •   { mapreduce: "uniques",
  •   map: function () { emit(this.dim0, 1); },
  •   reduce: function (key, values) { return Array.sum(values); },
  •   out: "mrout" })
  •   {
  •   "result" : "mrout",
  •   "timeMillis" : 1161960,
  •   "counts" : {
  •   "input" : 10000000,
  •   "emit" : 10000000,
  •   "reduce" : 1059138,
  •   "output" : 999961
  •   },
  •   "ok" : 1
  •   }
  正如你看到的,输出结果大约需要1200秒(在EC2 M3实例上测试),共输出了1千万maps、100万reduces、999961个文档。结果类似于:
  代码

  •   > db.mrout.find()
  •   { "_id" : 1, "value" : 10 }
  •   { "_id" : 2, "value" : 5 }
  •   { "_id" : 3, "value" : 6 }
  •   { "_id" : 4, "value" : 10 }
  •   { "_id" : 5, "value" : 9 }
  •   { "_id" : 6, "value" : 12 }
  •   { "_id" : 7, "value" : 5 }
  •   { "_id" : 8, "value" : 16 }
  •   { "_id" : 9, "value" : 10 }
  •   { "_id" : 10, "value" : 13 }
  •   ...
  下面就来看看如何进行优化。
  使用排序
  我在之前的这篇文章中简要说明了使用排序对于MR的好处,这是一个鲜为人知的特性。在这种情况下,如果处理未排序的输入,意味着MR引擎将得到随机排序的值,基本上没有机会在RAM中进行reduce,相反,它将不得不通过一个临时collection来将数据写回磁盘,然后按顺序读取并进行reduce。
  下面来看看如果使用排序,会有什么帮助:
  代码

  •   > db.runCommand(
  •   { mapreduce: "uniques",
  •   map: function () { emit(this.dim0, 1); },
  •   reduce: function (key, values) { return Array.sum(values); },
  •   out: "mrout",
  •   sort: {dim0: 1} })
  •   {
  •   "result" : "mrout",
  •   "timeMillis" : 192589,
  •   "counts" : {
  •   "input" : 10000000,
  •   "emit" : 10000000,
  •   "reduce" : 1000372,
  •   "output" : 999961
  •   },
  •   "ok" : 1
  •   }
  现在时间降到了192秒,速度提升了6倍。其实reduces的数量是差不多的,但是它们在被写入磁盘之前已经在RAM中完成了。
  使用多线程
  在MongoDB中,一个单一的MR任务并不能使用多线程——只有在多个任务中才能使用多线程。但是目前的多核CPU非常有利于在单一服务器上进行并行化工作,就像Hadoop。我们需要做的是,将输入数据分割成若干块,并为每个块分配一个MR任务。splitVector命令可以帮助你非常迅速地找到分割点,如果你有更简单的分割方法更好。
  代码

  •   > db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32000000})
  •   {
  •   "timeMillis" : 6006,
  •   "splitKeys" : [
  •   {
  •   "dim0" : 18171
  •   },
  •   {
  •   "dim0" : 36378
  •   },
  •   {
  •   "dim0" : 54528
  •   },
  •   {
  •   "dim0" : 72717
  •   },
  •   …
  •   {
  •   "dim0" : 963598
  •   },
  •   {
  •   "dim0" : 981805
  •   }
  •   ],
  •   "ok" : 1
  •   }
  从1千万文档中找出分割点,使用splitVector命令只需要大约5秒,这已经相当快了。所以,下面我们需要做的是找到一种方式来创建多个MR任务。从应用服务器方面来说,使用多线程和$gt / $lt查询命令会非常方便。从shell方面来说,可以使用ScopedThread对象,它的工作原理如下:
  代码

  •   > var t = new ScopedThread(mapred, 963598, 981805)
  •   > t.start()
  •   > t.join()
  现在我们可以放入一些JS代码,这些代码可以产生4个线程,下面来等待结果显示:
  代码

  •   > var res = db.runCommand({splitVector: "test.uniques", keyPattern: {dim0: 1}, maxChunkSizeBytes: 32 *1024 * 1024 })
  •   > var keys = res.splitKeys
  •   > keys.length
  •   39
  •   > var mapred = function(min, max) {
  •   return db.runCommand({ mapreduce: "uniques",
  •   map: function () { emit(this.dim0, 1); },
  •   reduce: function (key, values) { return Array.sum(values); },
  •   out: "mrout" + min,
  •   sort: {dim0: 1},
  •   query: { dim0: { $gte: min, $lt: max } } }) }
  •   > var numThreads = 4
  •   > var inc = Math.floor(keys.length / numThreads) + 1
  •   > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
  •   min:0 max:274736
  •   min:274736 max:524997
  •   min:524997 max:775025
  •   min:775025 max:{ "$maxKey" : 1 }
  •   connecting to: test
  •   connecting to: test
  •   connecting to: test
  •   connecting to: test
  •   > for (var i in threads) { var t = threads; t.join(); printjson(t.returnData()); }
  •   {
  •   "result" : "mrout0",
  •   "timeMillis" : 205790,
  •   "counts" : {
  •   "input" : 2750002,
  •   "emit" : 2750002,
  •   "reduce" : 274828,
  •   "output" : 274723
  •   },
  •   "ok" : 1
  •   }
  •   {
  •   "result" : "mrout274736",
  •   "timeMillis" : 189868,
  •   "counts" : {
  •   "input" : 2500013,
  •   "emit" : 2500013,
  •   "reduce" : 250364,
  •   "output" : 250255
  •   },
  •   "ok" : 1
  •   }
  •   {
  •   "result" : "mrout524997",
  •   "timeMillis" : 191449,
  •   "counts" : {
  •   "input" : 2500014,
  •   "emit" : 2500014,
  •   "reduce" : 250120,
  •   "output" : 250019
  •   },
  •   "ok" : 1
  •   }
  •   {
  •   "result" : "mrout775025",
  •   "timeMillis" : 184945,
  •   "counts" : {
  •   "input" : 2249971,
  •   "emit" : 2249971,
  •   "reduce" : 225057,
  •   "output" : 224964
  •   },
  •   "ok" : 1
  •   }
  第1个线程所做的工作比其他的要多一点,但时间仍达到了190秒,这意味着多线程并没有比单线程快!
  使用多个数据库
  这里的问题是,线程之间存在太多锁争用。当锁时,MR不是非常无私(每1000次读取会进行yield)。由于MR任务做了大量写操作,线程之间结束时会等待彼此。由于MongoDB的每个数据库都有独立的锁,那么让我们来尝试为每个线程使用不同的输出数据库:
  代码

  •   > var mapred = function(min, max) {
  •   return db.runCommand({ mapreduce: "uniques",
  •   map: function () { emit(this.dim0, 1); },
  •   reduce: function (key, values) { return Array.sum(values); },
  •   out: { replace: "mrout" + min, db: "mrdb" + min },
  •   sort: {dim0: 1},
  •   query: { dim0: { $gte: min, $lt: max } } }) }
  •   > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
  •   min:0 max:274736
  •   min:274736 max:524997
  •   min:524997 max:775025
  •   min:775025 max:{ "$maxKey" : 1 }
  •   connecting to: test
  •   connecting to: test
  •   connecting to: test
  •   connecting to: test
  •   > for (var i in threads) { var t = threads; t.join(); printjson(t.returnData()); }
  •   ...
  •   {
  •   "result" : {
  •   "db" : "mrdb274736",
  •   "collection" : "mrout274736"
  •   },
  •   "timeMillis" : 105821,
  •   "counts" : {
  •   "input" : 2500013,
  •   "emit" : 2500013,
  •   "reduce" : 250364,
  •   "output" : 250255
  •   },
  •   "ok" : 1
  •   }
  •   ...
  所需时间减少到了100秒,这意味着与一个单独的线程相比,速度约提高2倍。尽管不如预期,但已经很不错了。在这里,我使用了4个核心,只提升了2倍,如果使用8核CPU,大约会提升4倍。
  使用纯JavaScript模式
  在线程之间分割输入数据时,有一些非常有趣的东西:每个线程只拥有约25万主键来输出,而不是100万。这意味着我们可以使用“纯JS模式”——通过jsMode:true来启用。开启后,MongoDB不会在JS和BSON之间反复转换,相反,它会从内部的一个50万主键的JS字典来reduces所有对象。下面来看看该操作是否对速度提升有帮助。
  代码

  •   > var mapred = function(min, max) {
  •   return db.runCommand({ mapreduce: "uniques",
  •   map: function () { emit(this.dim0, 1); },
  •   reduce: function (key, values) { return Array.sum(values); },
  •   out: { replace: "mrout" + min, db: "mrdb" + min },
  •   sort: {dim0: 1},
  •   query: { dim0: { $gte: min, $lt: max } },
  •   jsMode: true }) }
  •   > threads = []; for (var i = 0; i < numThreads; ++i) { var min = (i == 0) ? 0 : keys[i * inc].dim0; var max = (i * inc + inc >= keys.length) ? MaxKey : keys[i * inc + inc].dim0 ; print("min:" + min + " max:" + max); var t = new ScopedThread(mapred, min, max); threads.push(t); t.start() }
  •   min:0 max:274736
  •   min:274736 max:524997
  •   min:524997 max:775025
  •   min:775025 max:{ "$maxKey" : 1 }
  •   connecting to: test
  •   connecting to: test
  •   connecting to: test
  •   connecting to: test
  •   > for (var i in threads) { var t = threads; t.join(); printjson(t.returnData()); }
  •   ...
  •   {
  •   "result" : {
  •   "db" : "mrdb274736",
  •   "collection" : "mrout274736"
  •   },
  •   "timeMillis" : 70507,
  •   "counts" : {
  •   "input" : 2500013,
  •   "emit" : 2500013,
  •   "reduce" : 250156,
  •   "output" : 250255
  •   },
  •   "ok" : 1
  •   }
  •   ...
  现在时间降低到70秒。看来jsMode确实有帮助,尤其是当对象有很多字段时。该示例中是一个单一的数字字段,不过仍然提升了30%。
  MongoDB v2.6版本中的改进
  在MongoDB v2.6版本的开发中,移除了一段关于在JS函数调用时的一个可选“args”参数的代码。该参数是不标准的,也不建议使用,它由于历史原因遗留了下来(见SERVER-4654)。让我们从Git库中pull最新的MongoDB并编译,然后再次运行测试用例:
  代码

  •   ...
  •   {
  •   "result" : {
  •   "db" : "mrdb274736",
  •   "collection" : "mrout274736"
  •   },
  •   "timeMillis" : 62785,
  •   "counts" : {
  •   "input" : 2500013,
  •   "emit" : 2500013,
  •   "reduce" : 250156,
  •   "output" : 250255
  •   },
  •   "ok" : 1
  •   }
  •   ...
  从结果来看,时间降低到了60秒,速度大约提升了10-15%。同时,这种更改也改善了JS引擎的整体堆消耗量。
  结论
  回头来看,对于同样的MR任务,与最开始时的1200秒相比,速度已经提升了20倍。这种优化应该适用于大多数情况,即使一些技巧效果不那么理想(比如使用多个输出dbs /集合)。但是这些技巧可以帮助人们来提升MR任务的速度,未来这些特性也许会更加易用——比如,这个ticket 将会使splitVector命令更加可用,这个ticket将会改进同一数据库中的多个MR任务。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-626928-1-1.html 上篇帖子: mongodb集群与分片的配置说明 下篇帖子: mongodb连接池
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表