qq78707 posted on 2019-1-30 09:41:45

Flume NG Study Notes (10): Transaction, Sink, Source and Channel Development

Copyright notice: this is the blogger's original article and may not be reproduced without the blogger's permission.
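1. The Transaction interface
  The Transaction interface exists for the sake of Flume's reliability. All major components (sources, sinks, channels) must use Flume Transactions. You can think of the Transaction interface as Flume's transaction mechanism: sources put data into a channel, and sinks take data out of it, inside a Transaction.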


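  As the figure above shows, a Transaction is implemented inside a Channel implementation. Every source and sink connected to a channel must obtain a Transaction object. Sources go through a ChannelProcessor (which uses a ChannelSelector to choose the target channels) to manage their transactions, while sinks manage them explicitly through the channel they are bound to. Putting events into a channel and taking events out of it are both done inside an active Transaction.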
  Here is the example from the official documentation:
  


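```java
import java.nio.charset.Charset;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;

public class TransactionExample {
  public static void main(String[] args) {
    Channel ch = new MemoryChannel();
    // MemoryChannel sets up its queue in configure(); an empty Context applies the defaults
    Configurables.configure(ch, new Context());

    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do
      Event eventToStage = EventBuilder.withBody("Hello Flume!",
          Charset.forName("UTF-8"));
      ch.put(eventToStage);
      // Event takenEvent = ch.take();
      // ...
      txn.commit();
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      txn.close();
    }
  }
}
```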
  

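  The code above is a minimal Transaction example. This begin/commit/rollback/close pattern is what a custom Sink uses to take events; a custom Source gets the same protection through the ChannelProcessor, which wraps each put in a transaction.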
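  To make the commit/rollback semantics concrete, below is a toy model in plain Java with no Flume dependency. `ToyChannel` and `ToyTransaction` are hypothetical names, and this is a simplified sketch of the idea rather than how MemoryChannel is actually implemented (the real channel also enforces `capacity` and `transactionCapacity`, among other things). Puts stay staged until commit, and a rollback returns taken events to the head of the queue, which is why a sink that fails mid-delivery does not lose the event.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TxnModel {

  /** Hypothetical stand-in for a channel: a FIFO queue of String "events". */
  static class ToyChannel {
    final Deque<String> queue = new ArrayDeque<>();

    ToyTransaction getTransaction() {
      return new ToyTransaction(this);
    }
  }

  /** Hypothetical transaction: buffers puts and remembers takes so they can be undone. */
  static class ToyTransaction {
    private final ToyChannel ch;
    private final List<String> pendingPuts = new ArrayList<>();
    private final List<String> pendingTakes = new ArrayList<>();

    ToyTransaction(ToyChannel ch) {
      this.ch = ch;
    }

    void put(String event) {
      pendingPuts.add(event); // not visible to takers until commit()
    }

    String take() {
      String e = ch.queue.pollFirst(); // null when the channel is empty
      if (e != null) {
        pendingTakes.add(e);
      }
      return e;
    }

    void commit() {
      ch.queue.addAll(pendingPuts); // staged puts become visible
      pendingPuts.clear();
      pendingTakes.clear(); // taken events are now gone for good
    }

    void rollback() {
      pendingPuts.clear(); // discard staged puts
      // Return taken events to the head of the queue in their original order.
      for (int i = pendingTakes.size() - 1; i >= 0; i--) {
        ch.queue.addFirst(pendingTakes.get(i));
      }
      pendingTakes.clear();
    }
  }

  public static void main(String[] args) {
    ToyChannel ch = new ToyChannel();

    ToyTransaction source = ch.getTransaction();
    source.put("a");
    source.put("b");
    source.commit();

    ToyTransaction sink = ch.getTransaction();
    String first = sink.take();
    sink.rollback(); // simulate a failed delivery
    System.out.println(first + " " + ch.queue); // prints: a [a, b]
  }
}
```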
2. Custom Sink development
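  A Sink takes events from a channel and either forwards them to the next Flume agent or stores them in an external repository.
  Which channel a sink reads from is set in the configuration file. Each configured Sink has a SinkRunner instance associated with it; when the Flume framework calls SinkRunner.start(), a new thread is created to drive the Sink.
  That thread manages the Sink's lifecycle. A Sink must implement the start() and stop() methods of the LifecycleAware interface: start() performs initialization, stop() releases resources, and process() is the core method that takes events from the channel and forwards them.
  The Sink also implements the Configurable interface so that it can read its settings from the configuration file.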
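  The runner thread described above essentially keeps calling process() and sleeps when the sink returns BACKOFF (for example, because the channel was empty). The plain-Java sketch below models only that sleep policy. As I understand it, Flume's SinkRunner uses a growing, capped sleep of this kind, but the local `Status` enum, the linear growth and the 1000 ms / 5000 ms values here are assumptions for illustration, not Flume's code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BackoffModel {

  /** Local stand-in for Flume's Sink.Status. */
  enum Status { READY, BACKOFF }

  // Assumed values for illustration (milliseconds).
  static final long SLEEP_INCREMENT_MS = 1000;
  static final long MAX_SLEEP_MS = 5000;

  /**
   * For the statuses returned by successive process() calls, return how long
   * the runner thread would sleep after each call (0 = call again right away).
   */
  static List<Long> sleeps(List<Status> statuses) {
    List<Long> result = new ArrayList<>();
    long consecutiveBackoffs = 0;
    for (Status s : statuses) {
      if (s == Status.BACKOFF) {
        consecutiveBackoffs++;
        result.add(Math.min(consecutiveBackoffs * SLEEP_INCREMENT_MS, MAX_SLEEP_MS));
      } else {
        consecutiveBackoffs = 0; // a READY result resets the backoff
        result.add(0L);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Status> run = Arrays.asList(Status.READY, Status.BACKOFF,
        Status.BACKOFF, Status.READY, Status.BACKOFF);
    System.out.println(sleeps(run)); // prints: [0, 1000, 2000, 0, 1000]
  }
}
```

Returning BACKOFF when there is nothing to do is what keeps an idle sink from spinning on an empty channel.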
  Here is the example from the official documentation:

```java
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
    super.start(); // keep AbstractSink's lifecycle state in sync
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      // This try clause includes whatever Channel operations you want to do

      Event event = ch.take();
      if (event == null) {
        // The channel is empty: finish the empty transaction and back off
        txn.commit();
        return Status.BACKOFF;
      }

      // Send the Event to the external repository.
      // storeSomeData(event);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}
```
  Here is my test example:


```java
package flumedev;

import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class Custom_Sink extends AbstractSink implements Configurable {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external repository (e.g. HDFS) that
    // this Sink will forward Events to ..
    super.start();
  }

  @Override
  public void stop() {
    // Disconnect from the external repository and do any
    // additional cleanup (e.g. releasing resources or nulling-out
    // field values) ..
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
      Event event = ch.take();
      if (event == null) {
        // Nothing in the channel: finish the empty transaction and back off
        txn.commit();
        return Status.BACKOFF;
      }

      // The body is a byte[]; decode it explicitly (UTF-8 assumed here)
      String out = new String(event.getBody(), StandardCharsets.UTF_8);
      // "Send" the event by printing it to the console
      System.out.println(out);

      txn.commit();
      status = Status.READY;
    } catch (Throwable t) {
      txn.rollback();

      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    } finally {
      txn.close();
    }
    return status;
  }
}
```
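  The test sink above only prints the event BODY. Note that event.getBody() returns a byte[], so printing event.getBody().toString() directly gives what looks like garbage (the array's type and hash code, e.g. [B@...) rather than the text; decode the bytes with new String(...) instead. Because every sink does its work inside a Transaction, a custom sink must include the Transaction handling shown above.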
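  The toString() problem is easy to reproduce without Flume: a byte[] inherits Object.toString(). The sketch below assumes the body holds UTF-8 text (true for the ASCII test message used here; other sources may use other encodings), and `decode` is a hypothetical helper name.

```java
import java.nio.charset.StandardCharsets;

public class BodyDecoding {

  /** Hypothetical helper: decode an event body, assuming it holds UTF-8 text. */
  static String decode(byte[] body) {
    return new String(body, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Stand-in for event.getBody(): the bytes of the test message
    byte[] body = "testcustom_sink".getBytes(StandardCharsets.UTF_8);

    // byte[] does not override toString(), so this prints the array type and
    // identity hash (something like "[B@6d06d69c"), not the message text.
    System.out.println(body.toString());

    // Decoding with an explicit charset recovers the text.
    System.out.println(decode(body)); // prints: testcustom_sink
  }
}
```

Passing the charset explicitly also avoids depending on the JVM's platform default encoding, which differs between machines.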
  Next is the test configuration. The custom sink class is flumedev.Custom_Sink. Note: after packaging it into a jar, put the jar in $FLUME_HOME/lib.

```properties
# Config file: custom_sink_case23.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 50000
a1.sources.r1.bind = 192.168.233.128
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.channel = c1
a1.sinks.k1.type = flumedev.Custom_Sink
#a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
```
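  Start the agent:
  flume-ng agent -c conf -f conf/custom_sink_case23.conf -n a1 -Dflume.root.logger=INFO,console
  Once it has started, open another terminal and send data to the listening port:
  echo "testcustom_sink" | nc 192.168.233.128 50000
  Then check the console output in the terminal where the agent is running: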
  [Screenshot of the console output: http://img.blog.csdn.net/20141031155552532?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvbG9va2xvb2s1/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center]

  The data is printed as expected.
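  The sink examples read `myProp` in configure() through `context.getString("myProp", "defaultValue")`. As far as I understand Flume's configuration loading, the properties under a component's prefix (here `a1.sinks.k1.`) are handed to that component's Context with the prefix stripped, so a hypothetical line `a1.sinks.k1.myProp = hello` in custom_sink_case23.conf is what configure() would see; without it the default applies. The plain-Java sketch below only models that lookup with java.util.Properties; `subProperties` and `getString` here are hypothetical helpers, not Flume's classes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ContextModel {

  /** Hypothetical helper: collect the keys under a prefix, with the prefix stripped. */
  static Map<String, String> subProperties(Properties props, String prefix) {
    Map<String, String> result = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        result.put(key.substring(prefix.length()), props.getProperty(key));
      }
    }
    return result;
  }

  /** Models the shape of Context.getString(key, defaultValue). */
  static String getString(Map<String, String> ctx, String key, String defaultValue) {
    String value = ctx.get(key);
    return value != null ? value : defaultValue;
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.load(new StringReader(
        "a1.sinks.k1.channel = c1\n"
        + "a1.sinks.k1.type = flumedev.Custom_Sink\n"
        + "a1.sinks.k1.myProp = hello\n"));

    Map<String, String> k1 = subProperties(props, "a1.sinks.k1.");
    System.out.println(getString(k1, "type", null));              // prints: flumedev.Custom_Sink
    System.out.println(getString(k1, "myProp", "defaultValue"));  // prints: hello
    System.out.println(getString(k1, "missing", "defaultValue")); // prints: defaultValue
  }
}
```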
  

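3. Custom Source development
  A Source receives data from outside and stores it in a Channel. In practice few people write their own.
  Here is the example from the official documentation: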

```java
import java.nio.charset.StandardCharsets;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
  private String myProp;

  @Override
  public void configure(Context context) {
    String myProp = context.getString("myProp", "defaultValue");

    // Process the myProp value (e.g. validation, convert to another type, ...)

    // Store myProp for later retrieval by process() method
    this.myProp = myProp;
  }

  @Override
  public void start() {
    // Initialize the connection to the external client
    super.start();
  }

  @Override
  public void stop() {
    // Disconnect from external client and do any additional cleanup
    // (e.g. releasing resources or nulling-out field values) ..
    super.stop();
  }

  @Override
  public Status process() throws EventDeliveryException {
    Status status = null;

    // AbstractSource has no getChannel(): the ChannelProcessor begins, commits
    // (or rolls back) and closes a transaction on each target channel inside
    // processEvent(), so no explicit Transaction is needed here.
    try {
      // Receive new data
      Event e = getSomeData();
      if (e == null) {
        return Status.BACKOFF; // nothing to deliver right now
      }

      // Store the Event into this Source's associated Channel(s)
      getChannelProcessor().processEvent(e);

      status = Status.READY;
    } catch (Throwable t) {
      // Log exception, handle individual exceptions as needed

      status = Status.BACKOFF;

      // re-throw all Errors
      if (t instanceof Error) {
        throw (Error) t;
      }
    }
    return status;
  }

  // Required by PollableSource in Flume 1.7+ (declared without @Override so the
  // class also compiles on older versions); the values are assumptions to tune.
  public long getBackOffSleepIncrement() {
    return 1000;
  }

  public long getMaxBackOffSleepInterval() {
    return 5000;
  }

  // Hypothetical placeholder: replace with code that reads from the external client.
  private Event getSomeData() {
    return EventBuilder.withBody("some data", StandardCharsets.UTF_8);
  }
}
```
  

  To test it, you would mainly need to supply real data where `Event e` is produced; I won't test it here.
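  One possible way to fill in `getSomeData()` for a pollable source is to have the client-facing code push records into an in-process queue and have process() poll it with a short timeout, returning BACKOFF when nothing arrived so the runner sleeps instead of spinning. The plain-Java sketch below models just that decision: the `BlockingQueue` feed, the 100 ms timeout and the local `Status` enum are assumptions, and the `delivered` list stands in for `getChannelProcessor().processEvent(e)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingSourceModel {

  /** Local stand-in for PollableSource.Status. */
  enum Status { READY, BACKOFF }

  // Hypothetical feed, filled by whatever code talks to the external client.
  final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
  // Stand-in for the channel(s) reached via getChannelProcessor().processEvent(e).
  final List<String> delivered = new ArrayList<>();

  Status process() throws InterruptedException {
    // Wait briefly rather than forever, so the calling thread never hangs here.
    String data = incoming.poll(100, TimeUnit.MILLISECONDS);
    if (data == null) {
      return Status.BACKOFF; // nothing arrived: let the runner sleep
    }
    delivered.add(data); // in Flume: build an Event and call processEvent(event)
    return Status.READY;
  }

  public static void main(String[] args) throws InterruptedException {
    PollingSourceModel src = new PollingSourceModel();
    src.incoming.add("line 1");
    System.out.println(src.process()); // prints: READY
    System.out.println(src.process()); // prints: BACKOFF (after about 100 ms)
    System.out.println(src.delivered); // prints: [line 1]
  }
}
```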
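4. Custom Channel development
  The official documentation lists this section as TBD.
  Below is a custom Channel developed at Meituan; here is the link: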
  http://tech.meituan.com/mt-log-system-optimization.html
……
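Flume itself provides MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer is limited in size and it is not persistent; FileChannel is the opposite. We wanted the advantages of both: when the Sink is fast enough and the Channel is not holding much backlog, use MemoryChannel; when the Sink cannot keep up and the Channel has to buffer the logs sent by the applications, use FileChannel. So we developed DualChannel, which switches intelligently between the two channels.
The logic is as follows: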

```java
// Excerpt from Meituan's DualChannel: switchon, memChannel, fileChannel,
// memTransaction and fileTransaction are defined elsewhere in that class (not shown).

/**
 * putToMemChannel indicates whether to put events into memChannel or fileChannel;
 * takeFromMemChannel indicates whether to take events from memChannel or fileChannel.
 */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
  if (switchon && putToMemChannel.get()) {
    // write to memChannel
    memTransaction.put(event);

    // divert later puts to fileChannel once memory is full or fileChannel has a backlog
    if (memChannel.isFull() || fileChannel.getQueueSize() > 100) {
      putToMemChannel.set(false);
    }
  } else {
    // write to fileChannel
    fileTransaction.put(event);
  }
}

Event doTake() {
  Event event = null;
  if (takeFromMemChannel.get()) {
    // take from memChannel
    event = memTransaction.take();
    if (event == null) {
      // memChannel drained: switch takes to fileChannel
      takeFromMemChannel.set(false);
    }
  } else {
    // take from fileChannel
    event = fileTransaction.take();
    if (event == null) {
      // fileChannel drained too: switch both puts and takes back to memChannel
      takeFromMemChannel.set(true);
      putToMemChannel.set(true);
    }
  }
  return event;
}
```
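  To see how the two flags interact, here is a plain-Java simulation of the switching logic above. Two ArrayDeques stand in for MemoryChannel and FileChannel, there are no transactions, and the memory capacity and file-backlog threshold are constructor parameters with made-up values (the excerpt uses memChannel.isFull() and a threshold of 100). It models the state machine only; it is not Meituan's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class DualChannelModel {

  // Made-up limits for the simulation.
  final int memCapacity;
  final int fileBacklogThreshold;
  final boolean switchon = true; // mirrors the on/off switch in the excerpt

  // Stand-ins for MemoryChannel and FileChannel.
  final Deque<String> memChannel = new ArrayDeque<>();
  final Deque<String> fileChannel = new ArrayDeque<>();

  final AtomicBoolean putToMemChannel = new AtomicBoolean(true);
  final AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

  DualChannelModel(int memCapacity, int fileBacklogThreshold) {
    this.memCapacity = memCapacity;
    this.fileBacklogThreshold = fileBacklogThreshold;
  }

  void doPut(String event) {
    if (switchon && putToMemChannel.get()) {
      memChannel.addLast(event);
      // Divert later puts to the file channel once memory is full
      // or the file channel already holds a backlog.
      if (memChannel.size() >= memCapacity || fileChannel.size() > fileBacklogThreshold) {
        putToMemChannel.set(false);
      }
    } else {
      fileChannel.addLast(event);
    }
  }

  String doTake() {
    String event;
    if (takeFromMemChannel.get()) {
      event = memChannel.pollFirst();
      if (event == null) {
        takeFromMemChannel.set(false); // memory drained: read the file channel next
      }
    } else {
      event = fileChannel.pollFirst();
      if (event == null) {
        // File channel drained too: go back to memory for both puts and takes.
        takeFromMemChannel.set(true);
        putToMemChannel.set(true);
      }
    }
    return event;
  }

  public static void main(String[] args) {
    DualChannelModel ch = new DualChannelModel(3, 2);
    for (int i = 1; i <= 5; i++) {
      ch.doPut("e" + i); // e1..e3 fill memory, e4 and e5 spill to file
    }
    List<String> taken = new ArrayList<>();
    for (int i = 0; i < 7; i++) {
      taken.add(ch.doTake());
    }
    System.out.println(taken); // prints: [e1, e2, e3, null, e4, e5, null]

    ch.doPut("e6"); // both flags were reset, so this goes to memory again
    System.out.println(ch.memChannel); // prints: [e6]
  }
}
```

Note that the take which detects an empty channel returns null even when the other channel still holds data; the caller simply takes again, just as with the excerpt.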
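  Note that the official documentation recommends the file channel: it is slower, but it preserves data integrity. The memory channel is faster, but events buffered in memory are lost if the agent process dies, so it should only be used for workloads that are not sensitive to data loss or duplication.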
  



