设为首页 收藏本站
查看: 1277|回复: 0

[经验分享] HBase协处理器同步二级索引到Solr(续)

[复制链接]

尚未签到

发表于 2017-12-19 12:09:56 | 显示全部楼层 |阅读模式
一、 已知的问题和不足  二、解决思路
  三、代码
  3.1 读取config文件内容
  3.2 封装SolrServer的获取方式
  3.3 编写提交数据到Solr的代码
  3.4 拦截HBase的Put和Delete操作信息
  四、 使用
一、 已知的问题和不足
  在上一个版本中,实现了使用HBase的协处理器将HBase的二级索引同步到Solr中,但是仍旧有几个缺陷:

  • 写入Solr的Collection是写死在代码里面,且是唯一的。如果我们有一张表的数据希望将不同的字段同步到Solr中该如何做呢?
  • 目前所有配置相关信息都是写死到了代码中的,是否可以添加外部配置文件。
  • 原来的方法是每次都需要编译新的Jar文件单独运行,能否将所有的同步使用一段通用的代码完成?
二、解决思路
  针对上面的三个主要问题,我们一一解决

  • 通常一张表会对应多个SolrCollection以及不同的Column。我们可以使用Map[表名->List[(Collection1,List[Columns]),(Collection2,List[Columns])...]]这样的类型,根据表名获取所有的Collection和Column。
  • 通过Typesafe Config读取外部配置文件,达到所有信息可配的目的。
  • 所有的数据都只有Put和Delete,只要我们拦截到具体的消息之后判断当前的表名,然后根据问题一中的Collection和Column即可写入对应的SolrServer。在协处理器中获取表名的是e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()其中e是ObserverContext
三、代码
3.1 读取config文件内容
  使用typesafe的config组件读取morphlines.conf文件,将内容转换为 Map<String,List<HBaseIndexerMappin>>。具体代码如下
  

  


  • public>
  •     private static SourceConfig sourceConfig = new SourceConfig();
  • public static Config config;
  • static {
  •         sourceConfig.setConfigFiles("morphlines.conf");
  •   config =  sourceConfig.getConfig();
  •   }
  •     public static Map<String,List<HBaseIndexerMappin>> getHBaseIndexerMappin(){
  •         Map<String,List<HBaseIndexerMappin>> mappin = new HashMap<String, List<HBaseIndexerMappin>>();
  •   Config mappinConf = config.getConfig("Mappin");
  •   List<String> tables = mappinConf.getStringList("HBaseTables");
  • for (String table :tables){
  •             List<Config> confList = (List<Config>) mappinConf.getConfigList(table);
  •   List<HBaseIndexerMappin> maps = new LinkedList<HBaseIndexerMappin>();
  • for(Config tmp :confList){
  •                 HBaseIndexerMappin map = new HBaseIndexerMappin();
  •   map.solrConnetion = tmp.getString("SolrCollection");
  •   map.columns = tmp.getStringList("Columns");
  •   maps.add(map);
  •   }
  •             mappin.put(table,maps);
  •   }
  •         return mappin;
  •   }
  • }
  

  

3.2 封装SolrServer的获取方式
  因为目前我使用的环境是Solr和HBase公用的同一套Zookeeper,因此我们完全可以借助HBase的Zookeeper信息。HBase的协处理器是运行在HBase的环境中的,自然可以通过HBase的Configuration获取当前的Zookeeper节点和端口,然后轻松的获取到Solr的地址。
  

  


  • public>
  •     static Configuration conf = HBaseConfiguration.create();
  • public static String ZKHost = conf.get("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps2");
  • public static String ZKPort = conf.get("hbase.zookeeper.property.clientPort","2181");
  • public static String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr";
  • public static int zkClientTimeout = 1800000;// 心跳
  •   public static int zkConnectTimeout = 1800000;// 连接时间

  •   public static CloudSolrServer create(String defaultCollection){
  •         log.info("Create SolrCloudeServer .This collection is " + defaultCollection);
  •   CloudSolrServer solrServer = new CloudSolrServer(SolrUrl);
  •   solrServer.setDefaultCollection(defaultCollection);
  •   solrServer.setZkClientTimeout(zkClientTimeout);
  •   solrServer.setZkConnectTimeout(zkConnectTimeout);
  • return solrServer;
  •   }
  • }
  

  

