设为首页 收藏本站
查看: 1020|回复: 0

[经验分享] 使用Coprocessor实现hbase+solr数据交互

[复制链接]

尚未签到

发表于 2015-7-17 10:14:53 | 显示全部楼层 |阅读模式
  HBase和Solr可以通过协处理器 Coprocessor 的方式向Solr发出请求,Solr对于接收到的数据可以做相关的同步:增、删、改索引的操作。使用solr作为hbase的二级索引,构建基于solr+hbase的快速多条件复杂查询。
  查询时,先根据条件在solr中查找符合条件的rowkey,再根据rowkey从hbase中取数据,根据测试,分页查询时基本可以实现ms级的快速查询。

1. 编写SolrIndexCoprocessorObserver代码


DSC0000.gif DSC0001.gif


package cn.ac.ict.solr.server;
import cn.ac.ict.solr.utils.SolrWriter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* 监听HBase,一有数据postPut就向Solr发送
* hbase有两种coprocessor,一种是Observer(观察者),类似于关系数据库的trigger(触发器)
* 另外一种就是EndPoint,类似于关系数据库的存储过程
* 使用solrwrite进行写数据
* User: zhaop
* Date: 15-4-7
* Time: 下午2:16
*/
public class SolrIndexCoprocessorObserver extends BaseRegionObserver{
private static final Logger logger = LoggerFactory.getLogger(SolrIndexCoprocessorObserver.class);
@Override
public void postPut(ObserverContext e, Put put,WALEdit edit, Durability durability) throws IOException {
logger.info("postPut 向solr中插入数据");
inputSolr(put);
}
@Override
public void postDelete(ObserverContext e, Delete delete,WALEdit edit,Durability durability) throws IOException {
String rowKey = Bytes.toString(delete.getRow());
try {
logger.info("postDelete 删除solr中的数据");
SolrWriter solrWriter = new SolrWriter();
solrWriter.deleteDoc(rowKey);
} catch (Exception ex){
logger.info("postDelete delete rowKey = "+rowKey+" from solr fail:"+ex.getMessage());
logger.error(ex.getMessage(),ex);
}
}

public void inputSolr(Put put) {
String rowKey = Bytes.toString(put.getRow());
try {
Cell cell_did = put.get(Bytes.toBytes("values"), Bytes.toBytes("did")).get(0);
String did = new String(CellUtil.cloneValue(cell_did));
Cell cell_dvid = put.get(Bytes.toBytes("values"), Bytes.toBytes("dvid")).get(0);
String dvid = new String(CellUtil.cloneValue(cell_dvid));
Cell cell_value= put.get(Bytes.toBytes("values"), Bytes.toBytes("value")).get(0);
String value = new String(CellUtil.cloneValue(cell_value));
Cell cell_timestamp = put.get(Bytes.toBytes("values"), Bytes.toBytes("timestamp")).get(0);
String timestamp = new String(CellUtil.cloneValue(cell_timestamp));
Cell cell_model = put.get(Bytes.toBytes("values"), Bytes.toBytes("model")).get(0);
String model = new String(CellUtil.cloneValue(cell_model));
SolrInputDocument doc = new SolrInputDocument();
doc.addField("rowkey", rowKey);
doc.addField("did", did);
doc.addField("dvid", dvid);
doc.addField("value", value);
doc.addField("timestamp", timestamp);
doc.addField("model", model);
SolrWriter.addDocToCache(doc);
logger.info("postPut 向solr缓存中插入数据成功,rowKey = "+rowKey);
} catch (Exception e) {
logger.info("postPut write rowKey = "+rowKey+" to solr fail:"+e.getMessage());
logger.error(e.getMessage(),e);
}
}
}
View Code




