设为首页 收藏本站
查看: 950|回复: 0

[经验分享] solr3.6实时索引定时器实现

[复制链接]

尚未签到

发表于 2015-11-12 07:45:55 | 显示全部楼层 |阅读模式
感谢有奉献精神的人

转自:http://www.cnblogs.com/chenying99/archive/2012/08/04/2622879.html

企业要求数据表的数据更新后能够实时的被搜索引擎搜索到,查找solr的DataImport的文档提到了一个定时器实现这种实时要求的解决方案
实现方法:
1 配置监听器
web.xml
<listener>
        <listener-class>
                org.apache.solr.handler.dataimport.scheduler.ApplicationListener
        </listener-class>       </listener>
2 引入jar文件
注:如果用的是jre6, 官方下载的jar文件要重新编译,貌&#20284;是版本不兼容
jar文件包括三个类
(1) 监听器ApplicationListener
package org.apache.solr.handler.dataimport.scheduler;
DSC0000.gif
import java.util.Calendar;
import java.util.Date;
import java.util.Timer;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationListener implements ServletContextListener {

        private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);

        @Override
        public void contextDestroyed(ServletContextEvent servletContextEvent) {
                ServletContext servletContext = servletContextEvent.getServletContext();

                // get our timer from the context
                Timer timer = (Timer)servletContext.getAttribute(&quot;timer&quot;);

                // cancel all active tasks in the timers queue
                if (timer != null)
                        timer.cancel();

                // remove the timer from the context
                servletContext.removeAttribute(&quot;timer&quot;);

        }

        @Override
        public void contextInitialized(ServletContextEvent servletContextEvent) {
                ServletContext servletContext = servletContextEvent.getServletContext();
                try{
                        // create the timer and timer task objects
                        Timer timer = new Timer();
                        HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer);

                        // get our interval from HTTPPostScheduler
                        int interval = task.getIntervalInt();

                        // get a calendar to set the start time (first run)
                        Calendar calendar = Calendar.getInstance();

                        // set the first run to now &#43; interval (to avoid fireing while the app/server is starting)
                        calendar.add(Calendar.MINUTE, interval);
                        Date startTime = calendar.getTime();

                        // schedule the task
                        timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);

                        // save the timer in context
                        servletContext.setAttribute(&quot;timer&quot;, timer);

                } catch (Exception e) {
                        if(e.getMessage().endsWith(&quot;disabled&quot;)){
                                logger.info(&quot;Schedule disabled&quot;);
                        }else{
                                logger.error(&quot;Problem initializing the scheduled task: &quot;, e);
                        }
                }
        }

}(2)定时任务HttpPostScheduler
package org.apache.solr.handler.dataimport.scheduler;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class HTTPPostScheduler extends TimerTask {
        private String syncEnabled;
        private String[] syncCores;
        private String server;
        private String port;
        private String webapp;
        private String params;
        private String interval;
        private String cores;
        private SolrDataImportProperties p;
        private boolean singleCore;

        private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);

        public HTTPPostScheduler(String webAppName, Timer t) throws Exception{
                //load properties from global dataimport.properties
                p = new SolrDataImportProperties();
                reloadParams();
                fixParams(webAppName);

                if(!syncEnabled.equals(&quot;1&quot;)) throw new Exception(&quot;Schedule disabled&quot;);

                if(syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())){
                        singleCore = true;
                        logger.info(&quot;<index update process> Single core identified in dataimport.properties&quot;);
                }else{
                        singleCore = false;
                        logger.info(&quot;<index update process> Multiple cores identified in dataimport.properties. Sync active for: &quot; &#43; cores);
                }
        }

        private void reloadParams(){
                p.loadProperties(true);
                syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
                cores           = p.getProperty(SolrDataImportProperties.SYNC_CORES);
                server          = p.getProperty(SolrDataImportProperties.SERVER);
                port            = p.getProperty(SolrDataImportProperties.PORT);
                webapp          = p.getProperty(SolrDataImportProperties.WEBAPP);
                params          = p.getProperty(SolrDataImportProperties.PARAMS);
                interval        = p.getProperty(SolrDataImportProperties.INTERVAL);
                syncCores       = cores != null ? cores.split(&quot;,&quot;) : null;
        }

        private void fixParams(String webAppName){
                if(server == null || server.isEmpty())  server = &quot;localhost&quot;;
                if(port == null || port.isEmpty())              port = &quot;8080&quot;;
                if(webapp == null || webapp.isEmpty())  webapp = webAppName;
                if(interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = &quot;30&quot;;
        }

        public void run() {
                try{
                        // check mandatory params
                        if(server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()){
                                logger.warn(&quot;<index update process> Insuficient info provided for data import&quot;);
                                logger.info(&quot;<index update process> Reloading global dataimport.properties&quot;);
                                reloadParams();

                        // single-core
                        }else if(singleCore){
                                prepUrlSendHttpPost();

                        // multi-core
                        }else if(syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())){
                                logger.warn(&quot;<index update process> No cores scheduled for data import&quot;);
                                logger.info(&quot;<index update process> Reloading global dataimport.properties&quot;);
                                reloadParams();

                        }else{
                                for(String core : syncCores){
                                        prepUrlSendHttpPost(core);
                                }
                        }
                }catch(Exception e){
                        logger.error(&quot;Failed to prepare for sendHttpPost&quot;, e);
                        reloadParams();
                }
        }


        private void prepUrlSendHttpPost(){
                String coreUrl = &quot;http://&quot; &#43; server &#43; &quot;:&quot; &#43; port &#43; &quot;/&quot; &#43; webapp &#43; params;
                sendHttpPost(coreUrl, null);
        }

        private void prepUrlSendHttpPost(String coreName){
                String coreUrl = &quot;http://&quot; &#43; server &#43; &quot;:&quot; &#43; port &#43; &quot;/&quot; &#43; webapp &#43; &quot;/&quot; &#43; coreName &#43; params;
                sendHttpPost(coreUrl, coreName);
        }


        private void sendHttpPost(String completeUrl, String coreName){
                DateFormat df = new SimpleDateFormat(&quot;dd.MM.yyyy HH:mm:ss SSS&quot;);
                Date startTime = new Date();

                // prepare the core var
                String core = coreName == null ? &quot;&quot; : &quot;[&quot; &#43; coreName &#43; &quot;] &quot;;

                logger.info(core &#43; &quot;<index update process> Process started at .............. &quot; &#43; df.format(startTime));

                try{

                    URL url = new URL(completeUrl);
                    HttpURLConnection conn = (HttpURLConnection)url.openConnection();

                    conn.setRequestMethod(&quot;POST&quot;);
                    conn.setRequestProperty(&quot;type&quot;, &quot;submit&quot;);
                    conn.setDoOutput(true);

                        // Send HTTP POST
                    conn.connect();

                    logger.info(core &#43; &quot;<index update process> Request method\t\t\t&quot; &#43; conn.getRequestMethod());
                    logger.info(core &#43; &quot;<index update process> Succesfully connected to server\t&quot; &#43; server);
                    logger.info(core &#43; &quot;<index update process> Using port\t\t\t&quot; &#43; port);
                    logger.info(core &#43; &quot;<index update process> Application name\t\t\t&quot; &#43; webapp);
                    logger.info(core &#43; &quot;<index update process> URL params\t\t\t&quot; &#43; params);
                    logger.info(core &#43; &quot;<index update process> Full URL\t\t\t\t&quot; &#43; conn.getURL());
                    logger.info(core &#43; &quot;<index update process> Response message\t\t\t&quot; &#43; conn.getResponseMessage());
                    logger.info(core &#43; &quot;<index update process> Response code\t\t\t&quot; &#43; conn.getResponseCode());

                    //listen for change in properties file if an error occurs
                    if(conn.getResponseCode() != 200){
                        reloadParams();
                    }

                    conn.disconnect();
                    logger.info(core &#43; &quot;<index update process> Disconnected from server\t\t&quot; &#43; server);
                    Date endTime = new Date();
                    logger.info(core &#43; &quot;<index update process> Process ended at ................ &quot; &#43; df.format(endTime));
                }catch(MalformedURLException mue){
                        logger.error(&quot;Failed to assemble URL for HTTP POST&quot;, mue);
                }catch(IOException ioe){
                        logger.error(&quot;Failed to connect to the specified URL while trying to send HTTP POST&quot;, ioe);
                }catch(Exception e){
                        logger.error(&quot;Failed to send HTTP POST&quot;, e);
                }
        }

        public int getIntervalInt() {
                try{
                        return Integer.parseInt(interval);
                }catch(NumberFormatException e){
                        logger.warn(&quot;Unable to convert 'interval' to number. Using default value (30) instead&quot;, e);
                        return 30; //return default in case of error
                }
        }}
(3) 属性文件类SolrDataImportProperties
package org.apache.solr.handler.dataimport.scheduler;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;

import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrDataImportProperties {
        private Properties properties;

        public static final String SYNC_ENABLED         = &quot;syncEnabled&quot;;
        public static final String SYNC_CORES           = &quot;syncCores&quot;;
        public static final String SERVER               = &quot;server&quot;;
        public static final String PORT                 = &quot;port&quot;;
        public static final String WEBAPP               = &quot;webapp&quot;;
        public static final String PARAMS               = &quot;params&quot;;
        public static final String INTERVAL             = &quot;interval&quot;;

        private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);

        public SolrDataImportProperties(){
//              loadProperties(true);
        }

        public void loadProperties(boolean force){
                try{
                        SolrResourceLoader loader = new SolrResourceLoader(null);
                        logger.info(&quot;Instance dir = &quot; &#43; loader.getInstanceDir());

                        String configDir = loader.getConfigDir();
                        configDir = SolrResourceLoader.normalizeDir(configDir);
                        if(force || properties == null){
                                properties = new Properties();

                                String dataImportPropertiesPath = configDir &#43; &quot;\\dataimport.properties&quot;;

                                FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
                                properties.load(fis);
                        }
                }catch(FileNotFoundException fnfe){
                        logger.error(&quot;Error locating DataImportScheduler dataimport.properties file&quot;, fnfe);
                }catch(IOException ioe){
                        logger.error(&quot;Error reading DataImportScheduler dataimport.properties file&quot;, ioe);
                }catch(Exception e){
                        logger.error(&quot;Error loading DataImportScheduler properties&quot;, e);
                }
        }

        public String getProperty(String key){
                return properties.getProperty(key);
        }}
3 在solr.home的文件夹下建立conf 文件夹
属性文件dataimport.properties放在该文件夹下
我的属性文件内容配置如下
#################################################
#                                               #
#       dataimport scheduler properties         #
#                                               #
#################################################

#  to sync or not to sync
#  1 - active; anything else - inactive
syncEnabled=1

#  which cores to schedule
#  in a multi-core environment you can decide which cores you want syncronized
#  leave empty or comment it out if using single-core deployment
syncCores=core0

#  solr server name or IP address
#  [defaults to localhost if empty]
server=localhost

#  solr server port
#  [defaults to 80 if empty]
port=8080

#  application name/context
#  [defaults to current ServletContextListener's context (app) name]
webapp=solr

#  URL params [mandatory]
#  remainder of URL
params=/dataimport?command=delta-import&clean=false&commit=true

#  params=/dataimport?command=delta-import&clean=false&commit=true

#  schedule interval
#  number of minutes between two runs
#  [defaults to 30 if empty]interval=10



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-138084-1-1.html 上篇帖子: solr学习之二--------添加文档(Add Document) 下篇帖子: 提高solr的搜索速度
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表