3.3 编写提交数据到Solr的代码
  理想状态下,我们时时刻刻都需要提交数据到Solr中,但是事实上我们数据写入的时间是比较分散的,可能集中再每一天的某几个时间点。因此我们必须保证在高并发下能达到一定数据量自动提交,在低并发的情况下能隔一段时间写入一次。只有两种机制并存的情况下才能保证数据能即时写入。
  

  


  • public>
  •     public Map<String,List<SolrInputDocument>> putCache = new HashMap<String, List<SolrInputDocument>>();//Collection名字->更新(插入)操作缓存
  • public Map<String,List<String>> deleteCache = new HashMap<String, List<String>>();//Collection名字->删除操作缓存
  •   Map<String,CloudSolrServer> solrServers = new HashMap<String, CloudSolrServer>();//Collection名字->SolrServers
  • int maxCache =  ConfigManager.config.getInt("MaxCommitSize");
  •   // 任何时候,保证只能有一个线程在提交索引,并清空集合
  •   final static Semaphore semp = new Semaphore(1);
  •   //添加Collection和SolrServer
  • public void addCollecttion(String collection,CloudSolrServer server){
  •         this.solrServers.put(collection,server);
  •   }
  • //往Solr添加(更新)数据
  •     public UpdateResponse put(CloudSolrServer server,SolrInputDocument doc) throws IOException, SolrServerException {
  •         server.add(doc);
  • return server.commit(false, false);
  •   }
  • //往Solr添加(更新)数据
  •     public UpdateResponse put(CloudSolrServer server,List<SolrInputDocument> docs) throws IOException, SolrServerException {
  •         server.add(docs);
  • return server.commit(false, false);
  •   }
  • //根据ID删除Solr数据
  •     public UpdateResponse delete(CloudSolrServer server,String rowkey) throws IOException, SolrServerException {
  •         server.deleteById(rowkey);
  • return server.commit(false, false);
  •   }
  • //根据ID删除Solr数据
  •     public UpdateResponse delete(CloudSolrServer server,List<String> rowkeys) throws IOException, SolrServerException {
  •         server.deleteById(rowkeys);
  • return server.commit(false, false);
  •   }
  • //将doc添加到缓存
  •     public void addPutDocToCache(String collection, SolrInputDocument doc) throws IOException, SolrServerException, InterruptedException {
  •         semp.acquire();
  •   log.debug("addPutDocToCache:" + "collection=" + collection + "data=" + doc.toString());
  • if(!putCache.containsKey(collection)){
  •             List<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
  •   docs.add(doc);
  •   putCache.put(collection,docs);
  •   }else {
  •             List<SolrInputDocument> cache = putCache.get(collection);
  •   cache.add(doc);
  • if (cache.size() >= maxCache) {
  •                 try {
  •                     this.put(solrServers.get(collection), cache);
  •   } finally {
  •                     putCache.get(collection).clear();
  •   }
  •             }
  •         }
  •         semp.release();//释放信号量
  •   }
  • //添加删除操作到缓存
  •     public void addDeleteIdCache(String collection,String rowkey) throws IOException, SolrServerException, InterruptedException {
  •         semp.acquire();
  •   log.debug("addDeleteIdCache:" + "collection=" + collection + "rowkey=" + rowkey);
  • if(!deleteCache.containsKey(collection)){
  •             List<String> rowkeys = new LinkedList<String>();
  •   rowkeys.add(rowkey);
  •   deleteCache.put(collection,rowkeys);
  •   }else{
  •             List<String> cache = deleteCache.get(collection);
  •   cache.add(rowkey);
  • if (cache.size() >= maxCache) {
  •                 try{
  •                     this.delete(solrServers.get(collection),cache);
  •   }finally {
  •                     putCache.get(collection).clear();
  •   }
  •             }
  •         }
  •         semp.release();//释放信号量
  •   }

  •     @Override
  •   public void run() {
  •         try {
  •             semp.acquire();
  •   log.debug("开始插入....");
  •   Set<String> collections =  solrServers.keySet();
  • for(String collection:collections){
  •                 if(putCache.containsKey(collection) && (!putCache.get(collection).isEmpty()) ){
  •                     this.put(solrServers.get(collection),putCache.get(collection));
  •   putCache.get(collection).clear();
  •   }
  •                 if(deleteCache.containsKey(collection) && (!deleteCache.get(collection).isEmpty())){
  •                     this.delete(solrServers.get(collection),deleteCache.get(collection));
  •   deleteCache.get(collection).clear();
  •   }
  •             }
  •         } catch (InterruptedException e) {
  •             e.printStackTrace();
  •   } catch (Exception e) {
  •             log.error("Commit putCache to Solr error!Because :" + e.getMessage());
  •   }finally {
  •             semp.release();//释放信号量
  •   }
  •     }
  • }
  

  

