设为首页 收藏本站
查看: 1087|回复: 0

[经验分享] 玩转大数据系列之Apache Pig如何与Apache Solr集成(二)

[复制链接]

尚未签到

发表于 2015-7-18 08:01:12 | 显示全部楼层 |阅读模式
  散仙,在上篇文章中介绍了,如何使用Apache Pig与Lucene集成,还不知道的道友们,可以先看下上篇,熟悉下具体的流程。
在与Lucene集成过程中,我们发现最终还要把生成的Lucene索引,拷贝至本地磁盘,才能提供检索服务,这样以来,比较繁琐,而且有以下几个缺点:
(一)在生成索引以及最终能提供正常的服务之前,索引经过多次落地操作,这无疑会给磁盘和网络IO,带来巨大影响
(二)Lucene的Field的配置与其UDF函数的代码耦合性过强,而且提供的配置也比较简单,不太容易满足,灵活多变的检索需求和服务,如果改动索引配置,则有可能需要重新编译源码。
(三)对Hadoop的分布式存储系统HDFS依赖过强,如果使用与Lucene集成,那么则意味着你提供检索的Web服务器,则必须跟hadoop的存储节点在一个机器上,否则,无法从HDFS上下拉索引,除非你自己写程序,或使用scp再次从目标机传输,这样无疑又增加了,系统的复杂性。

鉴于有以上几个缺点,所以建议大家使用Solr或ElasticSearch这样的封装了Lucene更高级的API框架,那么Solr与ElasticSearch和Lucene相比,又有什么优点呢?
(1)在最终的写入数据时,我们可以直接最终结果写入solr或es,同时也可以在HDFS上保存一份,作为灾备。
(2)使用了solr或es,这时,我们字段的配置完全与UDF函数代码无关,我们的任何字段配置的变动,都不会影响Pig的UDF函数的代码,而在UDF函数里,唯一要做的,就是将最终数据,提供给solr和es服务。
(3)solr和es都提供了restful风格的http操作方式,这时候,我们的检索集群完全可以与Hadoop集群分离,从而让他们各自都专注自己的服务。

下面,散仙就具体说下如何使用Pig和Solr集成?
(1)依旧访问这个地址下载源码压缩包。
(2)提取出自己想要的部分,在eclipse工程中,修改定制适合自己环境的的代码(Solr版本是否兼容?hadoop版本是否兼容?,Pig版本是否兼容?)。
(3)使用ant重新打包成jar
(4)在pig里,注册相关依赖的jar包,并使用索引存储

注意,在github下载的压缩里直接提供了对SolrCloud模式的提供,而没有提供,普通模式的函数,散仙在这里稍作修改后,可以支持普通模式的Solr服务,代码如下:

SolrOutputFormat函数




