分析家 发表于 2018-11-1 13:16:16

solr导入数据的高效方法

public class SolrRecordHandler implements Runnable{  
//生产者-消费者 solr doc
  
    private ArrayBlockingQueue docs=new ArrayBlockingQueue(5000);
  
      public void wrap(ResultSet rs){
  
SolrInputDocument doc=new SolrInputDocument();
  
      try {
  
             ResultSetMetaData rsm = rs.getMetaData();
  
             int numColumns = rsm.getColumnCount();
  
             for (int i = 1; i < (numColumns + 1); i++)
  
             {
  
                  doc.addField(rsm.getColumnName(i), rs.getObject(i));
  
             }
  

  
      } catch (Exception e) {
  
            e.printStackTrace();
  
            return null;
  
      }
  
      docs.add(doc);
  
      }
  
@Override
  
    public void run() {
  
      logger.info("solr 写线程启动开始。。。。");
  
      SolrCore core = cores.getCore("review");
  
      UpdateRequestProcessorChain chain=core.getUpdateProcessingChain(null);
  
      SolrParams param=new ModifiableSolrParams();
  
      SolrQueryRequestBase req=new SolrQueryRequestBase(core,param){};
  
      SolrQueryResponse rsp=new SolrQueryResponse();
  
      UpdateRequestProcessor processor=chain.createProcessor(req, rsp);
  
      //不停地从队列中读取元素,直到任务结束
  
      SolrInputDocument doc;
  
      AddUpdateCommand acmd=new AddUpdateCommand(req);
  
      while(true){
  
            try {
  
                doc=docs.take();
  
                //读取到一个空的doc,则表明任务结束
  
                if(doc.isEmpty()){
  
                  break;
  
                }
  
                acmd.solrDoc=doc;
  
                processor.processAdd(acmd);
  
            } catch (Exception e) {
  
                e.printStackTrace();
  
            }
  
      }
  
      logger.info("solr index thread finished!");
  
      //任务完成,则提交
  
      try {
  
            CommitUpdateCommand cmd=new CommitUpdateCommand(req, false);
  
            processor.processCommit(cmd);
  
      } catch (IOException e) {
  
            e.printStackTrace();
  
      }finally{
  
            try {
  
                processor.finish();
  
            } catch (IOException e) {
  
                e.printStackTrace();
  
            }
  
      }
  
      isfinished.set(true);
  
    }
  
}


页: [1]
查看完整版本: solr导入数据的高效方法