jiang1799 发表于 2017-12-19 12:09:56

HBase协处理器同步二级索引到Solr(续)

一、 已知的问题和不足  二、解决思路
  三、代码
  3.1 读取config文件内容
  3.2 封装SolrServer的获取方式
  3.3 编写提交数据到Solr的代码
  3.4 拦截HBase的Put和Delete操作信息
  四、 使用
一、 已知的问题和不足
  在上一个版本中,实现了使用HBase的协处理器将HBase的二级索引同步到Solr中,但是仍旧有几个缺陷:

[*]写入Solr的Collection是写死在代码里面,且是唯一的。如果我们有一张表的数据希望将不同的字段同步到Solr中该如何做呢?
[*]目前所有配置相关信息都是写死到了代码中的,是否可以添加外部配置文件。
[*]原来的方法是每次都需要编译新的Jar文件单独运行,能否将所有的同步使用一段通用的代码完成?
二、解决思路
  针对上面的三个主要问题,我们一一解决

[*]通常一张表会对应多个SolrCollection以及不同的Column。我们可以使用Map[表名->List[(Collection1,List),(Collection2,List)...]]这样的类型,根据表名获取所有的Collection和Column。
[*]通过Typesafe Config读取外部配置文件,达到所有信息可配的目的。
[*]所有的数据都只有Put和Delete,只要我们拦截到具体的消息之后判断当前的表名,然后根据问题一中的Collection和Column即可写入对应的SolrServer。在协处理器中获取表名的是e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()其中e是ObserverContext
三、代码
3.1 读取config文件内容
  使用typesafe的config组件读取morphlines.conf文件,将内容转换为 Map<String,List<HBaseIndexerMappin>>。具体代码如下
  

  


[*]public>
[*]    private static SourceConfig sourceConfig = new SourceConfig();
[*] public static Config config;
[*] static {
[*]      sourceConfig.setConfigFiles("morphlines.conf");
[*]config =sourceConfig.getConfig();
[*]}
[*]    public static Map<String,List<HBaseIndexerMappin>> getHBaseIndexerMappin(){
[*]      Map<String,List<HBaseIndexerMappin>> mappin = new HashMap<String, List<HBaseIndexerMappin>>();
[*]Config mappinConf = config.getConfig("Mappin");
[*]List<String> tables = mappinConf.getStringList("HBaseTables");
[*] for (String table :tables){
[*]            List<Config> confList = (List<Config>) mappinConf.getConfigList(table);
[*]List<HBaseIndexerMappin> maps = new LinkedList<HBaseIndexerMappin>();
[*] for(Config tmp :confList){
[*]                HBaseIndexerMappin map = new HBaseIndexerMappin();
[*]map.solrConnetion = tmp.getString("SolrCollection");
[*]map.columns = tmp.getStringList("Columns");
[*]maps.add(map);
[*]}
[*]            mappin.put(table,maps);
[*]}
[*]      return mappin;
[*]}
[*]}
  

  

3.2 封装SolrServer的获取方式
  因为目前我使用的环境是Solr和HBase公用的同一套Zookeeper,因此我们完全可以借助HBase的Zookeeper信息。HBase的协处理器是运行在HBase的环境中的,自然可以通过HBase的Configuration获取当前的Zookeeper节点和端口,然后轻松的获取到Solr的地址。
  

  


[*]public>
[*]    static Configuration conf = HBaseConfiguration.create();
[*] public static String ZKHost = conf.get("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps2");
[*] public static String ZKPort = conf.get("hbase.zookeeper.property.clientPort","2181");
[*] public static String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr";
[*] public static int zkClientTimeout = 1800000;// 心跳
[*]public static int zkConnectTimeout = 1800000;// 连接时间
[*]
[*]public static CloudSolrServer create(String defaultCollection){
[*]      log.info("Create SolrCloudeServer .This collection is " + defaultCollection);
[*]CloudSolrServer solrServer = new CloudSolrServer(SolrUrl);
[*]solrServer.setDefaultCollection(defaultCollection);
[*]solrServer.setZkClientTimeout(zkClientTimeout);
[*]solrServer.setZkConnectTimeout(zkConnectTimeout);
[*] return solrServer;
[*]}
[*]}
  

  

3.3 编写提交数据到Solr的代码
  理想状态下,我们时时刻刻都需要提交数据到Solr中,但是事实上我们数据写入的时间是比较分散的,可能集中再每一天的某几个时间点。因此我们必须保证在高并发下能达到一定数据量自动提交,在低并发的情况下能隔一段时间写入一次。只有两种机制并存的情况下才能保证数据能即时写入。
  

  