Java代码   DSC0000.png

  • package com.pig.support.solr;  



  • import java.io.IOException;  
  • import java.util.ArrayList;  
  • import java.util.List;  
  • import java.util.concurrent.Executors;  
  • import java.util.concurrent.ScheduledExecutorService;  
  • import java.util.concurrent.TimeUnit;  

  • import org.apache.hadoop.io.Writable;  
  • import org.apache.hadoop.mapreduce.JobContext;  
  • import org.apache.hadoop.mapreduce.OutputCommitter;  
  • import org.apache.hadoop.mapreduce.RecordWriter;  
  • import org.apache.hadoop.mapreduce.TaskAttemptContext;  
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  • import org.apache.solr.client.solrj.SolrServer;  
  • import org.apache.solr.client.solrj.SolrServerException;  
  • import org.apache.solr.client.solrj.impl.CloudSolrServer;  
  • import org.apache.solr.client.solrj.impl.HttpSolrServer;  
  • import org.apache.solr.common.SolrInputDocument;  
  • /**
  • * @author qindongliang
  • * 支持SOlr的SolrOutputFormat
  • * 如果你想了解,或学习更多这方面的
  • * 知识,请加入我们的群:
  • *  
  • * 搜索技术交流群(2000人):324714439  
  • * 大数据技术1号交流群(2000人):376932160  (已满)
  • * 大数据技术2号交流群(2000人):415886155  
  • * 微信公众号:我是攻城师(woshigcs)
  • *  
  • * */  
  • public class SolrOutputFormat extends  
  •         FileOutputFormat {

  •     final String address;  
  •     final String collection;  

  •     public SolrOutputFormat(String address, String collection) {  
  •         this.address = address;  
  •         this.collection = collection;  
  •     }

  •     @Override  
  •     public RecordWriter getRecordWriter(  
  •             TaskAttemptContext ctx) throws IOException, InterruptedException {  
  •         return new SolrRecordWriter(ctx, address, collection);  
  •     }


  •     @Override  
  •     public synchronized OutputCommitter getOutputCommitter(  
  •             TaskAttemptContext arg0) throws IOException {  
  •         return new OutputCommitter(){  

  •             @Override  
  •             public void abortTask(TaskAttemptContext ctx) throws IOException {  

  •             }

  •             @Override  
  •             public void commitTask(TaskAttemptContext ctx) throws IOException {  

  •             }

  •             @Override  
  •             public boolean needsTaskCommit(TaskAttemptContext arg0)  
  •                     throws IOException {  
  •                 return true;  
  •             }

  •             @Override  
  •             public void setupJob(JobContext ctx) throws IOException {  

  •             }

  •             @Override  
  •             public void setupTask(TaskAttemptContext ctx) throws IOException {  

  •             }


  •         };
  •     }


  •     /**
  •      * Write out the LuceneIndex to a local temporary location.
  •      * On commit/close the index is copied to the hdfs output directory.
  •      *  
  •      */  
  •     static class SolrRecordWriter extends RecordWriter {  
  •         /**Solr的地址*/  
  •         SolrServer server;
  •         /**批处理提交的数量**/  
  •         int batch = 5000;  

  •         TaskAttemptContext ctx;

  •         List docs = new ArrayList(batch);  
  •         ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  •         /**
  •          * Opens and forces connect to CloudSolrServer
  •          *  
  •          * @param address
  •          */  
  •         public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {  
  •             try {  
  •                 this.ctx = ctx;  
  •                 server = new HttpSolrServer(address);  

  •                 exec.scheduleWithFixedDelay(new Runnable(){  
  •                     public void run(){  
  •                         ctx.progress();
  •                     }
  •                 }, 1000, 1000, TimeUnit.MILLISECONDS);  
  •             } catch (Exception e) {  
  •                 RuntimeException exc = new RuntimeException(e.toString(), e);  
  •                 exc.setStackTrace(e.getStackTrace());
  •                 throw exc;  
  •             }
  •         }


  •         /**
  •          * On close we commit
  •          */  
  •         @Override  
  •         public void close(final TaskAttemptContext ctx) throws IOException,  
  •                 InterruptedException {

  •             try {  

  •                 if (docs.size() > 0) {  
  •                     server.add(docs);
  •                     docs.clear();
  •                 }

  •                 server.commit();
  •             } catch (SolrServerException e) {  
  •                 RuntimeException exc = new RuntimeException(e.toString(), e);  
  •                 exc.setStackTrace(e.getStackTrace());
  •                 throw exc;  
  •             } finally {  
  •                 server.shutdown();
  •                 exec.shutdownNow();
  •             }

  •         }

  •         /**
  •          * We add the indexed documents without commit
  •          */  
  •         @Override  
  •         public void write(Writable key, SolrInputDocument doc)  
  •                 throws IOException, InterruptedException {  
  •             try {  
  •                 docs.add(doc);
  •                 if (docs.size() >= batch) {  
  •                     server.add(docs);
  •                     docs.clear();
  •                 }
  •             } catch (SolrServerException e) {  
  •                 RuntimeException exc = new RuntimeException(e.toString(), e);  
  •                 exc.setStackTrace(e.getStackTrace());
  •                 throw exc;  
  •             }
  •         }

  •     }
  • }
  

SolrStore函数




