y23335793 posted on 2018-11-1 12:41:09

Playing with Big Data series: How to Integrate Apache Pig with Apache Solr (Part 2)

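  In the previous post I (Sanxian) described how to integrate Apache Pig with Lucene. Readers who have not seen it may want to read it first to get familiar with the overall workflow.
  While integrating with Lucene, we found that the generated Lucene index still has to be copied to local disk before it can serve searches. That is cumbersome, and it has the following drawbacks:
  (I) Between generating the index and finally serving it, the index is written to disk several times, which puts a heavy load on disk and network I/O.
  (II) The Lucene Field configuration is tightly coupled to the UDF code, and the configuration options on offer are fairly simple, so it is hard to meet flexible, changing search requirements. Changing the index configuration may require recompiling the source.
  (III) The dependence on HDFS, Hadoop's distributed storage system, is too strong. With the Lucene integration, the web server that provides search has to sit on the same machine as a Hadoop storage node; otherwise it cannot pull the index down from HDFS, unless you write your own program or use scp to transfer it again from the target machine, which adds further complexity to the system.
  Given these drawbacks, I recommend using Solr or ElasticSearch, frameworks that wrap Lucene in a higher-level API. So what advantages do Solr and ElasticSearch have over plain Lucene?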
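  (1) When writing the final data, we can write the results directly into Solr or ES, and also keep a copy on HDFS as a disaster-recovery backup.
  (2) With Solr or ES, field configuration is completely independent of the UDF code. No change to the field configuration affects the Pig UDF code; the only thing the UDF has to do is hand the final data to the Solr or ES service.
  (3) Both Solr and ES offer RESTful HTTP access, so the search cluster can be fully separated from the Hadoop cluster, letting each focus on its own service.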
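  To make point (3) concrete, here is a minimal sketch of what an HTTP update request to Solr could look like. It only builds the URL and the JSON body and sends nothing over the network. The class and method names are hypothetical, the field names `sword` and `scount` are borrowed from the Pig script in this post, and the core URL is assumed to be a local standalone Solr 4.x instance whose schema defines those fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: builds the URL and JSON body of a Solr HTTP update request. */
final class SolrUpdateRequestSketch {

    /** Appends the update handler path; commit=true asks Solr to commit immediately. */
    static String updateUrl(String coreUrl) {
        String base = coreUrl.endsWith("/")
                ? coreUrl.substring(0, coreUrl.length() - 1)
                : coreUrl;
        return base + "/update?commit=true";
    }

    /** Escapes characters that must not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Serializes one document as a single-element JSON array of documents. */
    static String toJson(Map<String, Object> doc) {
        StringBuilder sb = new StringBuilder("[{");
        boolean first = true;
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":");
            Object v = e.getValue();
            if (v instanceof Integer || v instanceof Long) {
                sb.append(v); // whole numbers are written unquoted
            } else {
                sb.append('"').append(escape(String.valueOf(v))).append('"');
            }
        }
        return sb.append("}]").toString();
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("sword", "hadoop");
        doc.put("scount", 3);
        // Assumed core URL; adjust to your own Solr instance.
        System.out.println("POST " + updateUrl("http://localhost:8983/solr/collection1"));
        System.out.println("Content-Type: application/json");
        System.out.println(toJson(doc));
    }
}
```

  Any HTTP client on any machine can send such a request, which is why the search cluster does not need to share hosts with the Hadoop nodes.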
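  Now let me go through how to integrate Pig with Solr.
  (1) As before, download the source archive from this address.
  (2) Extract the parts you need and, in an Eclipse project, adapt the code to your own environment (is the Solr version compatible? the Hadoop version? the Pig version?).
  (3) Repackage it as a jar with ant.
  (4) In Pig, register the required jars and use the index store function.
  Note: the archive downloaded from GitHub supports SolrCloud mode out of the box but provides no function for standalone mode. I made a few small changes so that it also works with a standalone Solr server. The code is as follows:
  SolrOutputFormat class
  Java code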

package com.pig.support.solr;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

/**
 * An OutputFormat that writes to Solr.
 *
 * If you want to learn more about this topic, join our groups:
 *
 * Search technology group (2000 members): 324714439
 * Big data technology group 1 (2000 members): 376932160 (full)
 * Big data technology group 2 (2000 members): 415886155
 * WeChat public account: 我是攻城师 (woshigcs)
 *
 * @author qindongliang
 */
public class SolrOutputFormat extends
        FileOutputFormat<Writable, SolrInputDocument> {

    final String address;
    final String collection;

    public SolrOutputFormat(String address, String collection) {
        this.address = address;
        this.collection = collection;
    }

    @Override
    public RecordWriter<Writable, SolrInputDocument> getRecordWriter(
            TaskAttemptContext ctx) throws IOException, InterruptedException {
        return new SolrRecordWriter(ctx, address, collection);
    }

    /**
     * Nothing is written to the file system, so every committer step is a no-op.
     */
    @Override
    public synchronized OutputCommitter getOutputCommitter(
            TaskAttemptContext arg0) throws IOException {
        return new OutputCommitter() {

            @Override
            public void abortTask(TaskAttemptContext ctx) throws IOException {
            }

            @Override
            public void commitTask(TaskAttemptContext ctx) throws IOException {
            }

            @Override
            public boolean needsTaskCommit(TaskAttemptContext arg0)
                    throws IOException {
                return true;
            }

            @Override
            public void setupJob(JobContext ctx) throws IOException {
            }

            @Override
            public void setupTask(TaskAttemptContext ctx) throws IOException {
            }
        };
    }

    /**
     * Buffers documents and sends them to Solr in batches.
     * On close, the remaining documents are sent and a commit is issued.
     */
    static class SolrRecordWriter extends RecordWriter<Writable, SolrInputDocument> {
        /** Client for the Solr server */
        SolrServer server;
        /** Number of documents sent per batch */
        int batch = 5000;

        TaskAttemptContext ctx;

        List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(batch);
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

        /**
         * Connects to a standalone Solr server over HTTP.
         *
         * @param address    URL of the Solr core
         * @param collection collection name (not used in standalone mode)
         */
        public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {
            try {
                this.ctx = ctx;
                server = new HttpSolrServer(address);

                // Report progress every second so that Hadoop does not kill a slow task.
                exec.scheduleWithFixedDelay(new Runnable() {
                    public void run() {
                        ctx.progress();
                    }
                }, 1000, 1000, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            }
        }

        /**
         * On close we send the remaining documents and commit.
         */
        @Override
        public void close(final TaskAttemptContext ctx) throws IOException,
                InterruptedException {
            try {
                if (docs.size() > 0) {
                    server.add(docs);
                    docs.clear();
                }
                server.commit();
            } catch (SolrServerException e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            } finally {
                server.shutdown();
                exec.shutdownNow();
            }
        }

        /**
         * We add the documents in batches, without committing.
         */
        @Override
        public void write(Writable key, SolrInputDocument doc)
                throws IOException, InterruptedException {
            try {
                docs.add(doc);
                if (docs.size() >= batch) {
                    server.add(docs);
                    docs.clear();
                }
            } catch (SolrServerException e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            }
        }
    }
}
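  The core idea of the record writer is "buffer, flush every N documents, flush the remainder and commit on close". As a rough illustration, here is a dependency-free sketch of just that pattern. The `Sink` interface and `CountingSink` class are hypothetical stand-ins for the Solr client, documents are plain strings, and the batch size of 2 is chosen only to keep the demo short.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: the batching pattern of a record writer, without Solr or Hadoop. */
final class BatchingWriterSketch {

    /** Hypothetical stand-in for the two client calls the writer needs. */
    interface Sink {
        void add(List<String> docs);
        void commit();
    }

    /** Records what it receives so the behaviour can be inspected. */
    static final class CountingSink implements Sink {
        final List<Integer> batchSizes = new ArrayList<>();
        int commits = 0;

        public void add(List<String> docs) {
            batchSizes.add(docs.size());
        }

        public void commit() {
            commits++;
        }
    }

    private final Sink sink;
    private final int batchSize;
    private final List<String> buffer;

    BatchingWriterSketch(Sink sink, int batchSize) {
        this.sink = sink;
        this.batchSize = batchSize;
        this.buffer = new ArrayList<>(batchSize);
    }

    /** Buffers one document and flushes once the buffer is full. */
    void write(String doc) {
        buffer.add(doc);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is left in the buffer, then commits once. */
    void close() {
        if (!buffer.isEmpty()) {
            flush();
        }
        sink.commit();
    }

    private void flush() {
        // Hand over a copy so that clearing the buffer cannot affect the sink.
        sink.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public static void main(String[] args) {
        CountingSink sink = new CountingSink();
        BatchingWriterSketch writer = new BatchingWriterSketch(sink, 2);
        for (int i = 0; i < 5; i++) {
            writer.write("doc" + i);
        }
        writer.close();
        System.out.println("batches=" + sink.batchSizes + " commits=" + sink.commits);
    }
}
```

  Committing only once at the end keeps the number of commits low; a larger batch size trades memory in the task for fewer HTTP round trips.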
  SolrStore store function
  Java code

package com.pig.support.solr;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreMetadata;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.solr.common.SolrInputDocument;

/**
 * Stores Pig tuples in a Solr index.
 */
public class SolrStore extends StoreFunc implements StoreMetadata {

    private static final String SCHEMA_SIGNATURE = "solr.output.schema";

    ResourceSchema schema;
    String udfSignature;
    RecordWriter<Writable, SolrInputDocument> writer;

    String address;
    String collection;

    public SolrStore(String address, String collection) {
        this.address = address;
        this.collection = collection;
    }

    public void storeStatistics(ResourceStatistics stats, String location,
            Job job) throws IOException {
    }

    public void storeSchema(ResourceSchema schema, String location, Job job)
            throws IOException {
    }

    /**
     * Saves the schema on the front end so that the tasks can read it back.
     */
    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        UDFContext udfc = UDFContext.getUDFContext();
        Properties p = udfc.getUDFProperties(this.getClass(),
                new String[] { udfSignature });
        p.setProperty(SCHEMA_SIGNATURE, s.toString());
    }

    public OutputFormat<Writable, SolrInputDocument> getOutputFormat()
            throws IOException {
        return new SolrOutputFormat(address, collection);
    }

    /**
     * The location is required by FileOutputFormat, but no index is written there.
     */
    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        this.udfSignature = signature;
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
        UDFContext udc = UDFContext.getUDFContext();
        String schemaStr = udc.getUDFProperties(this.getClass(),
                new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);

        if (schemaStr == null) {
            throw new RuntimeException("Could not find udf signature");
        }

        schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));
    }

    /**
     * Shamelessly copied from : https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch
     * @param input the raw field value
     * @return the value without non-characters and non-printable control characters
     */
    private static String stripNonCharCodepoints(String input) {
        StringBuilder retval = new StringBuilder(input.length());
        char ch;

        for (int i = 0; i < input.length(); i++) {
            ch = input.charAt(i);

            // Strip all non-characters
            // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
            // and non-printable control characters except tabulator, new line
            // and carriage return
            if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step 0x10000
                    ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range
                    (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef
                    (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {

                retval.append(ch);
            }
        }

        return retval.toString();
    }

    /**
     * Turns one tuple into a SolrInputDocument, using the schema field names
     * as Solr field names, and hands it to the record writer.
     */
    @Override
    public void putNext(Tuple t) throws IOException {

        final SolrInputDocument doc = new SolrInputDocument();

        final ResourceFieldSchema[] fields = schema.getFields();
        int docfields = 0;

        for (int i = 0; i < fields.length; i++) {
            final Object value = t.get(i);

            if (value != null) {
                docfields++;
                doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));
            }
        }

        try {
            // Skip tuples in which every field is null.
            if (docfields > 0)
                writer.write(null, doc);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}
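  Field values are cleaned before they are sent because characters such as NUL or U+FFFF tend to break the XML or JSON that travels to Solr. The following standalone sketch shows the effect of that kind of filter on a sample string. It is not the store function's own helper: the class name is hypothetical, and this version drops the whole U+FDD0..U+FDEF range inclusive.

```java
/** Sketch only: drop Unicode non-characters and most control characters. */
final class StripNonCharsSketch {

    /** Returns true when the character is safe to keep. */
    static boolean keep(char ch) {
        boolean nonCharacter = ch == 0xFFFE || ch == 0xFFFF
                || (ch >= 0xFDD0 && ch <= 0xFDEF);
        // Tab, line feed and carriage return are the only control characters kept.
        boolean control = ch < 0x20 && ch != '\t' && ch != '\n' && ch != '\r';
        return !nonCharacter && !control;
    }

    /** Copies the input, skipping every character that keep() rejects. */
    static String strip(String input) {
        StringBuilder out = new StringBuilder(input.length());
        for (int i = 0; i < input.length(); i++) {
            char ch = input.charAt(i);
            if (keep(ch)) {
                out.append(ch);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String dirty = "so\u0000lr\tpig\uFFFF";
        // The NUL and U+FFFF are removed, the tab survives.
        System.out.println(strip(dirty).equals("solr\tpig"));
    }
}
```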
  The Pig script is as follows:
  Pig code

-- Register the jar with the dependency files
REGISTER ./dependfiles/tools.jar;

-- Register the Solr-related jars
REGISTER ./solrdependfiles/pigudf.jar;
REGISTER ./solrdependfiles/solr-core-4.10.2.jar;
REGISTER ./solrdependfiles/solr-solrj-4.10.2.jar;
REGISTER ./solrdependfiles/httpclient-4.3.1.jar;
REGISTER ./solrdependfiles/httpcore-4.3.jar;
REGISTER ./solrdependfiles/httpmime-4.3.1.jar;
REGISTER ./solrdependfiles/noggit-0.5.jar;

-- Load the HDFS data and define the schema
a = load '/tmp/data' using PigStorage(',') as (sword:chararray, scount:int);

-- Store into Solr, passing the Solr URL (host and port) and the collection name
store a into '/user/search/solrindextemp' using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1', 'collection1');
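  Once everything is configured, we can run the job: it loads the data from HDFS, processes it, and stores the final results in Solr.
  [Screenshot: the stored results in Solr]
  After that, we can easily run millisecond-level operations in Solr, such as all kinds of full-text queries, filtering, sorting, statistics, and so on!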
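  As an illustration of such queries, here is a small sketch that builds a URL for Solr's `/select` handler with a query, a filter, and a sort. It only constructs the URL and does not contact a server. The class and method names are hypothetical, and the core URL and the fields `sword` and `scount` are assumed to match the Pig script in this post.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Sketch only: builds a Solr select URL with query, filter and sort parameters. */
final class SolrSelectUrlSketch {

    /** URL-encodes one parameter value as UTF-8. */
    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    /**
     * @param coreUrl base URL of the Solr core, without a trailing slash
     * @param q       main query (required)
     * @param fq      filter query, or null for none
     * @param sort    sort clause such as "scount desc", or null for none
     * @param rows    maximum number of documents to return
     */
    static String selectUrl(String coreUrl, String q, String fq, String sort, int rows) {
        StringBuilder url = new StringBuilder(coreUrl)
                .append("/select?q=").append(enc(q));
        if (fq != null) {
            url.append("&fq=").append(enc(fq));
        }
        if (sort != null) {
            url.append("&sort=").append(enc(sort));
        }
        return url.append("&rows=").append(rows).append("&wt=json").toString();
    }

    public static void main(String[] args) {
        // Words matching "hadoop" with a count of at least 10, highest count first.
        System.out.println(selectUrl(
                "http://localhost:8983/solr/collection1",
                "sword:hadoop", "scount:[10 TO *]", "scount desc", 10));
    }
}
```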
  In the same way, the index can also be stored in ElasticSearch. I will cover how to integrate Pig with ElasticSearch in a later post, so stay tuned!