[*]public>
[*]    public Map<String,List<SolrInputDocument>> putCache = new HashMap<String, List<SolrInputDocument>>();//Collection名字->更新(插入)操作缓存
[*] public Map<String,List<String>> deleteCache = new HashMap<String, List<String>>();//Collection名字->删除操作缓存
[*]Map<String,CloudSolrServer> solrServers = new HashMap<String, CloudSolrServer>();//Collection名字->SolrServers
[*] int maxCache =ConfigManager.config.getInt("MaxCommitSize");
[*]// 任何时候,保证只能有一个线程在提交索引,并清空集合
[*]final static Semaphore semp = new Semaphore(1);
[*]//添加Collection和SolrServer
[*] public void addCollecttion(String collection,CloudSolrServer server){
[*]      this.solrServers.put(collection,server);
[*]}
[*]//往Solr添加(更新)数据
[*]    public UpdateResponse put(CloudSolrServer server,SolrInputDocument doc) throws IOException, SolrServerException {
[*]      server.add(doc);
[*] return server.commit(false, false);
[*]}
[*]//往Solr添加(更新)数据
[*]    public UpdateResponse put(CloudSolrServer server,List<SolrInputDocument> docs) throws IOException, SolrServerException {
[*]      server.add(docs);
[*] return server.commit(false, false);
[*]}
[*]//根据ID删除Solr数据
[*]    public UpdateResponse delete(CloudSolrServer server,String rowkey) throws IOException, SolrServerException {
[*]      server.deleteById(rowkey);
[*] return server.commit(false, false);
[*]}
[*]//根据ID删除Solr数据
[*]    public UpdateResponse delete(CloudSolrServer server,List<String> rowkeys) throws IOException, SolrServerException {
[*]      server.deleteById(rowkeys);
[*] return server.commit(false, false);
[*]}
[*]//将doc添加到缓存
[*]    public void addPutDocToCache(String collection, SolrInputDocument doc) throws IOException, SolrServerException, InterruptedException {
[*]      semp.acquire();
[*]log.debug("addPutDocToCache:" + "collection=" + collection + "data=" + doc.toString());
[*] if(!putCache.containsKey(collection)){
[*]            List<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();
[*]docs.add(doc);
[*]putCache.put(collection,docs);
[*]}else {
[*]            List<SolrInputDocument> cache = putCache.get(collection);
[*]cache.add(doc);
[*] if (cache.size() >= maxCache) {
[*]                try {
[*]                  this.put(solrServers.get(collection), cache);
[*]} finally {
[*]                  putCache.get(collection).clear();
[*]}
[*]            }
[*]      }
[*]      semp.release();//释放信号量
[*]}
[*]//添加删除操作到缓存
[*]    public void addDeleteIdCache(String collection,String rowkey) throws IOException, SolrServerException, InterruptedException {
[*]      semp.acquire();
[*]log.debug("addDeleteIdCache:" + "collection=" + collection + "rowkey=" + rowkey);
[*] if(!deleteCache.containsKey(collection)){
[*]            List<String> rowkeys = new LinkedList<String>();
[*]rowkeys.add(rowkey);
[*]deleteCache.put(collection,rowkeys);
[*]}else{
[*]            List<String> cache = deleteCache.get(collection);
[*]cache.add(rowkey);
[*] if (cache.size() >= maxCache) {
[*]                try{
[*]                  this.delete(solrServers.get(collection),cache);
[*]}finally {
[*]                  putCache.get(collection).clear();
[*]}
[*]            }
[*]      }
[*]      semp.release();//释放信号量
[*]}
[*]
[*]    @Override
[*]public void run() {
[*]      try {
[*]            semp.acquire();
[*]log.debug("开始插入....");
[*]Set<String> collections =solrServers.keySet();
[*] for(String collection:collections){
[*]                if(putCache.containsKey(collection) && (!putCache.get(collection).isEmpty()) ){
[*]                  this.put(solrServers.get(collection),putCache.get(collection));
[*]putCache.get(collection).clear();
[*]}
[*]                if(deleteCache.containsKey(collection) && (!deleteCache.get(collection).isEmpty())){
[*]                  this.delete(solrServers.get(collection),deleteCache.get(collection));
[*]deleteCache.get(collection).clear();
[*]}
[*]            }
[*]      } catch (InterruptedException e) {
[*]            e.printStackTrace();
[*]} catch (Exception e) {
[*]            log.error("Commit putCache to Solr error!Because :" + e.getMessage());
[*]}finally {
[*]            semp.release();//释放信号量
[*]}
[*]    }
[*]}
  

  

3.4 拦截HBase的Put和Delete操作信息
  在每个prePut和preDelete中拦截操作信息,记录表名、列名、值。将这些信息根据表名和Collection名进行分类写入缓存。
  

  


