色魔王子7 发表于 2015-7-17 10:59:31

solr创建索引源码解析

  先说下创建索引源码流程:
        源码类:
          1.CommonHttpSolrServer (SolrServer的子类)
          2.SolrServer(抽象类)
          3.SolrRequest (基类)
          4.AbstractUpdateRequest (抽象类、SolrRequest的子类)
          5.UpdateRequest(AbstractUpdateRequest的子类)
          6.SolrInputDocument(设置需要索引的名称和值、这个应该放在第一位)
  
  创建索引代码:
  

[*]       查询数据库数据,或者其他文档数据进行索引



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]          private void updateBook(String sql, String url, String idColumn,
[*]            String timeColumn,BufferedWriter dataFile) throws Exception {
[*]      long start = System.currentTimeMillis();
[*]               SolrUtil solrUtil = new SolrUtil(url);//初始化索引
[*]      SolrDocument doc = SqlSh.getSolrMaxDoc(solrUtil, idColumn, timeColumn);
[*]      if (doc == null) {
[*]            CommonLogger.getLogger().error("solr no data.");
[*]            return;
[*]      }
[*]      int maxId = Integer.parseInt(doc.get(idColumn).toString());
[*]      long maxTime = Long.parseLong(doc.get(timeColumn).toString())*1000;
[*]      Date maxDate = new Date(maxTime);
[*]         
[*]      DateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
[*]                //获取数据库需要索引的数据
[*]      ResultSet rs = stmt_m.executeQuery(String.format(sql,
[*]                dateFormat2.format(maxDate)));
[*]      //获取需要创建索引的key
[*]                initColumeMeta(rs.getMetaData());
[*]
[*]                //解析数据并索引
[*]      parseRs(rs, solrUtil);
[*]
[*]      rs.close();
[*]               
[*]                //优化索引
[*]      solrUtil.server.optimize();
[*]
[*]      CommonLogger.getLogger().info(
[*]                "update book time:" + (System.currentTimeMillis() - start)
[*]                        / 1000 + "s");
[*]    }


[*]咱们看下上面代码的parseRs方法



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]//下面是简单的解析数据方法并写索引         




view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*] private void parseRs(ResultSet rs, SolrUtil solrUtil) throwsException {
[*]      Collection docs=new ArrayList();
[*]      SolrInputDocument doc = null;
[*]      int locBk = 0;
[*]      boolean flag=true;
[*]      StringBuilder sb=null;
[*]      String vl=null;
[*]      try {
[*]            while (rs.next()) {
[*]                doc = new SolrInputDocument();
[*]                for (int i = 0; i < ToolMain.columnNames.length; i++) {
[*]                  doc.addField(
[*]                        ToolMain.columnNames,
[*]                        getColumnValue(
[*]                            rs.getObject(ToolMain.columnNames),
[*]                            ToolMain.columnTypes));//此方法为设置一个域,可以添加一个参数来设置权重
[*]                }
[*]                docs.add(doc);
[*]                locBk++;
[*]                if (docs.size() >= 1000) {
[*]                  solrUtil.addDocList(docs);//创建索引和提交索引操作都在这里面
[*]                  docs.clear();
[*]                }
[*]            }
[*]            if (docs.size() > 0) {
[*]                solrUtil.addDocList(docs);
[*]                docs.clear();
[*]            }
[*]      } catch (Exception e) {
[*]            throw e;
[*]      } finally {
[*]            docs.clear();
[*]            docs = null;
[*]      }
[*]    }

