eecdc 发表于 2015-11-27 17:09:22

重写Flume-NG-morphline-avro-sink

  之前在CSDN中发过一篇关于如何编写flume的morphline-avro-sink的文章(http://blog.csdn.net/zh_yi/article/details/39552441)。发现浏览次数不少,但没有评论。可能说明看后对大家没有什么帮助吧,最近发现之前写的程序在大数据量环境下存在性能瓶颈,通过该sink的event只能达到200条/秒的发送量,而morphline-solr-sink能达到1000条/秒(当然,受到硬件配置的制约)。于是决定重写这个sink,以提升性能。
  本sink开发的依据是flume-ng-morphline-solr-sink,在此基础上将Morphline处理完的Record变成event,通过RpcClient发送出去。
  开发的环境需要将flume-ng的1.5.2编译通过,并正确导入到eclipse中。
  下面开始编程部分。也就是修改morphline solr sink的部分。
  一、找到flume源码所在路径(C:\apache-flume-1.5.2-src\flume-ng-sinks),拷贝一份flume-ng-morphline-solr-sink目录到同级文件夹,将目录名称修改成flume-ng-morphline-avro-sink。
  二、修改flume-ng-morphline-avro-sink目录下的pom.xml文件。修改<artifactId>标签中内容为flume-ng-morphline-avro-sink。修改<name>标签中内容为FlumeNG Morphline Avro Sink。注释掉<properties>标签中的<solr.version>和<solr.expected.version>两个标签内容。因为是原Solr中的内容,这里用不到。
  三、在eclipse中打开flume-ng-sinks/pom.xml文件,在Overview标签页中的Modules点击Add,找到flume-ng-morphline-avro-sink并选中确定,将新建的morphline-avro-sink添加到Modules中。这样在该pom文件的“pom.xml”标签页中就可以看到多了一个flume-ng-morphline-avro-sink。
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
<modules>
   <module>flume-hdfs-sink</module>
    <module>flume-irc-sink</module>
   <module>flume-ng-hbase-sink</module>
    <module>flume-ng-elasticsearch-sink</module>
    <module>flume-ng-morphline-solr-sink</module>
    <module>flume-ng-morphline-avro-sink</module>
  </modules>
  </pre>
  这样就是我们新建的sink利用Maven管理起来了。
  四、修改部分包/类名
  1、 将src/test/java下org.apache.flume.sink.solr.morphline中的solr替换成avro。
  2、 将src/main/java/下org.apache.flume.sink.solr.morphline中的solr替换成avro.
  3、 将src/test/java/org.apache.flume.sink.solr.morphline下的TestMorphlineSolrSink.java修改成TestMorphlineAvroSink.java
  4、 将src/main/java/org.apache.flume.sink.solr.morphline中的MorphlineSolrSink.java修改成MorphlineAvroSink.java
  五、Flume中包含AvroSink,是通过Rpc连接。需要创建一个RpcClient,然后将flume中接到的eventappend到client中,作为avro event发送出去。这是AvroSink基本的也是最关键的动作。因此,就需要在MorphlineSink中初始化一个RpcClient,并在Handler中处理发送的动作。
  具体实现思路:
  因为处理event的代码是顺序执行的,如果每个event处理完成都append到client发送,会导致Morphline处理Record的效率降低。因此,要将此部分代码进行分离。本次将client发送event的动作采用线程池的方式进行处理。初始化构建一个线程池,线程池中的线程接收morphline处理完成的Record,将Record转换成event后再由client发送。
  六、Morphline处理event的简单流程
  Morphline是一个ETL工具,采用管道式执行的方式将其中的Record进行处理转换,并按照配置的命令逐条执行,直到得到最终结果后进行加载。
  1、 初始化MorphlineSink;构建clientProps,将Rpc服务的主机和端口进行赋值,为构建RpcClient做数据准备。
  2、 启动MorphlineSink,调用MorphlineSink的start()方法。
  a)      初始化RpcClient。client = initializeRpcClient(clientProps);
  b)      构建handler。将初始化好的client赋给handler。
  c)       处理event。调用MorphlineSink的process()方法。因为在Morphline中,event将被转换成Morphline定义的Record。在MorphlineSink的process()方法中调用handler的process(event)接口。
  d)      利用handler处理event。正常将event转换成Record。利用Morphline Command继续处理Record。管道式处理,本命令处理完成之后将继续下一条命令的处理。
  七、修改源代码。
  1、 MorphlineHandler.java
  添加setRpcClient接口:
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
  public void setRpcClient(RpcClientclient);
  </pre>
  2、 MorphlineHandlerImpl.java
  a)      定义线程池中线程的个数的私有成员变量
  private static final int NUM =3;//线程池中线程个数定义为3,根据实际情况而定。
  定义线程池
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
  public ExcutorService es =Excutors.newFixedThreadPool(NUM);
  </pre>
  b)      添加Override setRpcClient方法
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
@Override
    public void setRpcClient(RpcClient client){
      this.client =client;
  }
  </pre>
  c)       构建Collector内部类,其中定义一个队列,用来存储被处理完成的Record。
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
public static final class Collector implements Command {
       public Collector(){}
       private BlockingQueue<Record>queue = new ArrayBlockingQueue<Record>(10);
       @Override
       public Command getParent() {
         return null;
       }
       public BlockingQueue<Record> getQueue() {
         returnqueue;
       }
       public void setQueue(BlockingQueue<Record> queue) {
         this.queue = queue;
       }
       @Override
       public void notify(Record notification) {
       }
       @Override
       public boolean process(Record record) {
         Preconditions.checkNotNull(record);
         try {
  //接收到Record后,将其压如队列
            queue.put(record);
         } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
         }
         return true;
       }
  }
  </pre>
  d)      构建处理利用client发送event的内部线程
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
public class MorphlineTask implements Runnable {
       private BlockingQueue<Record>queue;
       private RpcClientclient;
       public MorphlineTask() {
       }
       public MorphlineTask(BlockingQueue<Record> queue,RpcClient client) {
         this.queue = queue;
         this.client = client;
       }
       @Override
       public void run() {
         Record r = null;
         while (true) {
            try {
                  //从队列中取出Record,并转换成event。
                  r = queue.take();
                  Map<String, String> headers = null;
                  headers = new HashMap<String, String>();
                  ListMultimap<String, Object> lmt =r.getFields();
                  Map<String, Collection<Object>> m =lmt.asMap();
                  Iterator it = m.entrySet().iterator();
                  while (it.hasNext()) {
                     Entry<String, Object> entry = (Entry<String,Object>) it
                            .next();
                     if (entry.getValue() !=null && !entry.getKey().equals(Fields.ATTACHMENT_BODY)) {
                         List v = (List)entry.getValue();
                         if (v.get(0) !=null) {
                            headers.put(entry.getKey(),v.get(0).toString());
                         }
                     }
                  }
                  try{
                     Event e = EventBuilder.withBody((byte[])r.getFirstValue(Fields.ATTACHMENT_BODY), headers);
                     client.append(e);
                     }catch(NullPointerException e){
                     e.printStackTrace();
                     LOG.error(&quot;Rpc Clientis null!&quot;);
                  }
            } catch (InterruptedException e) {
                  // TODO Auto-generatedcatch block
                  e.printStackTrace();
                  break;
            } catch (EventDeliveryException e1) {
                  // TODO Auto-generatedcatch block
                  e1.printStackTrace();
            }

         }
       }
  }
  </pre>
  e)      修改configure(Context context)方法
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
Collector finalChild =null;
try{
    //初始化内部处理Record的Command
    finalChild = newCollector();
}catch(Exception e){
    e.printStackTrace();
}
//执行线程池中的线程,发送event
for (int i = 0; i <NUM; i&#43;&#43;) {
            es.execute(newMorphlineTask(finalChild.getQueue(),client));
}
</pre>
  3、 MorphlineSink.java
  a)      将该类原来集成的AbstractSink修改成AbstractRpcSink;
  b)      定义私有成员变量RpcClient client;
  c)       定义私有成员变量Properties clientProps;
  d)      修改configure(Context context)方法。
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
//创建clientProps。为client提供初始化数据
clientProps = new Properties();   clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS,&quot;h1&quot;);   clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX &#43;&quot;h1&quot;, context.getString(&quot;hostname&quot;)&#43; &quot;:&quot; &#43; context.getInteger(&quot;port&quot;));
for (Entry<String,String> entry: context.getParameters().entrySet()) {   clientProps.setProperty(entry.getKey(),entry.getValue());
}
</pre>
  e)      修改start()方法
添加初始化client代码
<pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
//初始化RpcClient。
         try {
               client =initializeRpcClient(clientProps);
         } catch (Exception e) {
               e.printStackTrace();
}
</pre>
赋值client
在构建handler部分添加赋值client代码
tmpHandler.setRpcClient(client);
  f)       修改stop()方法
finally中添加client.close();

至此Flume-NG-Morphline-Avro-Sink编写完成。如有问题欢迎指正,有兴趣交流的码农欢迎加QQ:58431505,请注明:Flume交流。
页: [1]
查看完整版本: 重写Flume-NG-morphline-avro-sink