solr创建索引源码解析
先说下创建索引源码流程:源码类:
1.CommonsHttpSolrServer (SolrServer的子类)
2.SolrServer(抽象类)
3.SolrRequest (基类)
4.AbstractUpdateRequest (抽象类、SolrRequest的子类)
5.UpdateRequest(AbstractUpdateRequest的子类)
6.SolrInputDocument(设置需要索引的名称和值、这个应该放在第一位)
创建索引代码:
1.查询数据库数据,或者其他文档数据进行索引
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*] private void updateBook(String sql, String url, String idColumn,
[*] String timeColumn,BufferedWriter dataFile) throws Exception {
[*] long start = System.currentTimeMillis();
[*] SolrUtil solrUtil = new SolrUtil(url);//初始化索引
[*] SolrDocument doc = SqlSh.getSolrMaxDoc(solrUtil, idColumn, timeColumn);
[*] if (doc == null) {
[*] CommonLogger.getLogger().error("solr no data.");
[*] return;
[*] }
[*] int maxId = Integer.parseInt(doc.get(idColumn).toString());
[*] long maxTime = Long.parseLong(doc.get(timeColumn).toString())*1000;
[*] Date maxDate = new Date(maxTime);
[*]
[*] DateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
[*] //获取数据库需要索引的数据
[*] ResultSet rs = stmt_m.executeQuery(String.format(sql,
[*] dateFormat2.format(maxDate)));
[*] //获取需要创建索引的key
[*] initColumeMeta(rs.getMetaData());
[*]
[*] //解析数据并索引
[*] parseRs(rs, solrUtil);
[*]
[*] rs.close();
[*]
[*] //优化索引
[*] solrUtil.server.optimize();
[*]
[*] CommonLogger.getLogger().info(
[*] "update book time:" + (System.currentTimeMillis() - start)
[*] / 1000 + "s");
[*] }
2.咱们看下上面代码的parseRs方法
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]//下面是简单的解析数据方法并写索引
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*] private void parseRs(ResultSet rs, SolrUtil solrUtil) throwsException {
[*] Collection docs=new ArrayList();
[*] SolrInputDocument doc = null;
[*] int locBk = 0;
[*] boolean flag=true;
[*] StringBuilder sb=null;
[*] String vl=null;
[*] try {
[*] while (rs.next()) {
[*] doc = new SolrInputDocument();
[*] for (int i = 0; i < ToolMain.columnNames.length; i++) {
[*] doc.addField(
[*] ToolMain.columnNames,
[*] getColumnValue(
[*] rs.getObject(ToolMain.columnNames),
[*] ToolMain.columnTypes));//此方法为设置一个域,可以添加一个参数来设置权重
[*] }
[*] docs.add(doc);
[*] locBk++;
[*] if (docs.size() >= 1000) {
[*] solrUtil.addDocList(docs);//创建索引和提交索引操作都在这里面
[*] docs.clear();
[*] }
[*] }
[*] if (docs.size() > 0) {
[*] solrUtil.addDocList(docs);
[*] docs.clear();
[*] }
[*] } catch (Exception e) {
[*] throw e;
[*] } finally {
[*] docs.clear();
[*] docs = null;
[*] }
[*] }
3.下面来说明下SolrUtil类,此类主要是封装了CommonsHttpSolrServer
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]import java.util.Collection;
[*]
[*]import log.CommonLogger;
[*]
[*]import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
[*]import org.apache.solr.common.SolrInputDocument;
[*]
[*]public class SolrUtil {
[*] public CommonsHttpSolrServer server = null;
[*]
[*] public String url = "";//url为solr服务的地址
[*] publicString shards = "";
[*]
[*] public SolrUtil(String url) {
[*] this.url = url;
[*] initSolr();
[*] }
[*] public SolrUtil(String url,String shards) {
[*] this.url = url;
[*] this.shards=shards;
[*] initSolr();
[*] }
[*] //初始化Server
[*] private void initSolr() {
[*] try {
[*] server = new CommonsHttpSolrServer(url);
[*] server.setSoTimeout(60*1000);
[*] server.setConnectionTimeout(60*1000);
[*] server.setDefaultMaxConnectionsPerHost(1000);
[*] server.setMaxTotalConnections(1000);
[*] server.setFollowRedirects(false);
[*] server.setAllowCompression(true);
[*] } catch (Exception e) {
[*] e.printStackTrace();
[*] System.exit(-1);
[*] }
[*] }
[*] //封装了add、commit
[*] public void addDocList(Collection docs) {
[*] try {
[*] server.add(docs);
[*] server.commit();
[*] docs.clear();//释放
[*] } catch (Exception e) {
[*] CommonLogger.getLogger().error("addDocList error.", e);
[*] }
[*] }
[*]
[*] public void deleteDocByQuery(String query) throws Exception {
[*] try {
[*] server.deleteByQuery(query);
[*] server.commit();
[*] } catch (Exception e) {
[*] CommonLogger.getLogger().error("deleteDocByQuery error.", e);
[*] throw e;
[*] }
[*] }
[*]}
4.现在来看看solr创建索引的源码
其实源码执行的操作无非是 生成请求request返回response
1.上面代码中的SolrInputDocument 类所做的操作
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
public class SolrInputDocument implements Map<String,SolrInputField>, Iterable<SolrInputField>, Serializable //实现了Map和Iterable的接口并且实现了接口中的方法,其主要的类为SolrInputField类
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
public class SolrInputField implements Iterable<Object>, Serializable //类中只有三个属性:String key,Object value,还包括评分float boost = 1.0f; 默认是1.0f(如果做权重的话可以设置这个值)
再来看下执行的CommonsHttpSolrServer类所做的操作(表现形式在SolrUtil中的addDocList)
2.添加文档方法
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]public UpdateResponse add(Collection docs ) throws SolrServerException, IOException {
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*] UpdateRequest req = new UpdateRequest();//创建一个request
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*] req.add(docs);//调用UpdateRequest的add方法,添加索引文档
[*] return req.process(this);//亲 重点是这个方法(返回的是response)
[*] }
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*] //再看下UpdateRequest的add方法
[*] private List documents = null;
[*] public UpdateRequest add( final Collection docs )
[*] {
[*] if( documents == null ) {
[*] documents = new ArrayList( docs.size()+1 );
[*] }
[*] documents.addAll( docs );
[*] return this;
[*] }
3.提交方法 commit,调用的是SolrServer类中的
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*] public UpdateResponse commit( boolean waitFlush, boolean waitSearcher ) throws Solr ServerException, IOException {
[*] return new UpdateRequest().setAction( UpdateRequest.ACTION.COMMIT, waitFlush, waitSearcher ).process( this );//看到了吗?
[*] setAction都是为了对对象ModifiableSolrParams(这个对象在最终CommonHttpSolrServerrequest的request方法中用的到)
[*] 在提交索引的时候也是调用的process方法
[*]}
4.优化索引
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]public UpdateResponse optimize(boolean waitFlush, boolean waitSearcher, int maxSegments ) throws SolrServerException, IOException {
[*]sp; return new UpdateRequest().setAction( UpdateRequest.ACTION.OPTIMIZE, waitFlush, waitSearcher, maxSegments ).process( this );//同样调用process,通过setAction参数,在CommonHttpSolrServer类方法request()中主要执行的是合并和压缩setAction都是为了对对象ModifiableSolrParams(这个对象在最终CommonHttpSolrServer的request方法中用的到)进行赋值
[*]}
5.既然上面都提到了process方法,那我们来看看
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]@Override
[*] public UpdateResponse process( SolrServer server ) throws SolrServerException, IOException
[*] {
[*] long startTime = System.currentTimeMillis();
[*] UpdateResponse res = new UpdateResponse();
[*] res.setResponse( server.request( this ) );//这里面这个方法可是重点之重啊,这是调用了 CommonHttpSolrServer类中的request方法
[*] res.setElapsedTime( System.currentTimeMillis()-startTime );
[*] return res;
[*] }
6.最终的方法是SolrServer的子类CommonsHttpSolrServer类的request方法,咱们再来看看这个方法是怎么工作的
view plaincopyhttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/CODE_ico.pnghttp://onexin.iyunv.com/source/plugin/onexin_bigdata/https://code.iyunv.com/assets/ico_fork.svg
[*]public NamedList request(final SolrRequest request, ResponseParser processor ) throws SolrServerException, IOException {
[*]
[*] HttpMethod method = null;
[*] InputStream is = null;
[*] SolrParams params = request.getParams();
[*] Collection streams = requestWriter.getContentStreams(request);
[*] String path = requestWriter.getPath(request);
[*]
[*] //创建索引进来的是/update/select 为查询
[*] if( path == null || !path.startsWith( "/" ) ) {
[*] path = "/select";
[*] }
[*]
[*] ResponseParser parser = request.getResponseParser();
[*] if( parser == null ) {
[*] parser = _parser;
[*] }
[*]
[*] // The parser 'wt=' and 'version=' params are used instead of the original params
[*] ModifiableSolrParams wparams = new ModifiableSolrParams();
[*] wparams.set( CommonParams.WT, parser.getWriterType() );
[*] wparams.set( CommonParams.VERSION, parser.getVersion());
[*] if( params == null ) {
[*] params = wparams;
[*] }
[*] else {
[*] params = new DefaultSolrParams( wparams, params );
[*] }
[*]
[*] if( _invariantParams != null ) {
[*] params = new DefaultSolrParams( _invariantParams, params );
[*] }
[*]
[*] int tries = _maxRetries + 1;
[*] try {
[*] while( tries-- > 0 ) {
[*] // Note: since we aren't do intermittent time keeping
[*] // ourselves, the potential non-timeout latency could be as
[*] // much as tries-times (plus scheduling effects) the given
[*] // timeAllowed.
[*] try {//通过使用查看solr源码,在使用UpdateRequest对象时会自动设置为Post
[*] if( SolrRequest.METHOD.GET == request.getMethod() ) {
[*] if( streams != null ) {
[*] throw new SolrException( SolrException.ErrorCode.BAD_REQUEST, "GET can't send streams!" );
[*] }
[*] method = new GetMethod( _baseURL + path + ClientUtils.toQueryString( params, false ) );
[*] }
[*] else if( SolrRequest.METHOD.POST == request.getMethod() ) {//所以我们直接看
[*]
[*] String url = _baseURL + path;
[*] boolean isMultipart = ( streams != null && streams.size() > 1 );
[*]
[*] if (streams == null || isMultipart) {
[*] PostMethod post = new PostMethod(url);//设置post,包括request头部、内容、参数、等等一些操作
[*] post.getParams().setContentCharset("UTF-8");
[*] if (!this.useMultiPartPost && !isMultipart) {
[*] post.addRequestHeader("Content-Type",
[*] "application/x-www-form-urlencoded; charset=UTF-8");
[*] }
[*]
[*] List parts = new LinkedList();
[*] Iterator iter = params.getParameterNamesIterator();
[*] while (iter.hasNext()) {
[*] String p = iter.next();
[*] String[] vals = params.getParams(p);
[*] if (vals != null) {
[*] for (String v : vals) {
[*] if (this.useMultiPartPost || isMultipart) {
[*] parts.add(new StringPart(p, v, "UTF-8"));
[*] } else {
[*] post.addParameter(p, v);
[*] }
[*] }
[*] }
[*] }
[*]
[*] if (isMultipart) {
[*] int i = 0;
[*] for (ContentStream content : streams) {
[*] final ContentStream c = content;
[*]
[*] String charSet = null;
[*] PartSource source = new PartSource() {
[*] public long getLength() {
[*] return c.getSize();
[*] }
[*] public String getFileName() {
[*] return c.getName();
[*] }
[*] public InputStream createInputStream() throws IOException {
[*] return c.getStream();
[*] }
[*] };
[*]
[*] parts.add(new FilePart(c.getName(), source,
[*] c.getContentType(), charSet));
[*] }
[*] }
[*] if (parts.size() > 0) {
[*] post.setRequestEntity(new MultipartRequestEntity(parts
[*] .toArray(new Part), post.getParams()));
[*] }
[*]
[*] method = post;
[*] }
[*] // It is has one stream, it is the post body, put the params in the URL
[*] else {
[*] String pstr = ClientUtils.toQueryString(params, false);
[*] PostMethod post = new PostMethod(url + pstr);
[*]
[*] // Single stream as body
[*] // Using a loop just to get the first one
[*] final ContentStream[] contentStream = new ContentStream;
[*] for (ContentStream content : streams) {
[*] contentStream = content;
[*] break;
[*] }
[*] if (contentStream instanceof RequestWriter.LazyContentStream) {
[*] post.setRequestEntity(new RequestEntity() {
[*] public long getContentLength() {
[*] return -1;
[*] }
[*]
[*] public String getContentType() {
[*] return contentStream.getContentType();
[*] }
[*]
[*] public boolean isRepeatable() {
[*] return false;
[*] }
[*]
[*] public void writeRequest(OutputStream outputStream) throws IOException {
[*] ((RequestWriter.LazyContentStream) contentStream).writeTo(outputStream);
[*] }
[*] }
[*] );
[*]
[*] } else {
[*] is = contentStream.getStream();
[*] post.setRequestEntity(new InputStreamRequestEntity(is, contentStream.getContentType()));
[*] }
[*] method = post;
[*] }
[*] }
[*] else {
[*] throw new SolrServerException("Unsupported method: "+request.getMethod() );
[*] }
[*] }
[*] catch( NoHttpResponseException r ) {
[*] // This is generally safe to retry on
[*] method.releaseConnection();
[*] method = null;
[*] if(is != null) {
[*] is.close();
[*] }
[*] // If out of tries then just rethrow (as normal error).
[*] if( ( tries < 1 ) ) {
[*] throw r;
[*] }
[*] //log.warn( "Caught: " + r + ". Retrying..." );
[*] }
[*] }
[*] }
[*] catch( IOException ex ) {
[*] throw new SolrServerException("error reading streams", ex );
[*] }
[*]
[*] method.setFollowRedirects( _followRedirects );
[*] method.addRequestHeader( "User-Agent", AGENT );
[*] if( _allowCompression ) {
[*] method.setRequestHeader( new Header( "Accept-Encoding", "gzip,deflate" ) );
[*] }
[*]
[*] try {
[*] // Execute the method.
[*] //System.out.println( "EXECUTE:"+method.getURI() );
[*] //执行请求,返回状态码,然后组装response 最后返回
[*] int statusCode = _httpClient.executeMethod(method);
[*] if (statusCode != HttpStatus.SC_OK) {
[*] StringBuilder msg = new StringBuilder();
[*] msg.append( method.getStatusLine().getReasonPhrase() );
[*] msg.append( "\n\n" );
[*] msg.append( method.getStatusText() );
[*] msg.append( "\n\n" );
[*] msg.append( "request: "+method.getURI() );
[*] throw new SolrException(statusCode, java.net.URLDecoder.decode(msg.toString(), "UTF-8") );
[*] }
[*]
[*] // Read the contents
[*] String charset = "UTF-8";
[*] if( method instanceof HttpMethodBase ) {
[*] charset = ((HttpMethodBase)method).getResponseCharSet();
[*] }
[*] InputStream respBody = method.getResponseBodyAsStream();
[*] // Jakarta Commons HTTPClient doesn't handle any
[*] // compression natively.Handle gzip or deflate
[*] // here if applicable.
[*] if( _allowCompression ) {
[*] Header contentEncodingHeader = method.getResponseHeader( "Content-Encoding" );
[*] if( contentEncodingHeader != null ) {
[*] String contentEncoding = contentEncodingHeader.getValue();
[*] if( contentEncoding.contains( "gzip" ) ) {
[*] //log.debug( "wrapping response in GZIPInputStream" );
[*] respBody = new GZIPInputStream( respBody );
[*] }
[*] else if( contentEncoding.contains( "deflate" ) ) {
[*] //log.debug( "wrapping response in InflaterInputStream" );
[*] respBody = new InflaterInputStream(respBody);
[*] }
[*] }
[*] else {
[*] Header contentTypeHeader = method.getResponseHeader( "Content-Type" );
[*] if( contentTypeHeader != null ) {
[*] String contentType = contentTypeHeader.getValue();
[*] if( contentType != null ) {
[*] if( contentType.startsWith( "application/x-gzip-compressed" ) ) {
[*] //log.debug( "wrapping response in GZIPInputStream" );
[*] respBody = new GZIPInputStream( respBody );
[*] }
[*] else if ( contentType.startsWith("application/x-deflate") ) {
[*] //log.debug( "wrapping response in InflaterInputStream" );
[*] respBody = new InflaterInputStream(respBody);
[*] }
[*] }
[*] }
[*] }
[*] }
[*] return processor.processResponse(respBody, charset);
[*] }
[*] catch (HttpException e) {
[*] throw new SolrServerException( e );
[*] }
[*] catch (IOException e) {
[*] throw new SolrServerException( e );
[*] }
[*] finally {
[*] method.releaseConnection();
[*] if(is != null) {
[*] is.close();
[*] }
[*] }
[*]}
页:
[1]