设为首页 收藏本站
查看: 2561|回复: 0

[经验分享] MongoDB集群与LBS应用系列(二)--与Hadoop集成

[复制链接]

尚未签到

发表于 2015-7-6 01:30:28 | 显示全部楼层 |阅读模式
  长期以来,我每开个系列,只有兴趣写一篇,很难持之与恒。为了克服这个长久以来的性格弱点,以及梳理工作半年的积累。最近一个月会写两篇关于Mongo在地理大数据方面的实践和应用,一篇关于推荐系统的初期准备过程,一篇用户行为矩阵的可视化。希望能够立言为证,自我监督。

1.驱动准备
  言归正传,前文MongoDB集群部署完毕之后,CRUD就是主要需求。NoSQL与普通关系数据库不同的是,避免采用ORM框架对数据库做操作,这样会带来明显的性能下降[1]。使用原生的Driver是一个较为合理的选择,Mongo支持的语言非常多,包括JS,Java,C,C++,Python,Scala等[2]。
  如果是单纯的MongoDB项目,我们会用NodeJS Driver,方便快捷,示例规范,值得推荐。在本文我使用Java Driver,主要是集成Hadoop工程方便。同时还会用到Mongo Hadoop Adapter 可以选择到Github 下载源码编译,或者直接根据自己Hadoop集群版本选择下载Jar包,添加到Hadoop安装目录的lib文件夹下[3]。但是在不少公有云平台上,普通用户是没有修改Hadoop系统的权限,无法添加Jar包,所以在本文的示例代码中,采用分布式缓存的方法添加这两个Jar包。

2.实现原理与过程
  其实Hadoop和MongoDB的集成,很大程度上是将Mongo作为Hadoop的输入和输出源,而Mongo Hadoop Adapter也是主要实现了BSONWritable,MongoInputformat等这些类,也就是说需要自定义Hadoop的序列化类以及输入输出格式。

2.1 Hadoop序列化与反序列化
  序列化(serialization)将结构化对象转化为二进制字节流,以便网络传输和写入磁盘。反序列化(deserialization)则是它的逆过程,将字节流转化为结构化对象。分布式系统通常在进程通讯和持久化时候会使用序列化。Hadoop系统节点进程通信使用RPC,该协议存活时间非常短,因此需要其序列化格式具备以下特点:紧凑、快速、可扩展等。Hadoop提供了Writable接口,它定义了对数据的IO流,即需要实现readFields 和 Write两个方法[4]。

2.2 Mongo Adapter的源码实现
  Mongo Hadoop Adater所实现的BSONWritable等类,源码实现体现了上述的规范:



