设为首页 收藏本站
查看: 1464|回复: 0

[经验分享] HBase协处理器同步二级索引到Solr

[复制链接]

尚未签到

发表于 2017-12-18 21:14:38 | 显示全部楼层 |阅读模式
一、 背景  二、 什么是HBase的协处理器
  三、 HBase协处理器同步数据到Solr
  四、 添加协处理器
  五、 测试
  六、 协处理器动态加载
一、 背景
  在实际生产中,HBase往往不能满足多维度分析,我们能想到的办法就是通过创建HBase数据的二级索引来快速获取rowkey,从而得到想要的数据。目前比较流行的二级索引解决方案有Lily HBase Indexer,Phoenix自带的二级索引,华为Indexer,以及360的二级索引方案。上面的目前使用比较广泛的应该是Lily HBase Indexer,但是我们有时候只想实现一些简单的功能或者比较特殊的功能的时候,需要自己写协处理器进行处理。学习HBase的协处理器对于了解HBase架构是有帮助的。
二、 什么是HBase的协处理器
  协处理器分两种类型,系统协处理器可以全局导入region server上的所有数据表,表协处理器即是用户可以指定一张表使用协处理器。
  Hbase的coprocessor分为两类,Observer和EndPoint。其中Observer相当于触发器,EndPoint相当于存储过程。其中Observer的代码部署在服务端,相当于对API调用的代理。
  另一个是终端(endpoint),动态的终端有点像存储过程。
  
Observer
  观察者的设计意图是允许用户通过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法由HBase的核心代码来执行。协处理器框架处理所有的callback调用细节,协处理器自身只需要插入添加或者改变的功能。以HBase0.92版本为例,它提供了三种观察者接口:

  • RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等。
  • WALObserver:提供WAL相关操作钩子。
  • MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等。
  这些接口可以同时使用在同一个地方,按照不同优先级顺序执行.用户可以任意基于协处理器实现复杂的HBase功能层。HBase有很多种事件可以触发观察者方法,这些事件与方法从HBase0.92版本起,都会集成在HBase API中。不过这些API可能会由于各种原因有所改动,不同版本的接口改动比较大。
三、 HBase协处理器同步数据到Solr
  实时更新数据需要获取到HBase的插入、更新和删除操作。由于HBase中的插入和更新都是对应RegionServer的Put操作,因此我们需要使用RegionObserver中的"postPut"和"postDelete函数"。至于Truncate操作则需要使用MasterObserver。
  
我们需要做的就是拦截put和delete操作,将里面的内容获取出来,写入Solr。 对应的协处理器代码如下:
  

  


  • package com.bqjr.bigdata.HBaseObserver.server;

  • import com.bqjr.bigdata.HBaseObserver.entity.SolrServerManager;
  • import org.apache.hadoop.hbase.CellUtil;
  • import org.apache.hadoop.hbase.client.Delete;
  • import org.apache.hadoop.hbase.client.Durability;
  • import org.apache.hadoop.hbase.client.Put;
  • import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
  • import org.apache.hadoop.hbase.coprocessor.ObserverContext;
  • import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
  • import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
  • import org.apache.hadoop.hbase.util.Bytes;
  • import org.apache.solr.client.solrj.SolrServerException;
  • import org.apache.solr.common.SolrInputDocument;
  • import java.io.IOException;
  • /**
  • * Created by hp on 2017-02-15. */
  • public>
  •     String[] columns = {"test_age","test_name"};
  •   String collection = "bqjr";
  •   SolrServerManager solrManager = new SolrServerManager(collection);

  •   @Override
  •   public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
  •   Put put, WALEdit edit, Durability durability) throws IOException {
  •         String rowkey= Bytes.toString(put.getRow());

  •   SolrInputDocument doc = new SolrInputDocument();
  • for(String column : columns){
  • if(put.has(Bytes.toBytes("cf1"),Bytes.toBytes(column))){
  • doc.addField(column,Bytes.toString(CellUtil.cloneValue(put.get(Bytes.toBytes("cf1"),Bytes.toBytes(column)).get(0))));
  • }
  •   }
  •         try {
  •             solrManager.addDocToCache(doc);
  •   } catch (SolrServerException e1) {
  •             e1.printStackTrace();
  •   }

  •     }

  •     @Override
  •   public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
  •   Delete delete,
  •   WALEdit edit,
  •   Durability durability) throws IOException{
  •         String rowkey= Bytes.toString(delete.getRow());
  • try {
  •             solrManager.delete(rowkey);
  •   } catch (SolrServerException e1) {
  •             e1.printStackTrace();
  •   }
  •     }

  • }
  

  

  大体的写入流程我们已经完成了,接下来就是Solr的写入实现了。由于Solr需要使用Zookeeper等信息,我们可以直接通过HBase的conf中获取Zookeeper相关信息来构造所需要的SolrCloudServer。
  