3.4 拦截HBase的Put和Delete操作信息
  在每个prePut和preDelete中拦截操作信息,记录表名、列名、值。将这些信息根据表名和Collection名进行分类写入缓存。
  

  


  • public>

  •     Map<String,List<HBaseIndexerMappin>> mappins = ConfigManager.getHBaseIndexerMappin();

  •   Timer timer = new Timer();
  • int maxCommitTime = ConfigManager.config.getInt("MaxCommitTime"); //最大提交时间,s
  •   SolrCommitTimer solrCommit = new SolrCommitTimer();
  • public HBaseIndexerToSolrObserver(){
  •         log.info("Initialization HBaseIndexerToSolrObserver ...");
  • for(Map.Entry<String,List<HBaseIndexerMappin>> entry : mappins.entrySet() ){
  •             List<HBaseIndexerMappin> solrmappin = entry.getValue();
  • for(HBaseIndexerMappin map:solrmappin){
  •                 String collection = map.solrConnetion;//获取Collection名字
  •   log.info("Create Solr Server connection .The collection is " + collection);
  •   CloudSolrServer solrserver = SolrServerManager.create(collection);//根据Collection初始化SolrServer连接
  •   solrCommit.addCollecttion(collection,solrserver);
  •   }
  •         }
  •         timer.schedule(solrCommit, 10 * 1000L, maxCommitTime * 1000L);
  •   }

  •     @Override
  •   public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
  •   Put put, WALEdit edit, Durability durability) throws IOException {
  •         String table =  e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();//获取表名
  •   String rowkey= Bytes.toString(put.getRow());//获取主键
  •   SolrInputDocument doc = new SolrInputDocument();
  •   List<HBaseIndexerMappin> mappin = mappins.get(table);
  • for(HBaseIndexerMappin mapp : mappin){
  •             for(String column : mapp.columns){
  •                 String[] tmp = column.split(":");
  •   String cf = tmp[0];
  •   String cq = tmp[1];
  • if(put.has(Bytes.toBytes(cf),Bytes.toBytes(cq))){
  •                     Cell cell = put.get(Bytes.toBytes(cf),Bytes.toBytes(cq)).get(0);//获取制定列的数据
  •   Map<String, String > operation = new HashMap<String,String>();
  •   operation.put("set",Bytes.toString(CellUtil.cloneValue(cell)));
  •   doc.setField(cq,operation);//使用原子更新的方式将HBase二级索引写入Solr
  •   }
  •             }
  •             doc.addField("id",rowkey);
  • try {
  •                 solrCommit.addPutDocToCache(mapp.solrConnetion,doc);//添加doc到缓存
  •   } catch (SolrServerException e1) {
  •                 e1.printStackTrace();
  •   } catch (InterruptedException e1) {
  •                 e1.printStackTrace();
  •   }
  •         }
  •     }

  •     @Override
  •   public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
  •   Delete delete,
  •   WALEdit edit,
  •   Durability durability) throws IOException{
  •         String table =  e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();
  •   String rowkey= Bytes.toString(delete.getRow());
  •   List<HBaseIndexerMappin> mappin = mappins.get(table);
  • for(HBaseIndexerMappin mapp : mappin){
  •             try {
  •                 solrCommit.addDeleteIdCache(mapp.solrConnetion,rowkey);//添加删除操作到缓存
  •   } catch (SolrServerException e1) {
  •                 e1.printStackTrace();
  •   } catch (InterruptedException e1) {
  •                 e1.printStackTrace();
  •   }
  •         }

  •     }

  • }
  

  

四、 使用
  首先需要添加morphlines.conf文件。里面包含了需要同步数据到Solr的HBase表名、对应的Solr Collection的名字、要同步的列、多久提交一次、最大批次容量的相关信息。具体配置如下:
  

  


  • #最大提交时间(单位:秒)
  • MaxCommitTime = 30
  • #最大批次容量
  • MaxCommitSize = 10000

  • Mappin {
  •   HBaseTables: ["HBASE_OBSERVER_TEST"] #需要同步的HBase表名
  •   "HBASE_OBSERVER_TEST": [
  •     {
  •       SolrCollection: "bqjr" #Solr Collection名字
  •   Columns: [
  •         "cf1:test_age",   #需要同步的列,格式<列族:列>
  •   "cf1:test_name"
  •   ]
  •     },
  •   ]
  • }
  

  

  该配置文件默认放在各个节点的/etc/hbase/conf/下。如果你希望将配置文件路径修改为其他路径,请修改com.bqjr.bigdata.HBaseObserver.comm.config.SourceConfig类中的configHome路径。
  然后将代码打包,上传到HDFS中,将协处理器添加到对应的表中。
  

  


  • #先禁用这张表
  • disable 'HBASE_OBSERVER_TEST'
  • #为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)
  • alter 'HBASE_OBSERVER_TEST','coprocessor'=>'hdfs://hostname:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||'
  • #启用这张表
  • enable 'HBASE_OBSERVER_TEST'
  • #删除某个协处理器,"$<bumber>"后面跟的ID号与desc里面的ID号相同
  • alter 'HBASE_OBSERVER_TEST',METHOD=>'table_att_unset',NAME => 'coprocessor$1'
  

  

  如果需要新增一张表同步到Solr。只需要修改morphlines.conf文件,分发倒各个节点。然后将协处理器添加到HBase表中,这样就不用再次修改代码了。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425685-1-1.html 上篇帖子: 解决solr搜索多词匹配度和排序方案 下篇帖子: java操作solr实现查询功能
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表