//输出
public void write( DataOutput out ) throws IOException{
BSONEncoder enc = new BasicBSONEncoder();
BasicOutputBuffer buf = new BasicOutputBuffer();
enc.set( buf );
…………
}
//输入
public void readFields( DataInput in ) throws IOException{
BSONDecoder dec = new BasicBSONDecoder();
BSONCallback cb = new BasicBSONCallback();
// Read the BSON length from the start of the record
//字节流长度
byte[] l = new byte[4];
try {
in.readFully( l );
…………
byte[] data = new byte[dataLen + 4];
System.arraycopy( l, 0, data, 0, 4 );
in.readFully( data, 4, dataLen - 4 );
dec.decode( data, cb );
_doc = (BSONObject) cb.get();
………………
}
  
  因此我们在编写MapReduce程序的时候可以传递BsonWritable的key,value键值对,而Mongo构建于Bson之上,也就是说可以将MongoDB视为HDFS同性质的存储节点即可。

3. 代码实现
  在Mongo-Hadoop网站有数个例子,但是讲得不够详细,本文主要对它的金矿产量的例子做一个补充。完整的Hadoop项目一般包括Mapper,Reduceer,Job三个Java Class,以及一个一个配置文件(configuration.xml)来定义项目的输入输出等。Mongo-Hadoop项目会多一个mongo-defaults.xml,当然可以将两者融合起来。

3.1  数据准备
  从github中下载源码包,它会包含examples/treasury_yield/src/main/resources/yield_historical_in.json文件,将该json文件上传到Mongo所在的服务器,使用以下命令将它导入Mongo的testmr数据库中的example collection中。



mongoimport --host 127.0.0.1 --port 27017 -d testmr -c example --file ./yield_historical_in.json
  查看一下数据结构



use testmr
db.example.find().limit(1).pretty()
  如下:



{
"_id": ISODate("1990-01-25T19:00:00-0500"),
"dayOfWeek": "FRIDAY", "bc3Year": 8.38,
"bc10Year": 8.49,

}
3.2  Mapper和Reducer还有Job以及mongo-defaults.xml
  Mapper是从Mongo中读取BSONObject



public class MongoTestMapper extends Mapper
  以及处理读过来的键值对,并发到Reducer中汇总计算。注意value的类型。



public void map(final Object pkey, final BSONObject pvalue,final Context context)
{
final int year = ((Date)pvalue.get("_id")).getYear()+1990;
double bdyear  = ((Number)pvalue.get("bc10Year")).doubleValue();
try {
context.write( new IntWritable( year ), new DoubleWritable( bdyear ));
} catch (IOException e) {
// TODO Auto-generated catch block
                e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
                e.printStackTrace();
}
}
  Reducer会接受Mapper传过来的键值对



public class MongoTestReducer extends Reducer
  进行计算并将结果写入MongoDB.请注意输出的Value的类型是BSONWritable.



public void reduce( final IntWritable pKey,
final Iterable pValues,
final Context pContext ) throws IOException, InterruptedException{
int count = 0;
double sum = 0.0;
for ( final DoubleWritable value : pValues ){
sum += value.get();
count++;
}
final double avg = sum / count;
BasicBSONObject out = new BasicBSONObject();
out.put("avg", avg);
pContext.write(pKey, new BSONWritable(out));
}
  
  Job作为MapReudce主类,主要使用DistributedCache分布式缓存来添加驱动包,并定义了任务的输入配置等。如下所示:



//Using Distribute Cache,call it before job define.
        DistributedCache.createSymlink(conf);
//………………
//Using DistributedCache to add Driver Jar File
DistributedCache.addFileToClassPath(new Path("/user/amap/data/mongo/mongo-2.10.1.jar"), conf);
DistributedCache.addFileToClassPath(new Path("/user/amap/data/mongo/mongo-hadoop-core_cdh4.3.0-1.1.0.jar"), conf);
// job conf
Job job = new Job(conf,"VentLam:Mongo-Test-Job");
  mongo-defaults.xml 配置文件中定义了非常多的参数,我们只需要修改输入输出URI



   
mongo.input.uri
mongodb://127.0.0.1/testmr.example



mongo.output.uri
mongodb://127.0.0.1/testmr.mongotest


  
  将整个java项目打包为名为mongotest的jar包,上传到Hadoop集群,执行命令:



hadoop jar mongotest.jar org.ventlam.MongoTestJob
  以后会将我的博客涉及到源码都发布在https://github.com/ventlam/BlogDemo 中,这篇文章对应的是mongohadoop文件夹。

4.参考文献
  [1] What the overhead of Java ORM for MongoDB
  http://stackoverflow.com/questions/10600162/what-the-overhead-of-java-orm-for-mongodb
  [2] MongoDB Drivers and Client Libraries
  http://docs.mongodb.org/ecosystem/drivers/
  [3]Getting Started with Hadoop
  http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/
  [4] Interface Writable    http://hadoop.apache.org/docs/stable/api/

  本作品由VentLam创作,采用知识共享署名-非商业性使用-相同方式共享 2.5 中国大陆许可协议进行许可。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-83436-1-1.html 上篇帖子: MongoDB源码概述——日志 下篇帖子: 搭建高可用mongodb集群(二)—— 副本集
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表