另一方面,我们不能来了一条数据就马上写入,这样非常消耗资源。因此我们需要做一个缓存,将这些Solr数据暂时保存在里面,定时 + 定量的发送。代码如下
  

  


  • package com.bqjr.bigdata.HBaseObserver.entity;

  • import org.apache.hadoop.conf.Configuration;
  • import org.apache.hadoop.hbase.HBaseConfiguration;
  • import org.apache.solr.client.solrj.SolrServerException;
  • import org.apache.solr.client.solrj.impl.CloudSolrServer;
  • import org.apache.solr.client.solrj.response.UpdateResponse;
  • import org.apache.solr.common.SolrInputDocument;

  • import java.io.IOException;
  • import java.util.*;

  • /**
  • * Created by hp on 2017-02-15. */public>
  •     public static String ZKHost = "";
  • public static String ZKPort = "";
  • int zkClientTimeout = 1800000;// 心跳
  •   int zkConnectTimeout = 1800000;// 连接时间
  •   CloudSolrServer solrServer;
  • private static String defaultCollection;
  • int maxCache = 10000;
  • public static List<SolrInputDocument> cache = new LinkedList<SolrInputDocument>();
  • private static int maxCommitTime = 60; //最大提交时间,s

  •   public SolrServerManager(String collection) {
  •         defaultCollection = collection;
  •   Configuration conf = HBaseConfiguration.create();
  •   ZKHost = conf.get("hbase.zookeeper.quorum", "bqdpm1,bqdpm2,bqdps2");
  •   ZKPort = conf.get("hbase.zookeeper.property.clientPort", "2181");
  •   String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr";
  •   solrServer = new CloudSolrServer(SolrUrl);
  •   solrServer.setDefaultCollection(defaultCollection);
  •   solrServer.setZkClientTimeout(zkClientTimeout);
  •   solrServer.setZkConnectTimeout(zkConnectTimeout);
  •   //启动定时任务,第一次延迟10执行,之后每隔指定时间执行一次
  •   Timer timer = new Timer();
  •   timer.schedule(new CommitTimer(), 10 * 1000L, maxCommitTime * 1000L);
  •   }

  •     public UpdateResponse put(SolrInputDocument doc) throws IOException, SolrServerException {
  •         solrServer.add(doc);
  • return solrServer.commit(false, false);
  •   }

  •     public UpdateResponse put(List<SolrInputDocument> docs) throws IOException, SolrServerException {
  •         solrServer.add(docs);
  • return solrServer.commit(false, false);
  •   }

  •     public void addDocToCache(SolrInputDocument doc) throws IOException, SolrServerException {
  •         synchronized (cache) {
  •             cache.add(doc);
  • if (cache.size() >= maxCache) {
  •                 this.put(cache);
  •   cache.clear();
  •   }
  •         }
  •     }

  •     public UpdateResponse delete(String rowkey) throws IOException, SolrServerException {
  •         solrServer.deleteById(rowkey);
  • return solrServer.commit(false, false);
  •   }

  •     /**
  • * 提交定时器 */  static>
  •         @Override
  •   public void run() {
  •             synchronized (cache) {
  •                 try {
  •                     new SolrServerManager(defaultCollection).put(cache);
  •   cache.clear();
  •   } catch (IOException e) {
  •                     e.printStackTrace();
  •   } catch (SolrServerException e) {
  •                     e.printStackTrace();
  •   }
  •                 cache.clear();
  •   }
  •         }
  •     }
  • }
  

  