Java代码  

  • package com.pig.support.solr;  



  • import java.io.IOException;  
  • import java.util.Properties;  

  • import org.apache.hadoop.fs.Path;  
  • import org.apache.hadoop.io.Writable;  
  • import org.apache.hadoop.mapreduce.Job;  
  • import org.apache.hadoop.mapreduce.OutputFormat;  
  • import org.apache.hadoop.mapreduce.RecordWriter;  
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  • import org.apache.pig.ResourceSchema;  
  • import org.apache.pig.ResourceSchema.ResourceFieldSchema;  
  • import org.apache.pig.ResourceStatistics;  
  • import org.apache.pig.StoreFunc;  
  • import org.apache.pig.StoreMetadata;  
  • import org.apache.pig.data.Tuple;  
  • import org.apache.pig.impl.util.UDFContext;  
  • import org.apache.pig.impl.util.Utils;  
  • import org.apache.solr.common.SolrInputDocument;  

  • /**
  • *  
  • * Create a lucene index
  • *  
  • */  
  • public class SolrStore extends StoreFunc implements StoreMetadata {  

  •     private static final String SCHEMA_SIGNATURE = "solr.output.schema";  

  •     ResourceSchema schema;
  •     String udfSignature;
  •     RecordWriter writer;

  •     String address;
  •     String collection;

  •     public SolrStore(String address, String collection) {  
  •         this.address = address;  
  •         this.collection = collection;  
  •     }

  •     public void storeStatistics(ResourceStatistics stats, String location,  
  •             Job job) throws IOException {  
  •     }

  •     public void storeSchema(ResourceSchema schema, String location, Job job)  
  •             throws IOException {  
  •     }

  •     @Override  
  •     public void checkSchema(ResourceSchema s) throws IOException {  
  •         UDFContext udfc = UDFContext.getUDFContext();
  •         Properties p = udfc.getUDFProperties(this.getClass(),  
  •                 new String[] { udfSignature });  
  •         p.setProperty(SCHEMA_SIGNATURE, s.toString());
  •     }

  •     public OutputFormat getOutputFormat()  
  •             throws IOException {  
  •         // not be used  
  •         return new SolrOutputFormat(address, collection);  
  •     }

  •     /**
  •      * Not used
  •      */  
  •     @Override  
  •     public void setStoreLocation(String location, Job job) throws IOException {  
  •         FileOutputFormat.setOutputPath(job, new Path(location));  
  •     }

  •     @Override  
  •     public void setStoreFuncUDFContextSignature(String signature) {  
  •         this.udfSignature = signature;  
  •     }

  •     @SuppressWarnings({ "unchecked", "rawtypes" })  
  •     @Override  
  •     public void prepareToWrite(RecordWriter writer) throws IOException {  
  •         this.writer = writer;  
  •         UDFContext udc = UDFContext.getUDFContext();
  •         String schemaStr = udc.getUDFProperties(this.getClass(),  
  •                 new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);  

  •         if (schemaStr == null) {  
  •             throw new RuntimeException("Could not find udf signature");  
  •         }

  •         schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));  

  •     }

  •     /**
  •      * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch
  •      * @param input
  •      * @return
  •      */  
  •     private static String stripNonCharCodepoints(String input) {  
  •         StringBuilder retval = new StringBuilder(input.length());  
  •         char ch;  

  •         for (int i = 0; i < input.length(); i++) {  
  •             ch = input.charAt(i);

  •             // Strip all non-characters  
  •             // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]  
  •             // and non-printable control characters except tabulator, new line  
  •             // and carriage return  
  •             if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step  
  •                                             // 0x10000  
  •                     ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range  
  •                     (ch = 0xfdef) && // 0xfdd0 - 0xfdef  
  •                     (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {  

  •                 retval.append(ch);
  •             }
  •         }

  •         return retval.toString();  
  •     }

  •     @Override  
  •     public void putNext(Tuple t) throws IOException {  

  •         final SolrInputDocument doc = new SolrInputDocument();  

  •         final ResourceFieldSchema[] fields = schema.getFields();  
  •         int docfields = 0;  

  •         for (int i = 0; i < fields.length; i++) {  
  •             final Object value = t.get(i);  

  •             if (value != null) {  
  •                 docfields++;
  •                 doc.addField(fields.getName().trim(), stripNonCharCodepoints(value.toString()));
  •             }

  •         }

  •         try {  
  •             if (docfields > 0)  
  •                 writer.write(null, doc);  
  •         } catch (InterruptedException e) {  
  •             Thread.currentThread().interrupt();
  •             return;  
  •         }

  •     }

  • }
  
Pig脚本如下:



Java代码  

  • --注册依赖文件的jar包
  • REGISTER ./dependfiles/tools.jar;

  • --注册solr相关的jar包
  • REGISTER  ./solrdependfiles/pigudf.jar;
  • REGISTER  ./solrdependfiles/solr-core-4.10.2.jar;  
  • REGISTER  ./solrdependfiles/solr-solrj-4.10.2.jar;  
  • REGISTER  ./solrdependfiles/httpclient-4.3.1.jar  
  • REGISTER  ./solrdependfiles/httpcore-4.3.jar  
  • REGISTER  ./solrdependfiles/httpmime-4.3.1.jar  
  • REGISTER  ./solrdependfiles/noggit-0.5.jar  


  • --加载HDFS数据,并定义scheaml
  • a = load '/tmp/data' using PigStorage(',') as (sword:chararray,scount:int);  

  • --存储到solr中,并提供solr的ip地址和端口号
  • store d into '/user/search/solrindextemp'  using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1','collection1');  
  • ~
  • ~
  • ~
  
配置成功之后,我们就可以运行程序,加载HDFS上数据,经过计算处理之后,并将最终的结果,存储到Solr之中,截图如下:

DSC0001.jpg

成功之后,我们就可以很方便的在solr中进行毫秒级别的操作了,例如各种各样的全文查询,过滤,排序统计等等!
同样的方式,我们也可以将索引存储在ElasticSearch中,关于如何使用Pig和ElasticSearch集成,散仙也会在后面的文章中介绍,敬请期待!

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-87827-1-1.html 上篇帖子: 【Nutch2.3基础教程】集成Nutch/Hadoop/Hbase/Solr构建搜索引擎:安装及运行【集群环境】 下篇帖子: 使用solr搭建全文索引
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表