|
Mongo批处理工具类:
package com.saike.solr.server.util;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoException;
import com.mongodb.MongoOptions;
import com.mongodb.ServerAddress;
/**
 * Batch helper around the legacy MongoDB Java driver.
 *
 * <p>Documents are queued per thread with {@link #addBatch(String, String)} and then flushed
 * with {@link #executeInsertBatch()} or {@link #executeDeleteBatch()}. The {@code Mongo}
 * instance and the {@code DB} handle are shared statically by all instances of this class.
 *
 * @author xieyong
 */
public class UtileMongDB {
    /** Per-thread holder of the pending batch (a list of {@code DBObject}s). */
    UtilThreadLocal localBatch;

    /**
     * Shared Mongo instance. The original author notes that, according to the official
     * documentation, the Mongo Java driver object is thread-safe.
     */
    private static Mongo mongo;

    /** Currently not referenced anywhere in this class. */
    private static DBCollection coll;

    //private static Log log = LogFactory.getLog(UtileMongDB.class);

    /** Lazily resolved database handle, see {@link #getDB()}. */
    private static DB db;

    static {
        // Build the connection options and create the shared Mongo instance.
        MongoOptions options = new MongoOptions();
        options.autoConnectRetry = true;
        options.connectionsPerHost = 1000;
        options.maxWaitTime = 5000;
        options.socketTimeout = 0;
        options.connectTimeout = 15000;
        options.threadsAllowedToBlockForConnectionMultiplier = 5000;
        try {
            // The options must be handed to the constructor explicitly; the (host, port)
            // overload used previously ignored everything configured above.
            mongo = new Mongo(
                    new ServerAddress(MongoDBConstant.MONGO_HOST, MongoDBConstant.MONGO_PORT),
                    options);
        } catch (UnknownHostException | MongoException e) {
            // No logger is wired up in this class (see the commented-out field above).
            e.printStackTrace();
        }
        // boolean auth = db.authenticate(myUserName, myPassword);
    }

    /**
     * Creates a helper whose pending batch is stored in a thread-local {@link ArrayList}.
     *
     * @throws IllegalStateException if the thread-local holder cannot be created; previously
     *     the failure was only printed and {@code localBatch} stayed {@code null}
     */
    public UtileMongDB() {
        try {
            localBatch = new UtilThreadLocal(ArrayList.class);
        } catch (NoSuchMethodException | SecurityException e) {
            throw new IllegalStateException("Unable to create the thread-local batch holder", e);
        }
    }

    /**
     * Returns the database handle, resolving it from the shared Mongo instance on first use.
     *
     * @return the {@code DB} named by {@code MongoDBConstant.MONGO_DB}
     * @throws IllegalStateException if the Mongo instance could not be created at class load
     */
    public static DB getDB() {
        if (db == null) {
            if (mongo == null) {
                throw new IllegalStateException(
                        "Mongo instance was not initialised; see the error printed at class load");
            }
            db = mongo.getDB(MongoDBConstant.MONGO_DB);
        }
        return db;
    }

    /**
     * Returns the shared Mongo instance.
     *
     * @return the Mongo instance, or {@code null} if its creation failed at class load
     */
    public static Mongo getMong() {
        return mongo;
    }

    /**
     * Looks up a collection by name.
     *
     * @param collname name of the collection
     * @return the collection from the database returned by {@link #getDB()}
     */
    public static DBCollection getColl(String collname) {
        return getDB().getCollection(collname);
    }

    /**
     * Looks up the default collection.
     *
     * @return the collection named by {@code MongoDBConstant.MONGO_COLLECTION}
     */
    public static DBCollection getColl() {
        return getDB().getCollection(MongoDBConstant.MONGO_COLLECTION);
    }

    /**
     * Queues a single-field document {@code {key: value}} in the calling thread's batch.
     *
     * @param key field name
     * @param value field value
     */
    public void addBatch(String key, String value) {
        BasicDBObject document = new BasicDBObject();
        document.put(key, value);
        // The batch is kept in a thread-local; the original author notes that sharing it
        // between threads would introduce a race condition.
        batchForAppend().add(document);
    }

    /**
     * Inserts every queued document into the default collection and clears the batch.
     * Does nothing when the calling thread has no queued documents.
     */
    public void executeInsertBatch() {
        List<DBObject> batch = currentBatch();
        if (batch == null || batch.isEmpty()) {
            return;
        }
        getColl().insert(batch);
        batch.clear();
    }

    /**
     * Removes, one query at a time, the documents matching each queued object from the default
     * collection, then clears the batch. Does nothing when the calling thread has no queued
     * documents.
     */
    public void executeDeleteBatch() {
        List<DBObject> batch = currentBatch();
        if (batch == null || batch.isEmpty()) {
            return;
        }
        DBCollection collection = getColl();
        for (DBObject criteria : batch) {
            collection.remove(criteria);
        }
        batch.clear();
    }

    /**
     * Finds documents in the default collection whose field {@code key} equals {@code value}.
     *
     * @param key field name
     * @param value field value to match
     * @return a cursor over the matching documents
     */
    public DBCursor query(String key, String value) {
        BasicDBObject criteria = new BasicDBObject();
        criteria.put(key, value);
        return getColl().find(criteria);
    }

    /**
     * Returns the batch obtained through {@code UtilThreadLocal.newGet()}.
     * NOTE(review): presumably newGet() creates the list on first use; confirm in UtilThreadLocal.
     */
    @SuppressWarnings("unchecked") // the holder is constructed with ArrayList.class and only DBObjects are added
    private List<DBObject> batchForAppend() {
        return (List<DBObject>) localBatch.newGet();
    }

    /** Returns the calling thread's batch as currently stored in the thread-local; may be null. */
    @SuppressWarnings("unchecked") // the holder is constructed with ArrayList.class and only DBObjects are added
    private List<DBObject> currentBatch() {
        return (List<DBObject>) localBatch.get();
    }
}
ThreadLocal的封装:
package com.saike.solr.server.util;
import java.lang.reflect.Constructor;
/**
*
* @author xieyong
*
* @param 本地线程变量对象了类型
*/
public class UtilThreadLocal extends ThreadLocal {
/**参数集合*/
Object[] obj;
/**实例化构造函数*/
Constructor construct;
/**
*
* @param clazz 本地变量的class
* @param args 构造函数的参数
* @throws NoSuchMethodException
* @throws SecurityException
*/
public UtilThreadLocal(Class clazz,Object... args) throws NoSuchMethodException, SecurityException{
this.obj = obj;
Class[] clazzs = null;
/** new 获取参数class供获取构造函数用*/
if(args != null)
if(args.length !=0){
clazzs = new Class[args.length];
for(int i = 0;i |
|
|