package cn.ac.ict.solr.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.Vector;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 向sola中写数据,每隔一段时间
* User: zhaop
* Date: 15-4-9
* Time: 下午8:50
*/
public class SolrWriter {
private static final Logger logger = LoggerFactory.getLogger(SolrWriter.class);
public static String urlSolr = "";     //solr地址
private static String defaultCollection = "";  //默认collection
private static int zkClientTimeOut = 0;//zk客户端请求超时间
private static int zkConnectTimeOut = 0;//zk客户端连接超时间
private static CloudSolrClient cloudSolrClient = null;
private static int maxCacheCount = 0;   //缓存大小,当达到该上限时提交
private static Vector cache = null;   //缓存 此处缓存对象可以改为 SolrInputDocument更具通用性
public static Lock commitLock = new ReentrantLock();  //在添加缓存或进行提交时加锁
private static int maxCommitTime = 60; //最大提交时间,s
static {
Configuration conf = HBaseConfiguration.create();
urlSolr = conf.get("hbase.solr.zklist", "192.168.0.177:2181");
defaultCollection = conf.get("hbase.solr.collection", "dev_values");
zkClientTimeOut = conf.getInt("hbase.solr.zkClientTimeOut", 10000);
zkConnectTimeOut = conf.getInt("hbase.solr.zkConnectTimeOut", 10000);
maxCacheCount = conf.getInt("hbase.solr.maxCacheCount", 10000);
maxCommitTime = conf.getInt("hbase.solr.maxCommitTime", 1);
logger.info("solr init param " + urlSolr + "  " + defaultCollection + "  " + zkClientTimeOut + "  " + zkConnectTimeOut + "  " + maxCacheCount + "  " + maxCommitTime);
try {
cache = new Vector(maxCacheCount);
cloudSolrClient = new CloudSolrClient(urlSolr);
cloudSolrClient.setDefaultCollection(defaultCollection);
cloudSolrClient.setZkClientTimeout(zkClientTimeOut);
cloudSolrClient.setZkConnectTimeout(zkConnectTimeOut);
//启动定时任务,第一次延迟1s执行,之后每隔指定时间执行一次
Timer timer = new Timer();
timer.schedule(new CommitTimer(), 1 * 1000, maxCommitTime * 1000);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
/**
* 批量提交
*/
public void inputDoc(List docList) throws IOException, SolrServerException {
if (docList == null || docList.size() == 0) {
return;
}
/*List doclist = new ArrayList(deviceDataList.size());
for (DeviceData dd : deviceDataList) {
SolrInputDocument doc = new SolrInputDocument();
doc.addField("rowkey", dd.getRowkey());
doc.addField("did", dd.getDid());
doc.addField("dvid", dd.getDvid());
doc.addField("value", dd.getValue());
doc.addField("timestamp", dd.getTimestamp());
doc.addField("model", dd.getModel());
doclist.add(doc);
}*/
cloudSolrClient.add(docList);
cloudSolrClient.commit();
}
/**
* 单条提交
*/
public void inputDoc(SolrInputDocument doc) throws IOException, SolrServerException {
if (doc == null) {
return;
}
cloudSolrClient.add(doc);
cloudSolrClient.commit();
}
public void deleteDoc(List rowkeys) throws IOException, SolrServerException {
if (rowkeys == null || rowkeys.size() == 0) {
return;
}
cloudSolrClient.deleteById(rowkeys);
cloudSolrClient.commit();
}
public void deleteDoc(String rowkey) throws IOException, SolrServerException {
cloudSolrClient.deleteById(rowkey);
cloudSolrClient.commit();
}
/**
* 添加记录到cache,如果cache达到maxCacheCount,则提交
*/
public static void addDocToCache(SolrInputDocument doc) {
commitLock.lock();
try {
cache.add(doc);
logger.info("cache commit maxCacheCount:" + maxCacheCount);
logger.info("cache size:" + cache.size());
if (cache.size() >= maxCacheCount) { //cache满则提交
logger.info("cache commit, count:" + cache.size());
new SolrWriter().inputDoc(cache);
cache.clear();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
commitLock.unlock();
}
}
/**
* 提交定时器
*/
static class CommitTimer extends TimerTask {
@Override
public void run() {
commitLock.lock();
try {
if (cache.size() > 0) { //cache中有数据则提交
logger.info("timer commit, count:" + cache.size());
new SolrWriter().inputDoc(cache);
cache.clear();
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
commitLock.unlock();
}
}
}
}
View Code

2. 打成jar包上传到hadoop中

  目录为hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar



进入hadoop bin目录
创建lib目录
hadoop fs -mkdir /lib
上传文件
hadoop fs -put coprocessor-solr-1.0-SNAPSHOT.jar /lib
查看是否已存在
hadoop fs -lsr /lib

3. hbase shell中添加coprocessor



对表增加coprocessor
disable 'dev_values'
alter 'dev_values', METHOD => 'table_att', 'coprocessor'=>'hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar|cn.ac.ict.solr.server.SolrIndexCoprocessorObserver|1001|'
enable 'dev_values'
查看是否已添加成功
describe 'dev_values'
'dev_values', {TABLE_ATTRIBUTES => {coprocessor$2 => 'hdfs:///lib/coprocessor-solr-1.0-SNAPSHOT.jar|cn.ac.ict.solr.server. true                                                               
SolrIndexCoprocessorObserver|1001|'}, {NAME => 'values', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_                                                                    
SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'false'                                                                    
, BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}  

4. 大功告成,进行测试

  向hbase中插入一条数据,查看日志是否有记录,solr中查看数据是否已存在

5.hbase shell 删除coprocessor



disable 'dev_values'
alter 'dev_values',METHOD => 'table_att_unset',NAME =>'coprocessor$1'
enable 'dev_values'
  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-87563-1-1.html 上篇帖子: solr DIH 知识梳理 下篇帖子: solr加ik分词器
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表