3.下面来说明下SolrUtil类,此类主要是封装了CommonHttpSolrServer



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]import java.util.Collection;
[*]
[*]import log.CommonLogger;
[*]
[*]import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
[*]import org.apache.solr.common.SolrInputDocument;
[*]
[*]public class SolrUtil {
[*]    public CommonsHttpSolrServer server = null;
[*]
[*]    public String url = "";//url为solr服务的地址
[*]    publicString shards = "";
[*]
[*]    public SolrUtil(String url) {
[*]      this.url = url;
[*]      initSolr();
[*]    }
[*]    public SolrUtil(String url,String shards) {
[*]      this.url = url;
[*]      this.shards=shards;
[*]      initSolr();
[*]    }
[*]       //初始化Server
[*]    private void initSolr() {
[*]      try {
[*]            server = new CommonsHttpSolrServer(url);
[*]            server.setSoTimeout(60*1000);
[*]            server.setConnectionTimeout(60*1000);
[*]            server.setDefaultMaxConnectionsPerHost(1000);
[*]            server.setMaxTotalConnections(1000);
[*]            server.setFollowRedirects(false);
[*]            server.setAllowCompression(true);
[*]      } catch (Exception e) {
[*]            e.printStackTrace();
[*]            System.exit(-1);
[*]      }
[*]    }
[*]    //封装了add、commit
[*]    public void addDocList(Collection docs) {
[*]      try {
[*]            server.add(docs);
[*]            server.commit();
[*]            docs.clear();//释放
[*]      } catch (Exception e) {
[*]            CommonLogger.getLogger().error("addDocList error.", e);
[*]      }
[*]    }
[*]      
[*]    public void deleteDocByQuery(String query) throws Exception {   
[*]      try {
[*]            server.deleteByQuery(query);
[*]            server.commit();
[*]      } catch (Exception e) {
[*]            CommonLogger.getLogger().error("deleteDocByQuery error.", e);
[*]            throw e;
[*]      }
[*]    }
[*]}

  4.现在来看看solr创建索引的源码
  其实源码执行的操作无非是 生成请求request返回response
  1.上面代码中的SolrInputDocument 类所做的操作



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]    public class SolrInputDocument implements Map, Iterable, Serializable   //实现了Map和Iterable的接口并且实现了接口中的方法,其主要的类为SolrInputFiled类




view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]    public class SolrInputField implements Iterable, Serializable //类中只有三个属性,String key,Object value,还包括评分float boost = 1.0f; 默认是1.0f(如果做权重的话可以设置这个值)

  再来看下执行的CommonHttpSolrServer类所做的操作(表现形式在SolrUtil中的addDocList)
  2.添加文档方法



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]public UpdateResponse add(Collection docs )                         throws SolrServerException, IOException {




view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]            UpdateRequest req = new UpdateRequest();//创建一个request




view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]          req.add(docs);//调用UpdateRequest的add方法,添加索引文档
[*]          return req.process(this);//亲 重点是这个方法(返回的是response)
[*]   }




view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]      //再看下UpdateRequest的add方法
[*]      private List documents = null;
[*]      public UpdateRequest add( final Collection docs )
[*]      {
[*]            if( documents == null ) {
[*]                  documents = new ArrayList( docs.size()+1 );
[*]            }
[*]            documents.addAll( docs );
[*]            return this;
[*]      }

3.提交方法 commit,调用的是SolrServer类中的



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*] public UpdateResponse commit( boolean waitFlush, boolean waitSearcher ) throws Solr    ServerException, IOException {
[*]      return new UpdateRequest().setAction( UpdateRequest.ACTION.COMMIT, waitFlush, waitSearcher ).process( this );//看到了吗?
[*] setAction都是为了对对象ModifiableSolrParams(这个对象在最终CommonHttpSolrServerrequest的request方法中用的到)
[*] 在提交索引的时候也是调用的process方法
[*]}

4.优化索引



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]public UpdateResponse optimize(boolean waitFlush, boolean waitSearcher,               int maxSegments ) throws SolrServerException, IOException {
[*]sp;         return new UpdateRequest().setAction( UpdateRequest.ACTION.OPTIMIZE, waitFlush, waitSearcher, maxSegments ).process( this );//同样调用process,通过setAction参数,在CommonHttpSolrServer类方法request()中主要执行的是合并和压缩setAction都是为了对对象ModifiableSolrParams(这个对象在最终CommonHttpSolrServer的request方法中用的到)进行赋值
[*]}

5.既然上面都提到了process方法,那我们来看看



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]@Override
[*]   public UpdateResponse process( SolrServer server ) throws SolrServerException,             IOException
[*]   {
[*]         long startTime = System.currentTimeMillis();
[*]         UpdateResponse res = new UpdateResponse();
[*]         res.setResponse( server.request( this ) );//这里面这个方法可是重点之重啊,这是调用了 CommonHttpSolrServer类中的request方法
[*]         res.setElapsedTime( System.currentTimeMillis()-startTime );
[*]         return res;
[*]   }

6.最终的方法是SolrServer的子类CommonHttpSolrServer类的request方法,咋再来看看这个方法是怎么工作的



view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg

