设为首页 收藏本站
查看: 1399|回复: 0

[经验分享] 重写Flume-NG-morphline-avro-sink

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-11-27 17:09:22 | 显示全部楼层 |阅读模式
  之前在CSDN中发过一篇关于如何编写flume的morphline-avro-sink的文章(http://blog.csdn.net/zh_yi/article/details/39552441)。发现浏览次数不少,但没有评论。可能说明看后对大家没有什么帮助吧,最近发现之前写的程序在大数据量环境下存在性能瓶颈,通过该sink的event只能达到200条/秒的发送量,而morphline-solr-sink能达到1000条/秒(当然,受到硬件配置的制约)。于是决定重写这个sink,以提升性能。
  本sink开发的依据是flume-ng-morphline-solr-sink,在此基础上将Morphline处理完的Record变成event,通过RpcClient发送出去。
  开发的环境需要将flume-ng的1.5.2编译通过,并正确导入到eclipse中。
  下面开始编程部分。也就是修改morphline solr sink的部分。
  一、  找到flume源码所在路径(C:\apache-flume-1.5.2-src\flume-ng-sinks),拷贝一份flume-ng-morphline-solr-sink目录到同级文件夹,将目录名称修改成flume-ng-morphline-avro-sink。
  二、  修改flume-ng-morphline-avro-sink目录下的pom.xml文件。修改<artifactId>标签中内容为flume-ng-morphline-avro-sink。修改<name>标签中内容为FlumeNG Morphline Avro Sink。注释掉<properties>标签中的<solr.version>和<solr.expected.version>两个标签内容。因为是原Solr中的内容,这里用不到。
  三、  在eclipse中打开flume-ng-sinks/pom.xml文件,在Overview标签页中的Modules点击Add,找到flume-ng-morphline-avro-sink并选中确定,将新建的morphline-avro-sink添加到Modules中。这样在该pom文件的“pom.xml”标签页中就可以看到多了一个flume-ng-morphline-avro-sink。
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
<modules>

   <module>flume-hdfs-sink</module>

    <module>flume-irc-sink</module>

   <module>flume-ng-hbase-sink</module>

    <module>flume-ng-elasticsearch-sink</module>

    <module>flume-ng-morphline-solr-sink</module>

    <module>flume-ng-morphline-avro-sink</module>

  </modules>
  </pre>
  这样就是我们新建的sink利用Maven管理起来了。
  四、  修改部分包/类名
  1、 将src/test/java下org.apache.flume.sink.solr.morphline中的solr替换成avro。
  2、 将src/main/java/下org.apache.flume.sink.solr.morphline中的solr替换成avro.
  3、 将src/test/java/org.apache.flume.sink.solr.morphline下的TestMorphlineSolrSink.java修改成TestMorphlineAvroSink.java
  4、 将src/main/java/org.apache.flume.sink.solr.morphline中的MorphlineSolrSink.java修改成MorphlineAvroSink.java
  五、  Flume中包含AvroSink,是通过Rpc连接。需要创建一个RpcClient,然后将flume中接到的eventappend到client中,作为avro event发送出去。这是AvroSink基本的也是最关键的动作。因此,就需要在MorphlineSink中初始化一个RpcClient,并在Handler中处理发送的动作。
  具体实现思路:
  因为处理event的代码是顺序执行的,如果每个event处理完成都append到client发送,会导致Morphline处理Record的效率降低。因此,要将此部分代码进行分离。本次将client发送event的动作采用线程池的方式进行处理。初始化构建一个线程池,线程池中的线程接收morphline处理完成的Record,将Record转换成event后再由client发送。
  六、  Morphline处理event的简单流程
  Morphline是一个ETL工具,采用管道式执行的方式将其中的Record进行处理转换,并按照配置的命令逐条执行,直到得到最终结果后进行加载。
  1、 初始化MorphlineSink;构建clientProps,将Rpc服务的主机和端口进行赋&#20540;,为构建RpcClient做数据准备。
  2、 启动MorphlineSink,调用MorphlineSink的start()方法。
  a)      初始化RpcClient。client = initializeRpcClient(clientProps);
  b)      构建handler。将初始化好的client赋给handler。
  c)       处理event。调用MorphlineSink的process()方法。因为在Morphline中,event将被转换成Morphline定义的Record。在MorphlineSink的process()方法中调用handler的process(event)接口。
  d)      利用handler处理event。正常将event转换成Record。利用Morphline Command继续处理Record。管道式处理,本命令处理完成之后将继续下一条命令的处理。
  七、  修改源代码。
  1、 MorphlineHandler.java
  添加setRpcClient接口:
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
  public void setRpcClient(RpcClientclient);
  </pre>
  2、 MorphlineHandlerImpl.java
  a)      定义线程池中线程的个数的私有成员变量
  private static final int NUM =3;//线程池中线程个数定义为3,根据实际情况而定。
  定义线程池
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
  public ExcutorService es =Excutors.newFixedThreadPool(NUM);
  </pre>
  b)      添加Override setRpcClient方法
  <pre lang=&quot;html&quot;line=&quot;1&quot; escaped=&quot;true&quot;>
