zhendeaini123 发表于 2017-4-10 10:23:08

MongoDB的MapReduce用法及php示例代码

MongoDB虽然不像我们常用的mysql,sqlserver,oracle等关系型数据库有group by函数那样方便分组,但是MongoDB要实现分组也有3个办法:
 
 * Mongodb三种分组方式:
 * 1、group(先筛选再分组,不支持分片,对数据量有所限制,效率不高)
 * 2、mapreduce(基于js引擎,单线程执行,效率较低,适合用做后台统计等)
 * 3、aggregate(推荐) (如果你的PHP的mongodb驱动版本需>=1.3.0,推荐你使用aggregate,性能要高很多,并且使用上要简单些,不过1.3的目前还不支持账户认证模式,可以通过http://pecl.php.net/package/mongo查看更新日志和Bug)
 
下面就来看下mapreduce方式:
 
Mongodb官网对MapReduce介绍:
Map/reduce in MongoDB is useful for batch processing of data and aggregation operations. It is similar in spirit to using something like Hadoop with all input coming from a collection and output going to a collection. Often, in a situation where you would have used GROUP BY in SQL, map/reduce is the right tool in MongoDB.
 
大致意思是:Mongodb中的Map/reduce主要是用来对数据进行批量处理和聚合操作,有点类似于使用Hadoop对集合数据进行处理,所有输入数据都是从集合中获取,而MapReduce后输出的数据也都会写入到集合中。通常类似于我们在SQL中使用Group By语句一样。
使用MapReduce要实现两个函数:Map和Reduce。Map函数调用emit(key,value)遍历集合中所有的记录,将key与value传给Reduce函数进行处理。Map函数和Reduce函数是使用Javascript编写的,并可以通过db.runCommand或mapreduce命令来执行MapReduce操作。

 
MapReduce命令如下:
 view plaincopy



[*]db.runCommand(  
[*]{ mapreduce : <collection>,  
[*]   map : <mapfunction>,  
[*]   reduce : <reducefunction>  
[*]   [, query : <query filter object>]  
[*]   [, sort : <sort the query.  useful for optimization>]  
[*]   [, limit : <number of objects to return from collection>]  
[*]   [, out : <output-collection name>]  
[*]   [, keeptemp: <true|false>]  
[*]   [, finalize : <finalizefunction>]  
[*]   [, scope : <object where fields go into javascript global scope >]  
[*]   [, verbose : true]  
[*] }  
[*]);  


参数说明:
mapreduce:要操作的目标集合
map:映射函数(生成键值对序列,作为Reduce函数的参数) 
reduce:统计函数
query:目标记录过滤
sort:对目标记录排序
limit:限制目标记录数量
out:统计结果存放集合(如果不指定则使用临时集合,在客户端断开后自动删除)
keeptemp:是否保留临时集合
finalize:最终处理函数(对reduce返回结果执行最终整理后存入结果集合)
scope:向map、reduce、finalize导入外部变量
verbose:显示详细的时间统计信息

map函数
map函数调用当前对象,并处里对象的属性,传值给reduce,map方法使用this来操作当前对象,最少调用一次emit(key,value)方法来向reduce提供参数,其中emit的key为最终数据的id。

 
reduce函数
接收一个值和数组,根据需要对数组进行合并分组等处理,reduce的key就是emit(key,value)的key,value_array是同个key对应的多个value数组。

 
Finalize函数
此函数为可选函数,可在执行完map和reduce后执行,对最后的数据进行统一处理。

 
看完基本介绍,我们再来看一个实例:
 
已知集合feed,测试数据如下:
 view plaincopy



[*]{  
[*]   "_id": ObjectId("50ccb3f91e937e2927000004"),  
[*]   "feed_type": 1,  
[*]   "to_user": 234,  
[*]   "time_line": "2012-12-16 01:26:00"  
[*]}  
[*]  
[*]{  
[*]   "_id": ObjectId("50ccb3ef1e937e0727000004"),  
[*]   "feed_type": 8,  
[*]   "to_user": 123,  
[*]   "time_line": "2012-12-16 01:26:00"  
[*]}  
[*]  
[*]{  
[*]   "_id": ObjectId("50ccb3e31e937e0a27000003"),  
[*]   "feed_type": 1,  
[*]   "to_user": 123,  
[*]   "time_line": "2012-12-16 01:26:00"  
[*]}  
[*]  
[*]{  
[*]   "_id": ObjectId("50ccb3d31e937e0927000001"),  
[*]   "feed_type": 1,  
[*]   "to_user": 123,  
[*]   "time_line": "2012-12-16 01:26:00"  
[*]}  


我们按动态类型feed_type和用户to_user进行分组统计,实现结果:
feed_typeto_usercout123418123111232
 
 
 
 
 
 
 
实现代码:
 
 view plaincopy



[*]//编写map函数  
[*]$map = '  
[*]     function() {  
[*]      var key = {to_user:this.to_user,feed_type:this.feed_type};  
[*]      var value = {count:1};  
[*]      emit(key,value);  
[*]    } ';   
[*]  
[*]//reduce 函数  
[*]$reduce = '  
[*]     function(key, values) {  
[*]         var ret = {count:0};  
[*]     for(var i in values) {  
[*]          ret.count += 1;  
[*]      }  
[*]      return ret;  
[*]      }';  
[*]  
[*]//查询条件  
[*]$query = null;  //本实例中没有查询条件,设置为null  

 view plaincopy



[*]$mongo = new Mongo('mongodb://root:root@127.0.0.1: 28017/'); //链接mongodb,账号和密码为root,root  
[*]$instance = $mongo->selectDB("testdb");  
[*]  
[*]//执行此命令后,会创建feed_temp_res的临时集合,并将统计后的数据放在该集合中  
[*]$cmd = $instance->command(array(  
[*]        'mapreduce' => 'feed',  
[*]        'map'       => $map,  
[*]        'reduce'    => $reduce,  
[*]        'query' => $query,  
[*]        'out' => 'feed_temp_res'  
[*]));  
[*]  
[*]//查询临时集合中的统计数据,验证统计结果是否和预期结果一致  
[*]$cursor = $instance->selectCollection('feed_temp_res')->find();  
[*]$result = array();  
[*]try {  
[*]    while ($cursor->hasNext())  
[*]    {  
[*]        $result[] = $cursor->getNext();  
[*]    }  
[*]}  
[*]catch (MongoConnectionException $e)  
[*]{  
[*]    echo $e->getMessage();  
[*]}  
[*]catch (MongoCursorTimeoutException $e)  
[*]{  
[*]    echo $e->getMessage();  
[*]}  
[*]catch(Exception $e){  
[*]    echo $e->getMessage();  
[*]}  
[*]  
[*]//test  
[*]var_dump($result);  


下面是输出的结果,和预期结果一致
 
 view plaincopy



[*]{  
[*]   "_id": {  
[*]     "to_user": 234,  
[*]     "feed_type": 1   
[*]  },  
[*]   "value": {  
[*]     "count": 1   
[*]  }   
[*]}  
[*]  
[*]{  
[*]   "_id": {  
[*]     "to_user": 123,  
[*]     "feed_type": 8   
[*]  },  
[*]   "value": {  
[*]     "count": 1   
[*]  }   
[*]}  
[*]  
[*]{  
[*]   "_id": {  
[*]     "to_user": 123,  
[*]     "feed_type": 1   
[*]  },  
[*]   "value": {  
[*]     "count": 2   
[*]  }   
[*]}  




以上只是简单的统计实现,你可以实现复杂的条件统计编写复杂的reduce函数,可以增加查询条件,排序等等。
 
附上mapReduce数据库处理函数(简单封装)
 view plaincopy



[*]/** 
[*] * mapReduce分组 
[*] *  
[*] * @param string $table_name 表名(要操作的目标集合名) 
[*] * @param string $map 映射函数(生成键值对序列,作为 reduce 函数参数)  
[*] * @param string $reduce 统计处理函数 
[*] * @param array  $query 过滤条件 如:array('uid'=>123) 
[*] * @param array  $sort 排序 
[*] * @param number $limit 限制的目标记录数 
[*] * @param string $out 统计结果存放集合 (不指定则使用tmp_mr_res_$table_name, 1.8以上版本需指定) 
[*] * @param bool   $keeptemp 是否保留临时集合 
[*] * @param string $finalize 最终处理函数 (对reduce返回结果进行最终整理后存入结果集合) 
[*] * @param string $scope 向 map、reduce、finalize 导入外部js变量 
[*] * @param bool   $jsMode 是否减少执行过程中BSON和JS的转换,默认true(注:false时 BSON-->JS-->map-->BSON-->JS-->reduce-->BSON,可处理非常大的mapreduce,//true时BSON-->js-->map-->reduce-->BSON) 
[*] * @param bool   $verbose 是否产生更加详细的服务器日志 
[*] * @param bool   $returnresult 是否返回新的结果集 
[*] * @param array  &$cmdresult 返回mp命令执行结果 array("errmsg"=>"","code"=>13606,"ok"=>0) ok=1表示执行命令成功 
[*] * @return  
[*] */  
[*]function mapReduce($table_name,$map,$reduce,$query=null,$sort=null,$limit=0,$out='',$keeptemp=true,$finalize=null,$scope=null,$jsMode=true,$verbose=true,$returnresult=true,&$cmdresult){  
[*]    if(empty($table_name) || empty($map) || empty($reduce)){  
[*]        return null;  
[*]    }  
[*]    $map = new MongoCode($map);  
[*]    $reduce = new MongoCode($reduce);  
[*]    if(empty($out)){  
[*]        $out = 'tmp_mr_res_'.$table_name;  
[*]    }  
[*]    $cmd = array(  
[*]            'mapreduce' => $table_name,  
[*]            'map'       => $map,  
[*]            'reduce'    => $reduce,  
[*]            'out'       =>$out  
[*]    );  
[*]    if(!empty($query) && is_array($query)){  
[*]        array_push($cmd, array('query'=>$query));  
[*]    }  
[*]    if(!empty($sort) && is_array($sort)){  
[*]        array_push($cmd, array('sort'=>$query));  
[*]    }  
[*]    if(!empty($limit) && is_int($limit) && $limit>0){  
[*]        array_push($cmd, array('limit'=>$limit));  
[*]    }  
[*]    if(!empty($keeptemp) && is_bool($keeptemp)){  
[*]        array_push($cmd, array('keeptemp'=>$keeptemp));  
[*]    }  
[*]    if(!empty($finalize)){  
[*]        $finalize = new Mongocode($finalize);  
[*]        array_push($cmd, array('finalize'=>$finalize));  
[*]    }  
[*]    if(!empty($scope)){  
[*]        array_push($cmd, array('scope'=>$scope));  
[*]    }  
[*]    if(!empty($jsMode) && is_bool($jsMode)){  
[*]        array_push($cmd, array('jsMode'=>$jsMode));  
[*]    }  
[*]    if(!empty($verbose) && is_bool($verbose)){  
[*]        array_push($cmd, array('verbose'=>$verbose));  
[*]    }  
[*]    $dbname = $this->curr_db_name;  
[*]    $cmdresult = $this->mongo->$dbname->command($cmd);  
[*]    if($returnresult){  
[*]        if($cmdresult && $cmdresult['ok']==1){  
[*]            $result = $this->find($out, array());  
[*]        }  
[*]    }  
[*]    if($keeptemp==false){  
[*]        //删除集合  
[*]        $this->mongo->$dbname->dropCollection($out);  
[*]    }  
[*]    return $result;  
[*]}  




MongoDB官方网站介绍:

MapReduce介绍 http://docs.mongodb.org/manual/core/map-reduce/


Aggregation介绍 http://docs.mongodb.org/manual/aggregation/
 
http://blog.csdn.net/slimboy123/article/details/8909974
页: [1]
查看完整版本: MongoDB的MapReduce用法及php示例代码