[*]public>
[*]
[*]    Map<String,List<HBaseIndexerMappin>> mappins = ConfigManager.getHBaseIndexerMappin();
[*]
[*]Timer timer = new Timer();
[*] int maxCommitTime = ConfigManager.config.getInt("MaxCommitTime"); //最大提交时间,s
[*]SolrCommitTimer solrCommit = new SolrCommitTimer();
[*] public HBaseIndexerToSolrObserver(){
[*]      log.info("Initialization HBaseIndexerToSolrObserver ...");
[*] for(Map.Entry<String,List<HBaseIndexerMappin>> entry : mappins.entrySet() ){
[*]            List<HBaseIndexerMappin> solrmappin = entry.getValue();
[*] for(HBaseIndexerMappin map:solrmappin){
[*]                String collection = map.solrConnetion;//获取Collection名字
[*]log.info("Create Solr Server connection .The collection is " + collection);
[*]CloudSolrServer solrserver = SolrServerManager.create(collection);//根据Collection初始化SolrServer连接
[*]solrCommit.addCollecttion(collection,solrserver);
[*]}
[*]      }
[*]      timer.schedule(solrCommit, 10 * 1000L, maxCommitTime * 1000L);
[*]}
[*]
[*]    @Override
[*]public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
[*]Put put, WALEdit edit, Durability durability) throws IOException {
[*]      String table =e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();//获取表名
[*]String rowkey= Bytes.toString(put.getRow());//获取主键
[*]SolrInputDocument doc = new SolrInputDocument();
[*]List<HBaseIndexerMappin> mappin = mappins.get(table);
[*] for(HBaseIndexerMappin mapp : mappin){
[*]            for(String column : mapp.columns){
[*]                String[] tmp = column.split(":");
[*]String cf = tmp;
[*]String cq = tmp;
[*] if(put.has(Bytes.toBytes(cf),Bytes.toBytes(cq))){
[*]                  Cell cell = put.get(Bytes.toBytes(cf),Bytes.toBytes(cq)).get(0);//获取制定列的数据
[*]Map<String, String > operation = new HashMap<String,String>();
[*]operation.put("set",Bytes.toString(CellUtil.cloneValue(cell)));
[*]doc.setField(cq,operation);//使用原子更新的方式将HBase二级索引写入Solr
[*]}
[*]            }
[*]            doc.addField("id",rowkey);
[*] try {
[*]                solrCommit.addPutDocToCache(mapp.solrConnetion,doc);//添加doc到缓存
[*]} catch (SolrServerException e1) {
[*]                e1.printStackTrace();
[*]} catch (InterruptedException e1) {
[*]                e1.printStackTrace();
[*]}
[*]      }
[*]    }
[*]
[*]    @Override
[*]public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,
[*]Delete delete,
[*]WALEdit edit,
[*]Durability durability) throws IOException{
[*]      String table =e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();
[*]String rowkey= Bytes.toString(delete.getRow());
[*]List<HBaseIndexerMappin> mappin = mappins.get(table);
[*] for(HBaseIndexerMappin mapp : mappin){
[*]            try {
[*]                solrCommit.addDeleteIdCache(mapp.solrConnetion,rowkey);//添加删除操作到缓存
[*]} catch (SolrServerException e1) {
[*]                e1.printStackTrace();
[*]} catch (InterruptedException e1) {
[*]                e1.printStackTrace();
[*]}
[*]      }
[*]
[*]    }
[*]
[*]}
  

  

四、 使用
  首先需要添加morphlines.conf文件。里面包含了需要同步数据到Solr的HBase表名、对应的Solr Collection的名字、要同步的列、多久提交一次、最大批次容量的相关信息。具体配置如下:
  

  


[*]#最大提交时间(单位:秒)
[*]MaxCommitTime = 30
[*]#最大批次容量
[*]MaxCommitSize = 10000
[*]
[*]Mappin {
[*]HBaseTables: ["HBASE_OBSERVER_TEST"] #需要同步的HBase表名
[*]"HBASE_OBSERVER_TEST": [
[*]    {
[*]      SolrCollection: "bqjr" #Solr Collection名字
[*]Columns: [
[*]      "cf1:test_age",   #需要同步的列,格式<列族:列>
[*]"cf1:test_name"
[*]]
[*]    },
[*]]
[*]}
  

  

  该配置文件默认放在各个节点的/etc/hbase/conf/下。如果你希望将配置文件路径修改为其他路径,请修改com.bqjr.bigdata.HBaseObserver.comm.config.SourceConfig类中的configHome路径。
  然后将代码打包,上传到HDFS中,将协处理器添加到对应的表中。
  

  


[*]#先禁用这张表
[*]disable 'HBASE_OBSERVER_TEST'
[*]#为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)
[*]alter 'HBASE_OBSERVER_TEST','coprocessor'=>'hdfs://hostname:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||'
[*]#启用这张表
[*]enable 'HBASE_OBSERVER_TEST'
[*]#删除某个协处理器,"$<bumber>"后面跟的ID号与desc里面的ID号相同
[*]alter 'HBASE_OBSERVER_TEST',METHOD=>'table_att_unset',NAME => 'coprocessor$1'
  

  

  如果需要新增一张表同步到Solr。只需要修改morphlines.conf文件,分发倒各个节点。然后将协处理器添加到HBase表中,这样就不用再次修改代码了。
页: [1]
查看完整版本: HBase协处理器同步二级索引到Solr(续)