Posted by biao199423 on 2015-11-27 17:05:27

Integrating Flume with Kafka

   1. Architecture
   Integrating Flume with Kafka comes down to implementing an interface: the Kafka producer API is wrapped in a Flume sink. Put simply, Flume's output (the sink) becomes Kafka's input (the producer).
   
   2. Preparation
   1) Copy all of the jars under Kafka's libs directory into Flume's lib directory; with that in place you are unlikely to hit classpath problems at runtime (see the command sketch after this list).
   2) With step 1 done, the jars needed for the Eclipse development that follows can all be found in Flume's lib directory.
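   A minimal sketch of step 1), assuming KAFKA_HOME and FLUME_HOME point at the Kafka and Flume installation directories (both variable names are placeholders):

   cp $KAFKA_HOME/libs/*.jar $FLUME_HOME/lib/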

   3. Code Implementation

package com.mrshen.cvlab.scut;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            transaction.begin();
            Event event = channel.take();
            if (event == null) {
                // Nothing in the channel: roll back and ask Flume to back off.
                transaction.rollback();
                return Status.BACKOFF;
            }
            // Forward the event body to Kafka as a message on the configured topic.
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(
                    topic, new String(event.getBody()));
            producer.send(data);
            logger.info("flume2kafka: " + new String(event.getBody()));
            transaction.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException", e);
            transaction.rollback();
            return Status.BACKOFF;
        } finally {
            transaction.close();
        }
    }

    @Override
    public void configure(Context context) {
        topic = "kafkaTopic";
        // Producer configuration (Kafka 0.8 producer API).
        Properties properties = new Properties();
        properties.setProperty("metadata.broker.list", "storm:9092");
        properties.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        properties.setProperty("num.partitions", "4");
        properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");
        properties.put("zookeeper.connect", "storm:2181");
        properties.put("request.required.acks", "1");
        ProducerConfig pConfig = new ProducerConfig(properties);
        producer = new Producer<String, String>(pConfig);
        logger.info("KafkaSink init successfully!");
    }
}



   PS: The keys set in the configure() method come from the Kafka producer API; see the wiki page https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example. The values are the defaults from the official documentation, section 3.3 "Producer Configs" at http://kafka.apache.org/082/documentation.html#producerconfigs.



   4. Testing
   1) First, export the code above as a jar and place it in Flume's lib directory.
   2) Configure the Flume agent file. In this example the main change is the sink type, which is set to the KafkaSink class written above (a sketch of such a configuration follows).

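   A minimal sketch of what the agent file might look like, assuming an agent named a1 with a syslogtcp source on port 6666 (to match the nc test in step 5), a memory channel, and the sink type set to the fully qualified KafkaSink class; the agent and component names are placeholders:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = syslogtcp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

a1.sinks.k1.type = com.mrshen.cvlab.scut.KafkaSink
a1.sinks.k1.channel = c1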
   3) Start the ZooKeeper and Kafka services, for example with the standard scripts shown below.
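   Run from the Kafka installation directory (the property file paths may differ in your setup):

   bin/zookeeper-server-start.sh config/zookeeper.properties &
   bin/kafka-server-start.sh config/server.properties &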
   4) Start the Flume agent in a terminal; a possible invocation is sketched below.
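   Assuming the agent file above is saved as conf/kafka-agent.conf and the agent is named a1 (both names are placeholders):

   bin/flume-ng agent --conf conf --conf-file conf/kafka-agent.conf --name a1 -Dflume.root.logger=INFO,console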

   5) Send a syslog message to Flume from a terminal:
   echo "test flume 2 kafka" | nc storm 6666
   After nc, supply your own hostname and the port specified in the agent configuration file. Once the command runs, the message appears in the terminal where the Flume agent was started.

   6) In another terminal, start a Kafka consumer and subscribe to the topic "kafkaTopic" hard-coded in the KafkaSink class, for example with the console consumer command below.
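   With the 0.8-era console consumer, assuming ZooKeeper is running on storm:2181 as configured in KafkaSink:

   bin/kafka-console-consumer.sh --zookeeper storm:2181 --topic kafkaTopic --from-beginning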

   The Kafka consumer successfully receives the message sent from the Flume side. This completes the Flume-Kafka integration.