设为首页 收藏本站
查看: 818|回复: 0

[经验分享] MongoDB 的 MapReduce 大数据统计统计挖掘

[复制链接]

尚未签到

发表于 2015-7-8 12:05:06 | 显示全部楼层 |阅读模式
  
  MongoDB虽然不像我们常用的mysql,sqlserver,oracle等关系型数据库有group by函数那样方便分组,但是MongoDB要实现分组也有3个办法:
  
  
  
* Mongodb三种分组方式:
  
* 1、group(先筛选再分组,不支持分片,对数据量有所限制,效率不高)
  
* 2、mapreduce(基于js引擎,单线程执行,效率较低,适合用做后台统计等)
  
* 3、aggregate(推荐) (如果你的PHP的mongodb驱动版本需>=1.3.0,推荐你使用aggregate,性能要高很多,并且使用上要简单些,不过1.3的目前还不支持账户认证模式,可以通过http://pecl.php.net/package/mongo查看更新日志和Bug)
  

  
下面就来看下mapreduce方式:
  

  
  Mongodb官网对MapReduce介绍:
  
  Map/reduce in MongoDB is useful for batch processing of data and aggregation operations. It is similar in spirit to using something like Hadoop with all input coming from a collection and output going to a collection. Often, in a situation where you would have used GROUP BY in SQL, map/reduce is the right tool in MongoDB.
  

  


大致意思是:Mongodb中的Map/reduce主要是用来对数据进行批量处理和聚合操作,有点类似于使用Hadoop对集合数据进行处理,所有输入数据都是从集合中获取,而MapReduce后输出的数据也都会写入到集合中。通常类似于我们在SQL中使用Group By语句一样。  


使用MapReduce要实现两个函数:Map和Reduce。Map函数调用emit(key,value)遍历集合中所有的记录,将key与value传给Reduce函数进行处理。Map函数和Reduce函数是使用Javascript编写的,并可以通过db.runCommand或mapreduce命令来执行MapReduce操作。
  


  


MapReduce命令如下:  








[javascript] view plaincopy



  • db.runCommand(  
  • { mapreduce : ,  
  •    map : ,  
  •    reduce :   
  •    [, query : ]  
  •    [, sort : ]  
  •    [, limit : ]  
  •    [, out : ]  
  •    [, keeptemp: ]  
  •    [, finalize : ]  
  •    [, scope : ]  
  •    [, verbose : true]  
  • }  
  • );  

  参数说明:
  mapreduce:要操作的目标集合
  map:映射函数(生成键值对序列,作为Reduce函数的参数)
  reduce:统计函数
  query:目标记录过滤
  sort:对目标记录排序
  limit:限制目标记录数量
  out:统计结果存放集合(如果不指定则使用临时集合,在客户端断开后自动删除)
  keeptemp:是否保留临时集合
  finalize:最终处理函数(对reduce返回结果执行最终整理后存入结果集合)
  scope:向map、reduce、finalize导入外部变量
  verbose:显示详细的时间统计信息
  


map函数
map函数调用当前对象,并处里对象的属性,传值给reduce,map方法使用this来操作当前对象,最少调用一次emit(key,value)方法来向reduce提供参数,其中emit的key为最终数据的id。
  


  


reduce函数
接收一个值和数组,根据需要对数组进行合并分组等处理,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组。
  


  


Finalize函数
此函数为可选函数,可在执行完map和reduce后执行,对最后的数据进行统一处理。
  


  


看完基本介绍,我们再来看一个实例:  


  


已知集合feed,测试数据如下:  








[javascript] view plaincopy



  • {  
  •    "_id": ObjectId("50ccb3f91e937e2927000004"),  
  •    "feed_type": 1,  
  •    "to_user": 234,  
  •    "time_line": "2012-12-16 01:26:00"  
  • }  
  •   
  • {  
  •    "_id": ObjectId("50ccb3ef1e937e0727000004"),  
  •    "feed_type": 8,  
  •    "to_user": 123,  
  •    "time_line": "2012-12-16 01:26:00"  
  • }  
  •   
  • {  
  •    "_id": ObjectId("50ccb3e31e937e0a27000003"),  
  •    "feed_type": 1,  
  •    "to_user": 123,  
  •    "time_line": "2012-12-16 01:26:00"  
  • }  
  •   
  • {  
  •    "_id": ObjectId("50ccb3d31e937e0927000001"),  
  •    "feed_type": 1,  
  •    "to_user": 123,  
  •    "time_line": "2012-12-16 01:26:00"  
  • }  

我们按动态类型feed_type和用户to_user进行分组统计,实现结果:  



feed_typeto_usercout
12341
81231
11232
  


  


  


  


  


  


  


  


实现代码:  


  








[php] view plaincopy



  • //编写map函数  
  • $map = '  
  •      function() {  
  •       var key = {to_user:this.to_user,feed_type:this.feed_type};  
  •       var value = {count:1};  
  •       emit(key,value);  
  •     } ';   
  •   
  • //reduce 函数  
  • $reduce = '  
  •      function(key, values) {  
  •          var ret = {count:0};  
  •      for(var i in values) {  
  •           ret.count += 1;  
  •       }  
  •       return ret;  
  •       }';  
  •   
  • //查询条件  
  • $query = null;  //本实例中没有查询条件,设置为null  






[php] view plaincopy



  • $mongo = new Mongo('mongodb://root:root@127.0.0.1: 28017/'); //链接mongodb,账号和密码为root,root  
  • $instance = $mongo->selectDB("testdb");  
  •   
  • //执行此命令后,会创建feed_temp_res的临时集合,并将统计后的数据放在该集合中  
  • $cmd = $instance->command(array(  
  •         'mapreduce' => 'feed',  
  •         'map'       => $map,  
  •         'reduce'    => $reduce,  
  •         'query' => $query,  
  •         'out' => 'feed_temp_res'  
  • ));  
  •   
  • //查询临时集合中的统计数据,验证统计结果是否和预期结果一致  
  • $cursor = $instance->selectCollection('feed_temp_res')->find();  
  • $result = array();  
  • try {  
  •     while ($cursor->hasNext())  
  •     {  
  •         $result[] = $cursor->getNext();  
  •     }  
  • }  
  • catch (MongoConnectionException $e)  
  • {  
  •     echo $e->getMessage();  
  • }  
  • catch (MongoCursorTimeoutException $e)  
  • {  
  •     echo $e->getMessage();  
  • }  
  • catch(Exception $e){  
  •     echo $e->getMessage();  
  • }  
  •   
  • //test  
  • var_dump($result);  

下面是输出的结果,和预期结果一致  


  








[javascript] view plaincopy



  • {  
  •    "_id": {  
  •      "to_user": 234,  
  •      "feed_type": 1   
  •   },  
  •    "value": {  
  •      "count": 1   
  •   }   
  • }  
  •   
  • {  
  •    "_id": {  
  •      "to_user": 123,  
  •      "feed_type": 8   
  •   },  
  •    "value": {  
  •      "count": 1   
  •   }   
  • }  
  •   
  • {  
  •    "_id": {  
  •      "to_user": 123,  
  •      "feed_type": 1   
  •   },  
  •    "value": {  
  •      "count": 2   
  •   }   
  • }  


  


以上只是简单的统计实现,你可以实现复杂的条件统计编写复杂的reduce函数,可以增加查询条件,排序等等。  


  


附上mapReduce数据库处理函数(简单封装)  








[php] view plaincopy



  • /**
  • * mapReduce分组
  • *  
  • * @param string $table_name 表名(要操作的目标集合名)
  • * @param string $map 映射函数(生成键值对序列,作为 reduce 函数参数)  
  • * @param string $reduce 统计处理函数
  • * @param array  $query 过滤条件 如:array('uid'=>123)
  • * @param array  $sort 排序
  • * @param number $limit 限制的目标记录数
  • * @param string $out 统计结果存放集合 (不指定则使用tmp_mr_res_$table_name, 1.8以上版本需指定)
  • * @param bool   $keeptemp 是否保留临时集合
  • * @param string $finalize 最终处理函数 (对reduce返回结果进行最终整理后存入结果集合)
  • * @param string $scope 向 map、reduce、finalize 导入外部js变量
  • * @param bool   $jsMode 是否减少执行过程中BSON和JS的转换,默认true(注:false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,//true时BSON-->js-->map-->reduce-->BSON)
  • * @param bool   $verbose 是否产生更加详细的服务器日志
  • * @param bool   $returnresult 是否返回新的结果集
  • * @param array  &$cmdresult 返回mp命令执行结果 array("errmsg"=>"","code"=>13606,"ok"=>0) ok=1表示执行命令成功
  • * @return  
  • */  
  • function mapReduce($table_name,$map,$reduce,$query=null,$sort=null,$limit=0,$out='',$keeptemp=true,$finalize=null,$scope=null,$jsMode=true,$verbose=true,$returnresult=true,&$cmdresult){  
  •     if(empty($table_name) || empty($map) || empty($reduce)){  
  •         return null;  
  •     }  
  •     $map = new MongoCode($map);  
  •     $reduce = new MongoCode($reduce);  
  •     if(empty($out)){  
  •         $out = 'tmp_mr_res_'.$table_name;  
  •     }  
  •     $cmd = array(  
  •             'mapreduce' => $table_name,  
  •             'map'       => $map,  
  •             'reduce'    => $reduce,  
  •             'out'       =>$out  
  •     );  
  •     if(!empty($query) && is_array($query)){  
  •         array_push($cmd, array('query'=>$query));  
  •     }  
  •     if(!empty($sort) && is_array($sort)){  
  •         array_push($cmd, array('sort'=>$query));  
  •     }  
  •     if(!empty($limit) && is_int($limit) && $limit>0){  
  •         array_push($cmd, array('limit'=>$limit));  
  •     }  
  •     if(!empty($keeptemp) && is_bool($keeptemp)){  
  •         array_push($cmd, array('keeptemp'=>$keeptemp));  
  •     }  
  •     if(!empty($finalize)){  
  •         $finalize = new Mongocode($finalize);  
  •         array_push($cmd, array('finalize'=>$finalize));  
  •     }  
  •     if(!empty($scope)){  
  •         array_push($cmd, array('scope'=>$scope));  
  •     }  
  •     if(!empty($jsMode) && is_bool($jsMode)){  
  •         array_push($cmd, array('jsMode'=>$jsMode));  
  •     }  
  •     if(!empty($verbose) && is_bool($verbose)){  
  •         array_push($cmd, array('verbose'=>$verbose));  
  •     }  
  •     $dbname = $this->curr_db_name;  
  •     $cmdresult = $this->mongo->$dbname->command($cmd);  
  •     if($returnresult){  
  •         if($cmdresult && $cmdresult['ok']==1){  
  •             $result = $this->find($out, array());  
  •         }  
  •     }  
  •     if($keeptemp==false){  
  •         //删除集合  
  •         $this->mongo->$dbname->dropCollection($out);  
  •     }  
  •     return $result;  
  • }  
  
  -------------------------------------------------------------------------------------------
  MongoDB 的 MapReduce 相当于 Mysql 中的"group by", 所以在 MongoDB 上使用 Map/Reduce 进行并行"统计"很容易。
  使用 MapReduce 要实现两个函数 Map 函数和 Reduce 函数,Map 函数调用 emit(key, value), 遍历collection 中所有记录, key 与 value 传递给 Reduce 函数进行处理。 函数和 Reduce 将 Map 函数可以使用 JavaScript 来实现,可以通db.runCommand 或 mapReduce 命令来执行一个 MapReduce的操作。
  
  MapReduce函数的参数列表如下
  

db.runCommand(
{ mapreduce : ,
map : ,
reduce :
[, query : ]
[, sort : ]
[, limit : ]
[, out : ]
[, keeptemp: ]
[, finalize : ]
[, scope : ]
[, verbose : true]
}
);
  或者这么写:

db.collection.mapReduce(
,
,
{
,
,
,
,
,
                          ,
,
,

}
)

  • mapreduce:指定要进行mapreduce处理的collection
  • map:map函数
  • reduce:reduce函数
  • out:输出结果的collection的名字,不指定会默认创建一个随机名字的collection(如果使用了out选项,就不必指定keeptemp:true了,因为已经隐含在其中了)
  • query:一个筛选条件,只有满足条件的文档才会调用map函数。(query。limit,sort可以随意组合)
  • sort:和limit结合的sort排序参数(也是在发往map函数前给文档排序),可以优化分组机制
  • limit:发往map函数的文档数量的上限(要是没有limit,单独使用sort的用处不大)
  • keytemp:true或false,表明结果输出到的collection是否是临时的,如果想在连接关闭后仍然保留这个集合,就要指定keeptemp为true,如果你用的是MongoDB的mongo客户端连接,那必须exit后才会删除。如果是脚本执行,脚本退出或调用close会自动删除结果collection
  • finalize:是函数,它会在执行完map、reduce后再对key和value进行一次计算并返回一个最终结果,这是处理过程的最后一步,所以finalize就是一个计算平均数,剪裁数组,清除多余信息的恰当时机
  • scope:javascript代码中要用到的变量,在这里定义的变量在map,reduce,finalize函数中可见
  • verbose:用于调试的详细输出选项,如果想看MpaReduce的运行过程,可以设置其为true。也可以print把map,reduce,finalize过程中的信息输出到服务器日志上。
  
  其中重点的几个参数说明:

1、Map
  Map 函数必须调用 emit(key, value) 返回键值对,使用 this 访问当前待处理的 Document。
  m = function() { emit(this.classid, 1) }
  value 可以使用 JSON Object 传递 (支持多个属性值)。例如:
  emit(this.classid, {count:1})
  

2 Reduce
  Reduce 函数接收的参数类似 Group 效果,将 Map 返回的键值序列组合成 { key, [value1, value2, value3, value...] } 传递给 reduce。
  r = function(key, values) {
  var x = 0;
  values.forEach(function(v) { x += v });
  return x;
  }
  

3 Result
  res = db.runCommand({
  mapreduce:"students",
  map:m,
  reduce:r,
  out:"students_res"
  });
  mapReduce() 将结果存储在 "students_res" 表中。

8.4 Finalize
  利用 finalize() 我们可以对 reduce() 的结果做进一步处理。
  f = function(key, value) { return {classid:key, count:value}; }
  列名变与 “classid”和”count”了,这样的列表更容易理解。

8.5 Options
  可以添加更多的控制细节 。添加query、sort等。
  实例:







DSC0000.gif
DSC0001.gif publicvoid MapReduce() {
DSC0002.gif         Mongo mongo =new Mongo("localhost",27017);
        DB db = mongo.getDB("qimiguangdb");
        DBCollection coll = db.getCollection("collection1");
      
        String map ="function() { emit(this.name, {count:1});}";

        String reduce ="function(key, values) {";  
        reduce=reduce+"var total = 0;";  
        reduce=reduce+"for(var i=0;i

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-84476-1-1.html 上篇帖子: Mongodb 更多操作 下篇帖子: MongoDB笔记-查询
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表