@Override

    public void setRpcClient(RpcClient client){

        this.client =client;

  }
  </pre>
  c)       构建Collector内部类,其中定义一个队列,用来存储被处理完成的Record。
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
public static final class Collector implements Command {

       public Collector(){}

       private BlockingQueue<Record>queue = new ArrayBlockingQueue<Record>(10);

       @Override

       public Command getParent() {

           return null;

       }

       public BlockingQueue<Record> getQueue() {

           returnqueue;

       }

       public void setQueue(BlockingQueue<Record> queue) {

           this.queue = queue;

       }

       @Override

       public void notify(Record notification) {

       }

       @Override

       public boolean process(Record record) {

           Preconditions.checkNotNull(record);

           try {

  //接收到Record后,将其压如队列
              queue.put(record);

           } catch (InterruptedException e) {

              e.printStackTrace();

              return false;

           }

           return true;

       }

  }
  </pre>
  d)      构建处理利用client发送event的内部线程
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
public class MorphlineTask implements Runnable {

       private BlockingQueue<Record>queue;

       private RpcClientclient;

       public MorphlineTask() {

       }

       public MorphlineTask(BlockingQueue<Record> queue,RpcClient client) {

           this.queue = queue;

           this.client = client;

       }

       @Override

       public void run() {

           Record r = null;

           while (true) {

              try {

                  //从队列中取出Record,并转换成event。

                  r = queue.take();

                  Map<String, String> headers = null;

                  headers = new HashMap<String, String>();

                  ListMultimap<String, Object> lmt =r.getFields();

                  Map<String, Collection<Object>> m =lmt.asMap();

                  Iterator it = m.entrySet().iterator();

                  while (it.hasNext()) {

                     Entry<String, Object> entry = (Entry<String,Object>) it

                            .next();

                     if (entry.getValue() !=null && !entry.getKey().equals(Fields.ATTACHMENT_BODY)) {

                         List v = (List)entry.getValue();

                         if (v.get(0) !=null) {

                            headers.put(entry.getKey(),v.get(0).toString());

                         }

                     }

                  }

                  try{

                     Event e = EventBuilder.withBody((byte[])r.getFirstValue(Fields.ATTACHMENT_BODY), headers);

                     client.append(e);

                     }catch(NullPointerException e){

                     e.printStackTrace();

                     LOG.error(&quot;Rpc Clientis null!&quot;);

                  }

              } catch (InterruptedException e) {

                  // TODO Auto-generatedcatch block

                  e.printStackTrace();

                  break;

              } catch (EventDeliveryException e1) {

                  // TODO Auto-generatedcatch block

                  e1.printStackTrace();

              }


           }

       }

  }
  </pre>
  e)      修改configure(Context context)方法
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
Collector finalChild =null;

try{

    //初始化内部处理Record的Command

    finalChild = newCollector();

}catch(Exception e){

    e.printStackTrace();

}

//执行线程池中的线程,发送event

for (int i = 0; i <NUM; i&#43;&#43;) {

              es.execute(newMorphlineTask(finalChild.getQueue(),client));

}

</pre>

  3、 MorphlineSink.java
  a)      将该类原来集成的AbstractSink修改成AbstractRpcSink;
  b)      定义私有成员变量RpcClient client;
  c)       定义私有成员变量Properties clientProps;
  d)      修改configure(Context context)方法。
  <pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>
//创建clientProps。为client提供初始化数据

clientProps = new Properties();   clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS,&quot;h1&quot;);   clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX &#43;&quot;h1&quot;, context.getString(&quot;hostname&quot;)&#43; &quot;:&quot; &#43; context.getInteger(&quot;port&quot;));

for (Entry<String,String> entry: context.getParameters().entrySet()) {     clientProps.setProperty(entry.getKey(),entry.getValue());

}

</pre>

  e)      修改start()方法
添加初始化client代码

<pre lang=&quot;html&quot; line=&quot;1&quot;escaped=&quot;true&quot;>

//初始化RpcClient

           try {

               client =initializeRpcClient(clientProps);

           } catch (Exception e) {

               e.printStackTrace();

}

</pre>

赋&#20540;client

在构建handler部分添加赋&#20540;client代码

tmpHandler.setRpcClient(client);

  f)       修改stop()方法
finally中添加client.close();


至此Flume-NG-Morphline-Avro-Sink编写完成。如有问题欢迎指正,有兴趣交流的码农欢迎加QQ:58431505,请注明:Flume交流。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144267-1-1.html 上篇帖子: Flume 分布式日志收集 下篇帖子: Flume-ng使用指南
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表