afox123 发表于 2017-5-21 10:09:45

7.分布式搜索elasticsearch java API ------与MongoDB同步数据

 elasticsearch提供river这个模块来读取数据源中的数据到es中,es官方有提供couchDB的同步插件,因为项目用到的是mongodb,所以在找mongodb方面的同步插件,在git上找到了elasticsearch-river-mongodb。
       这个插件最初是由aparo写的,最开始的功能就是读取mongodb里面的表,记录最后一条数据的id,根据时间间隔不断访问mongodb,看看有没有大于之前记录的id的数据,有的话就索引数据,这种做法的缺点就是只能同步最新的数据,修改或删除的就不能同步。后来又由richardwilly98等人修改成通过读取mongodb的oplog来同步数据。因为mongodb是通过oplog这个表来使集群中的不同机器数据同步的,这样做的话可以保证es里面的数据和mongodb里面的是一样的,因为mongodb中的数据一有改变,都会通过oplog反映到monogodb中。他们还添加了个索引mongodb gridfs里文件的功能,非常好。
        但他们修改完后的插件还是有些不满意的地方。他把local库(放oplog的)和普通库的访问密码都设置成同一个,如果local库和普通库的用户名和密码不同那这个插件就不能用了。还有一个就是同步时会把mongodb的表中所有的字段都同步过去,但是有些字段我们并不想把它放到索引中,于是对这个插件再作修改,把local库和普通库的鉴权分开,添加可选字段功能。
 
运行环境:Elasticsearch 0.19.X
                  集群环境下的MongoDB 2.X
注意:该插件只支持集群环境下的mongodb,因为集群环境下的mongodb才有oplog这个表。
安装方法:
安装elasticsearch-mapper-attachments插件(用于索引gridfs里的文件)
%ES_HOME%\bin\plugin.bat -install elasticsearch/elasticsearch-mapper-attachments/1.4.0
安装elasticsearch-river-mongodb(同步插件)
%ES_HOME%\bin\plugin.bat -install laigood/elasticsearch-river-mongodb/laigoodv1.0.0
 
创建river方法:
curl方式:
 view plaincopy



[*]$ curl -XPUT "localhost:9200/_river/mongodb/_meta" -d '  
[*]{  
[*]  type: "mongodb",  
[*]  mongodb: {   
[*]    db: "test",   
[*]    host: "localhost",   
[*]    port: "27017",   
[*]    collection: "testdb",  
[*]    fields:"title,content",  
[*]    gridfs: "true",  
[*]    local_db_user: "admin",  
[*]    local_db_password:"admin",  
[*]    db_user: "user",  
[*]    db_password:"password"  
[*]  },   
[*]  index: {   
[*]    name: "test",   
[*]    type: "type",  
[*]    bulk_size: "1000",   
[*]    bulk_timeout: "30"  
[*]  }  
[*]}  

db为同步的数据库名, 
host mongodb的ip地址(默认为localhost), 
port mongodb的端口,
collection 要同步的表名
fields 要同步的字段名(用逗号隔开,默认全部)
gridfs 是否是gridfs文件(如果collection是gridfs的话就设置成true)
local_db_user local数据库的用户名(没有的话不用写)
local_db_password local数据库的密码(没有的话不用写)
db_user 要同步的数据库的密码(没有的话不用写)
db_password 要同步的数据库的密码(没有的话不用写)
name 索引名(不能之前存在)
type 类型
bulk_size 批量添加的最大数
bulk_timeout 批量添加的超时时间
java api方式:
 view plaincopy



[*]client.prepareIndex("_river", "testriver", "_meta")  
[*]        .setSource(  
[*]            jsonBuilder().startObject()  
[*]                .field("type", "mongodb")  
[*]                .startObject("mongodb")  
[*]                        .field("host","localhost")  
[*]                        .field("port",27017)  
[*]                        .field("db","testdb")  
[*]                        .field("collection","test")  
[*]                        .field("fields","title,content")  
[*]                        .field("db_user","user")  
[*]              <span style="white-space:pre">                </span>.field("db_password","password")  
[*]                        .field("local_db_user","admin")  
[*]                  <span style="white-space:pre">        </span>.field("local_db_password","admin")  
[*]                        .endObject()                                              
[*]                 .startObject("index")  
[*]                        .field("name","test")  
[*]                        .field("type","test")  
[*]                        .field("bulk_size","1000")  
[*]                        .field("bulk_timeout","30")  
[*]                        .endObject()  
[*]                 .endObject()  
[*]        ).execute().actionGet();  

 
本插件git地址:https://github.com/laigood/elasticsearch-river-mongodb
参考资料:http://www.searchtech.pro/articles/2013/02/18/1361191176552.html
页: [1]
查看完整版本: 7.分布式搜索elasticsearch java API ------与MongoDB同步数据