[Experience Sharing] Learning Solr (3): Data Import, Part 1: Simple Use of DIH

  
Using the DataImportHandler (DIH) for simple data imports is quite effective. In particular, for a simple database table, DIH lets you merge the full import and the delta import into a single query, which is very convenient. My setup is as follows.

1. Register the request handler in solrconfig.xml


<requestHandler name="/dataimport" class="org.apache.solr.handler.dataimport.DataImportHandler">
  <lst name="defaults">
    <str name="config">/home/tomcat/bin/solr/conf/data-config.xml</str>
  </lst>
</requestHandler>
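
Once the handler is registered it can be sanity-checked right away. Assuming a single-core Solr reachable at localhost:8983 under the webapp name solr (adjust host and port to your deployment), the standard status command reports whether DIH is idle or busy and what the last run did:

http://localhost:8983/solr/dataimport?command=status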
2. Add the data-config file

  data-config.xml

<dataConfig>
  <dataSource type="JdbcDataSource"
              driver="com.mysql.jdbc.Driver"
              url="jdbc:mysql://127.0.0.1/db"
              user="root"
              password="pass"
              batchSize="-1"/>
  <document>
    <entity name="id" pk="id"
            query="select id,username,text,cat from hot where '${dataimporter.request.clean}' != 'false' OR timestamp > '${dataimporter.last_index_time}'">
      <field column="id" name="id"/>
      <field column="text" name="text"/>
      <field column="username" name="username_s"/>
      <field column="cat" name="cat_t"/>
    </entity>
  </document>
</dataConfig>
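
This single query is where full and delta imports merge. '${dataimporter.request.clean}' expands to the clean parameter of the request: a plain full-import leaves clean at its default of true, so the first condition holds for every row and the whole table is selected; passing clean=false makes the first condition fail, so only rows whose timestamp is newer than ${dataimporter.last_index_time} get picked up. Assuming the handler path registered above, the two invocations look like this:

http://localhost:8983/solr/dataimport?command=full-import&commit=true
http://localhost:8983/solr/dataimport?command=full-import&clean=false&commit=true

Also note batchSize="-1": it tells the MySQL JDBC driver to stream rows one at a time instead of buffering the whole result set, which avoids running out of memory on large tables.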
3. Make DIH run periodically

  
Edit the dataimport.properties file. It is generated automatically and also lives under solr/conf. Add the following parameters:
  interval: time between runs, in minutes
  syncEnabled=1: turns periodic running on
  params: the concrete URL to call; periodic running just means hitting that URL on a schedule

#Wed Dec 28 09:29:42 UTC 2011
port=8983
interval=5
last_index_time=2011-12-28 09\:29\:26
syncEnabled=1
webapp=solr
id.last_index_time=2011-12-28 09\:29\:26
server=127.0.0.1
params=/select?qt\=/dataimport&command\=full-import&clean\=false&commit\=true&optimize\=false
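
The backslashes in front of = and : are just java.util.Properties escaping; the value of params that actually gets loaded is the plain URL path. The scheduler shown below concatenates server, port, webapp, and params, so the values above make it issue the following request every five minutes:

http://127.0.0.1:8983/solr/select?qt=/dataimport&command=full-import&clean=false&commit=true&optimize=false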
At this point periodic runs still do not happen. The Solr wiki has a piece of code implementing this feature, but it was never included in the Solr release, so we need to compile that code ourselves, package it into a jar, and drop it into webapp/solr/WEB-INF/lib. The listener must also be registered in the webapp's web.xml:

<web-app>
  <listener>
    <listener-class>org.apache.solr.handler.dataimport.scheduler.ApplicationListener</listener-class>
  </listener>
  ...
</web-app>
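
For reference, a minimal sketch of the compile-and-package steps; the paths are illustrative and assume the scheduler sources sit in the usual org/apache/... directory layout, with the Solr and slf4j jars already in WEB-INF/lib providing the classpath:

javac -cp "webapp/solr/WEB-INF/lib/*" org/apache/solr/handler/dataimport/scheduler/*.java
jar cf solr-dataimport-scheduler.jar org/apache/solr/handler/dataimport/scheduler/*.class
cp solr-dataimport-scheduler.jar webapp/solr/WEB-INF/lib/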
Below is the periodic-run code from the Solr wiki. I have already built it into a jar, attached to this post.

package org.apache.solr.handler.dataimport.scheduler;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;

import org.apache.solr.core.SolrResourceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SolrDataImportProperties {
    private Properties properties;

    public static final String SYNC_ENABLED = "syncEnabled";
    public static final String SYNC_CORES   = "syncCores";
    public static final String SERVER       = "server";
    public static final String PORT         = "port";
    public static final String WEBAPP       = "webapp";
    public static final String PARAMS       = "params";
    public static final String INTERVAL     = "interval";

    private static final Logger logger = LoggerFactory.getLogger(SolrDataImportProperties.class);

    public SolrDataImportProperties() {
    }

    public void loadProperties(boolean force) {
        try {
            SolrResourceLoader loader = new SolrResourceLoader(null);
            logger.info("Instance dir = " + loader.getInstanceDir());
            String configDir = SolrResourceLoader.normalizeDir(loader.getConfigDir());
            if (force || properties == null) {
                properties = new Properties();
                // normalizeDir guarantees a trailing separator, so concatenate directly
                // (a platform-specific "\\" here would break on Linux)
                String dataImportPropertiesPath = configDir + "dataimport.properties";
                FileInputStream fis = new FileInputStream(dataImportPropertiesPath);
                try {
                    properties.load(fis);
                } finally {
                    fis.close(); // do not leak the file handle
                }
            }
        } catch (FileNotFoundException fnfe) {
            logger.error("Error locating DataImportScheduler dataimport.properties file", fnfe);
        } catch (IOException ioe) {
            logger.error("Error reading DataImportScheduler dataimport.properties file", ioe);
        } catch (Exception e) {
            logger.error("Error loading DataImportScheduler properties", e);
        }
    }

    public String getProperty(String key) {
        return properties.getProperty(key);
    }
}
 
package org.apache.solr.handler.dataimport.scheduler;

import java.util.Calendar;
import java.util.Date;
import java.util.Timer;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationListener implements ServletContextListener {

    private static final Logger logger = LoggerFactory.getLogger(ApplicationListener.class);

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        ServletContext servletContext = servletContextEvent.getServletContext();
        // get our timer from the context
        Timer timer = (Timer) servletContext.getAttribute("timer");
        // cancel all active tasks in the timer's queue
        if (timer != null)
            timer.cancel();
        // remove the timer from the context
        servletContext.removeAttribute("timer");
    }

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        ServletContext servletContext = servletContextEvent.getServletContext();
        try {
            // create the timer and timer task objects
            Timer timer = new Timer();
            HTTPPostScheduler task = new HTTPPostScheduler(servletContext.getServletContextName(), timer);
            // get our interval from HTTPPostScheduler
            int interval = task.getIntervalInt();
            // set the first run to now + interval (to avoid firing while the app/server is starting)
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.MINUTE, interval);
            Date startTime = calendar.getTime();
            // schedule the task
            timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);
            // save the timer in context
            servletContext.setAttribute("timer", timer);
        } catch (Exception e) {
            // guard against exceptions that carry no message
            if (e.getMessage() != null && e.getMessage().endsWith("disabled")) {
                logger.info("Schedule disabled");
            } else {
                logger.error("Problem initializing the scheduled task: ", e);
            }
        }
    }
}
 
package org.apache.solr.handler.dataimport.scheduler;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTPPostScheduler extends TimerTask {

    private String syncEnabled;
    private String[] syncCores;
    private String server;
    private String port;
    private String webapp;
    private String params;
    private String interval;
    private String cores;
    private SolrDataImportProperties p;
    private boolean singleCore;

    private static final Logger logger = LoggerFactory.getLogger(HTTPPostScheduler.class);

    public HTTPPostScheduler(String webAppName, Timer t) throws Exception {
        // load properties from the global dataimport.properties
        p = new SolrDataImportProperties();
        reloadParams();
        fixParams(webAppName);
        if (!syncEnabled.equals("1")) throw new Exception("Schedule disabled");
        if (syncCores == null || (syncCores.length == 1 && syncCores[0].isEmpty())) {
            singleCore = true;
            logger.info("<index update process> Single core identified in dataimport.properties");
        } else {
            singleCore = false;
            logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: " + cores);
        }
    }

    private void reloadParams() {
        p.loadProperties(true);
        syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
        cores       = p.getProperty(SolrDataImportProperties.SYNC_CORES);
        server      = p.getProperty(SolrDataImportProperties.SERVER);
        port        = p.getProperty(SolrDataImportProperties.PORT);
        webapp      = p.getProperty(SolrDataImportProperties.WEBAPP);
        params      = p.getProperty(SolrDataImportProperties.PARAMS);
        interval    = p.getProperty(SolrDataImportProperties.INTERVAL);
        syncCores   = cores != null ? cores.split(",") : null;
    }

    private void fixParams(String webAppName) {
        if (server == null || server.isEmpty())  server = "localhost";
        if (port == null || port.isEmpty())      port = "8080";
        if (webapp == null || webapp.isEmpty())  webapp = webAppName;
        if (interval == null || interval.isEmpty() || getIntervalInt() <= 0) interval = "30";
    }

    @Override
    public void run() {
        try {
            // check mandatory params
            if (server.isEmpty() || webapp.isEmpty() || params == null || params.isEmpty()) {
                logger.warn("<index update process> Insufficient info provided for data import");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();
            // single-core
            } else if (singleCore) {
                prepUrlSendHttpPost();
            // multi-core
            } else if (syncCores.length == 0 || (syncCores.length == 1 && syncCores[0].isEmpty())) {
                logger.warn("<index update process> No cores scheduled for data import");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();
            } else {
                for (String core : syncCores) {
                    prepUrlSendHttpPost(core);
                }
            }
        } catch (Exception e) {
            logger.error("Failed to prepare for sendHttpPost", e);
            reloadParams();
        }
    }

    private void prepUrlSendHttpPost() {
        String coreUrl = "http://" + server + ":" + port + "/" + webapp + params;
        sendHttpPost(coreUrl, null);
    }

    private void prepUrlSendHttpPost(String coreName) {
        String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/" + coreName + params;
        sendHttpPost(coreUrl, coreName);
    }

    private void sendHttpPost(String completeUrl, String coreName) {
        DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
        Date startTime = new Date();
        // prepare the core label for log lines
        String core = coreName == null ? "" : "[" + coreName + "] ";
        logger.info(core + "<index update process> Process started at .............. " + df.format(startTime));
        try {
            URL url = new URL(completeUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("type", "submit");
            conn.setDoOutput(true);
            // send the HTTP POST
            conn.connect();
            logger.info(core + "<index update process> Request method\t\t\t" + conn.getRequestMethod());
            logger.info(core + "<index update process> Successfully connected to server\t" + server);
            logger.info(core + "<index update process> Using port\t\t\t" + port);
            logger.info(core + "<index update process> Application name\t\t\t" + webapp);
            logger.info(core + "<index update process> URL params\t\t\t" + params);
            logger.info(core + "<index update process> Full URL\t\t\t\t" + conn.getURL());
            logger.info(core + "<index update process> Response message\t\t\t" + conn.getResponseMessage());
            logger.info(core + "<index update process> Response code\t\t\t" + conn.getResponseCode());
            // re-read the properties file if an error occurs
            if (conn.getResponseCode() != 200) {
                reloadParams();
            }
            conn.disconnect();
            logger.info(core + "<index update process> Disconnected from server\t\t" + server);
            Date endTime = new Date();
            logger.info(core + "<index update process> Process ended at ................ " + df.format(endTime));
        } catch (MalformedURLException mue) {
            logger.error("Failed to assemble URL for HTTP POST", mue);
        } catch (IOException ioe) {
            logger.error("Failed to connect to the specified URL while trying to send HTTP POST", ioe);
        } catch (Exception e) {
            logger.error("Failed to send HTTP POST", e);
        }
    }

    public int getIntervalInt() {
        try {
            return Integer.parseInt(interval);
        } catch (NumberFormatException e) {
            logger.warn("Unable to convert 'interval' to a number. Using default value (30) instead", e);
            return 30; // return the default in case of error
        }
    }
}
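
Before trusting the timer, it helps to fire the same request once by hand. Below is a minimal standalone sketch that follows the same HttpURLConnection pattern as sendHttpPost above; the host, port, and params mirror the sample dataimport.properties and are assumptions about your deployment:

import java.net.HttpURLConnection;
import java.net.URL;

public class ManualImportTrigger {
    public static void main(String[] args) throws Exception {
        // the same URL the scheduler assembles from the sample properties
        String url = "http://127.0.0.1:8983/solr/select?qt=/dataimport"
                   + "&command=full-import&clean=false&commit=true&optimize=false";
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.connect(); // fires the POST
        System.out.println("Response: " + conn.getResponseCode() + " " + conn.getResponseMessage());
        conn.disconnect();
    }
}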