[*]public NamedList request(final SolrRequest request, ResponseParser processor    ) throws SolrServerException, IOException {
[*]      
[*]    HttpMethod method = null;
[*]    InputStream is = null;
[*]    SolrParams params = request.getParams();
[*]    Collection streams = requestWriter.getContentStreams(request);
[*]    String path = requestWriter.getPath(request);
[*]      
[*]    //创建索引进来的是/update/select 为查询   
[*]    if( path == null || !path.startsWith( "/" ) ) {
[*]      path = "/select";
[*]    }
[*]      
[*]    ResponseParser parser = request.getResponseParser();
[*]    if( parser == null ) {
[*]      parser = _parser;
[*]    }
[*]      
[*]    // The parser 'wt=' and 'version=' params are used instead of the original params
[*]    ModifiableSolrParams wparams = new ModifiableSolrParams();
[*]    wparams.set( CommonParams.WT, parser.getWriterType() );
[*]    wparams.set( CommonParams.VERSION, parser.getVersion());
[*]    if( params == null ) {
[*]      params = wparams;
[*]    }
[*]    else {
[*]      params = new DefaultSolrParams( wparams, params );
[*]    }
[*]      
[*]    if( _invariantParams != null ) {
[*]      params = new DefaultSolrParams( _invariantParams, params );
[*]    }
[*]
[*]    int tries = _maxRetries + 1;
[*]    try {
[*]      while( tries-- > 0 ) {
[*]      // Note: since we aren't do intermittent time keeping
[*]      // ourselves, the potential non-timeout latency could be as
[*]      // much as tries-times (plus scheduling effects) the given
[*]      // timeAllowed.
[*]      try {//通过使用查看solr源码,在使用UpdateRequest对象时会自动设置为Post
[*]          if( SolrRequest.METHOD.GET == request.getMethod() ) {
[*]            if( streams != null ) {
[*]                  throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
[*]            }
[*]            method = new GetMethod( _baseURL + path + ClientUtils.toQueryString( params, false ) );
[*]          }
[*]          else if( SolrRequest.METHOD.POST == request.getMethod() ) {//所以我们直接看
[*]
[*]            String url = _baseURL + path;
[*]            boolean isMultipart = ( streams != null && streams.size() > 1 );
[*]
[*]            if (streams == null || isMultipart) {
[*]            PostMethod post = new PostMethod(url);//设置post,包括request头部、内容、参数、等等一些操作
[*]            post.getParams().setContentCharset("UTF-8");
[*]            if (!this.useMultiPartPost && !isMultipart) {
[*]                post.addRequestHeader("Content-Type",
[*]                  "application/x-www-form-urlencoded; charset=UTF-8");
[*]            }
[*]
[*]            List parts = new LinkedList();
[*]            Iterator iter = params.getParameterNamesIterator();
[*]            while (iter.hasNext()) {
[*]                String p = iter.next();
[*]                String[] vals = params.getParams(p);
[*]                if (vals != null) {
[*]                  for (String v : vals) {
[*]                  if (this.useMultiPartPost || isMultipart) {
[*]                      parts.add(new StringPart(p, v, "UTF-8"));
[*]                  } else {
[*]                      post.addParameter(p, v);
[*]                  }
[*]                  }
[*]                }
[*]            }
[*]
[*]            if (isMultipart) {
[*]                int i = 0;
[*]                for (ContentStream content : streams) {
[*]                  final ContentStream c = content;
[*]
[*]                  String charSet = null;
[*]                  PartSource source = new PartSource() {
[*]                  public long getLength() {
[*]                      return c.getSize();
[*]                  }
[*]                  public String getFileName() {
[*]                      return c.getName();
[*]                  }
[*]                  public InputStream createInputStream() throws IOException {
[*]                      return c.getStream();
[*]                  }
[*]                  };
[*]                  
[*]                  parts.add(new FilePart(c.getName(), source,   
[*]                                       c.getContentType(), charSet));
[*]                }
[*]            }
[*]            if (parts.size() > 0) {
[*]                post.setRequestEntity(new MultipartRequestEntity(parts
[*]                  .toArray(new Part), post.getParams()));
[*]            }
[*]
[*]            method = post;
[*]            }
[*]            // It is has one stream, it is the post body, put the params in the URL
[*]            else {
[*]            String pstr = ClientUtils.toQueryString(params, false);
[*]            PostMethod post = new PostMethod(url + pstr);
[*]
[*]            // Single stream as body
[*]            // Using a loop just to get the first one
[*]            final ContentStream[] contentStream = new ContentStream;
[*]            for (ContentStream content : streams) {
[*]                contentStream = content;
[*]                break;
[*]            }
[*]            if (contentStream instanceof RequestWriter.LazyContentStream) {
[*]                post.setRequestEntity(new RequestEntity() {
[*]                  public long getContentLength() {
[*]                  return -1;
[*]                  }
[*]
[*]                  public String getContentType() {
[*]                  return contentStream.getContentType();
[*]                  }
[*]
[*]                  public boolean isRepeatable() {
[*]                  return false;
[*]                  }
[*]
[*]                  public void writeRequest(OutputStream outputStream) throws IOException {
[*]                  ((RequestWriter.LazyContentStream) contentStream).writeTo(outputStream);
[*]                  }
[*]                }
[*]                );
[*]
[*]            } else {
[*]                is = contentStream.getStream();
[*]                post.setRequestEntity(new InputStreamRequestEntity(is, contentStream.getContentType()));
[*]            }
[*]            method = post;
[*]            }
[*]          }
[*]          else {
[*]            throw new SolrServerException("Unsupported method: "+request.getMethod() );
[*]          }
[*]      }
[*]      catch( NoHttpResponseException r ) {
[*]          // This is generally safe to retry on
[*]          method.releaseConnection();
[*]          method = null;
[*]          if(is != null) {
[*]            is.close();
[*]          }
[*]          // If out of tries then just rethrow (as normal error).
[*]          if( ( tries < 1 ) ) {
[*]            throw r;
[*]          }
[*]          //log.warn( "Caught: " + r + ". Retrying..." );
[*]      }
[*]      }
[*]    }
[*]    catch( IOException ex ) {
[*]      throw new SolrServerException("error reading streams", ex );
[*]    }
[*]
[*]    method.setFollowRedirects( _followRedirects );
[*]    method.addRequestHeader( "User-Agent", AGENT );
[*]    if( _allowCompression ) {
[*]      method.setRequestHeader( new Header( "Accept-Encoding", "gzip,deflate" ) );
[*]    }
[*]
[*]    try {
[*]      // Execute the method.
[*]      //System.out.println( "EXECUTE:"+method.getURI() );
[*]      //执行请求,返回状态码,然后组装response 最后返回
[*]      int statusCode = _httpClient.executeMethod(method);
[*]      if (statusCode != HttpStatus.SC_OK) {
[*]      StringBuilder msg = new StringBuilder();
[*]      msg.append( method.getStatusLine().getReasonPhrase() );
[*]      msg.append( "\n\n" );
[*]      msg.append( method.getStatusText() );
[*]      msg.append( "\n\n" );
[*]      msg.append( "request: "+method.getURI() );
[*]      throw new SolrException(statusCode, java.net.URLDecoder.decode(msg.toString(), "UTF-8") );
[*]      }
[*]
[*]      // Read the contents
[*]      String charset = "UTF-8";
[*]      if( method instanceof HttpMethodBase ) {
[*]      charset = ((HttpMethodBase)method).getResponseCharSet();
[*]      }
[*]      InputStream respBody = method.getResponseBodyAsStream();
[*]      // Jakarta Commons HTTPClient doesn't handle any
[*]      // compression natively.Handle gzip or deflate
[*]      // here if applicable.
[*]      if( _allowCompression ) {
[*]      Header contentEncodingHeader = method.getResponseHeader( "Content-Encoding" );
[*]      if( contentEncodingHeader != null ) {
[*]          String contentEncoding = contentEncodingHeader.getValue();
[*]          if( contentEncoding.contains( "gzip" ) ) {
[*]            //log.debug( "wrapping response in GZIPInputStream" );
[*]            respBody = new GZIPInputStream( respBody );
[*]          }
[*]          else if( contentEncoding.contains( "deflate" ) ) {
[*]            //log.debug( "wrapping response in InflaterInputStream" );
[*]            respBody = new InflaterInputStream(respBody);
[*]          }
[*]      }
[*]      else {
[*]          Header contentTypeHeader = method.getResponseHeader( "Content-Type" );
[*]          if( contentTypeHeader != null ) {
[*]            String contentType = contentTypeHeader.getValue();
[*]            if( contentType != null ) {
[*]            if( contentType.startsWith( "application/x-gzip-compressed" ) ) {
[*]                //log.debug( "wrapping response in GZIPInputStream" );
[*]                respBody = new GZIPInputStream( respBody );
[*]            }
[*]            else if ( contentType.startsWith("application/x-deflate") ) {
[*]                //log.debug( "wrapping response in InflaterInputStream" );
[*]                respBody = new InflaterInputStream(respBody);
[*]            }
[*]            }
[*]          }
[*]      }
[*]      }
[*]      return processor.processResponse(respBody, charset);
[*]    }
[*]    catch (HttpException e) {
[*]      throw new SolrServerException( e );
[*]    }
[*]    catch (IOException e) {
[*]      throw new SolrServerException( e );
[*]    }
[*]    finally {
[*]      method.releaseConnection();
[*]      if(is != null) {
[*]      is.close();
[*]      }
[*]    }
[*]}


页: [1]
查看完整版本: solr创建索引源码解析