四、 添加协处理器
DSC0000.png

  

  


  • #先禁用这张表
  • disable 'HBASE_OBSERVER_TEST'
  • #为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)
  • alter 'HBASE_OBSERVER_TEST','coprocessor'=>'hdfs://bqdpm1:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||'
  • #启用这张表
  • enable 'HBASE_OBSERVER_TEST'
  • #删除某个协处理器,"$<bumber>"后面跟的ID号与desc里面的ID号相同
  • alter 'HBASE_OBSERVER_TEST',METHOD=>'table_att_unset',NAME => 'coprocessor$1'
  

  

五、 测试
  尝试插入一条数据put 'HBASE_OBSERVER_TEST','001','cf1:test_age','18'
  
结果Solr中一条数据都没有
  
DSC0001.png
  然后查看了regionserver的日志发现,没有找到SolrJ的类
  
DSC0002.png
  然后我们将所有的依赖加到Jar包里面之后,再次运行。就可以看到数据了。
  
DSC0003.png
  测试Delete功能
  
DSC0004.png
  测试进行到这里就完了吗?当然不是
  
我们尝试再插入一条put 'HBASE_OBSERVER_TEST','001','cf1:test_name','Bob'
  理论上我们需要在Solr中看到 test_age = 18,test_name = Bob。
  
但是在Solr中只有一条数据
  
DSC0005.png
  于是我们需要使用到Solr的原子更新功能。将postPut改成下面这样的代码即可
  

  


  • public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
  •   Put put, WALEdit edit, Durability durability) throws IOException {
  •         String rowkey= Bytes.toString(put.getRow());
  •   Long version = 1L;
  •   SolrInputDocument doc = new SolrInputDocument();
  • for(String column : columns){
  •             if(put.has(Bytes.toBytes("cf1"),Bytes.toBytes(column))){
  •                 Cell cell = put.get(Bytes.toBytes("cf1"),Bytes.toBytes(column)).get(0);
  •   Map<String, String > operation = new HashMap<String,String>();
  •   operation.put("set",Bytes.toString(CellUtil.cloneValue(cell)));
  •   doc.setField(column,operation);
  •   }
  •         }
  •         doc.addField("id",rowkey);
  • //        doc.addField("_version_",version);
  •   try {
  •             solrManager.addDocToCache(doc);
  •   } catch (SolrServerException e1) {
  •             e1.printStackTrace();
  •   }

  •     }
  

  

  再次插入数据
  
DSC0006.png
  
查看Solr结果
  
DSC0007.png
六、 协处理器动态加载
  hbase的官方文档指出动态级别的协处理器,可以做到不重启hbase,更新协处理,做法就是  
  
禁用表,卸载协处理器,重新指定协处理器, 激活表,即可,但实际测试发现  
  
动态加载无效,是hbase的一个bug,看这个链接:  
  
https://issues.apache.org/jira/browse/HBASE-8445  
  
因为协处理器,已经被JVM加载,即使删除jar也不能重新load的jar,因为cache里面的hdfs的jar路径,没有变化,所以动态更新无效  
  
,除非重启JVM,那样就意味着,需要重启RegionServer,  
  
里面的小伙伴们指出了两种办法,使协处理器加载生效:  
  
(1)滚动重启regionserver,避免停掉所有的节点  
  
(2)改变协处理器的jar的类名字或者hdfs加载路径,以方便有新的ClassLoad去加载它
  但总体来看,第2种方法,比较安全,第一种风险太大,一般情况下没有人会随便滚动重启线上的服务器的,这只在hbase升级的时候使用

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425507-1-1.html 上篇帖子: Solr产品化部署 下篇帖子: 搜索引擎solr和elasticsearch
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表