设为首页 收藏本站
查看: 1214|回复: 0

[经验分享] flume的自定义sink-Kafka

[复制链接]

尚未签到

发表于 2015-9-17 07:06:28 | 显示全部楼层 |阅读模式
  
    1、创建一个agent,sink类型需指定为自定义sink
        vi /usr/local/flume/conf/agent3.conf
        agent3.sources=as1
        agent3.channels=c1
        agent3.sinks=s1
        agent3.sources.as1.type=avro
        agent3.sources.as1.bind=0.0.0.0
        agent3.sources.as1.port=41414
        agent3.sources.as1.channels=c1
        agent3.channels.c1.type=memory
        agent3.sinks.s1.type=storm.test.kafka.TestKafkaSink
        agent3.sinks.s1.channel=c1
    2、创建自定义kafka sink(自定义kafka sink中包装的是kafka的生产者),代码如下
        //参考flume官方的开发文档:http://flume.apache.org/FlumeDeveloperGuide.html#sink
        //自定义kafkasink需要继承AbstractSink类实现Configurable接口
        //该sink中使用的kafka topic(test111)必须存在
        package storm.test.kafka;
        import java.util.Properties;
        import kafka.javaapi.producer.Producer;
        import kafka.producer.KeyedMessage;
        import kafka.producer.ProducerConfig;
        import kafka.serializer.StringEncoder;
        import org.apache.flume.Channel;
        import org.apache.flume.Context;
        import org.apache.flume.Event;
        import org.apache.flume.EventDeliveryException;
        import org.apache.flume.Transaction;
        import org.apache.flume.conf.Configurable;
        import org.apache.flume.sink.AbstractSink;
        public class TestKafkaSink extends AbstractSink implements Configurable {
            Producer<String, String> producer;
            String topic = "test111";
            
            @Override
            public Status process() throws EventDeliveryException {
                Status status = null;
                Channel channel = getChannel();
                Transaction transaction = channel.getTransaction();
                transaction.begin();
                try {
                    Event event = channel.take();
                    if (event==null) {
                        transaction.rollback();
                        status = Status.BACKOFF;
                        return status;
                    }
                    byte[] body = event.getBody();
                    final String msg = new String(body);
                    final KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic , msg);
                    producer.send(message);
                    transaction.commit();
                    status = Status.READY;            
                } catch (Exception e) {
                    transaction.rollback();
                    status = Status.BACKOFF;
                } finally {
                    transaction.close();
                }
               
                return status;
            }
            @Override
            public void configure(Context arg0) {
                Properties prop = new Properties();
                prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
                prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
                prop.put("serializer.class", StringEncoder.class.getName());
                producer = new Producer<String, String>(new ProducerConfig(prop));
            }
        }
        将代码打包为kafkasink.jar后复制到flume所在节点上的flume/lib目录下,然后还需要将kafka_2.10-0.8.2.0.jar、kafka-clients-0.8.2.0.jar、metrics-core-2.2.0.jar、scala-library-2.10.4.jar这4个jar包复制到flume所在节点上的flume/lib目录下。
    3、启动flume自定义的kafkasink的agent
        [iyunv@h5 ~]# cd /usr/local/flume/
        [iyunv@h5 flume]# bin/flume-ng agent --conf conf/ --conf-file conf/agent3.conf --name agent3 -Dflume.root.logger=INFO,console
    4、将日志写入到flume的agent,代码如下
        log4j.properties
            log4j.rootLogger=INFO,flume
            log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
            log4j.appender.flume.Hostname = 192.168.1.35
            log4j.appender.flume.Port = 41414
            log4j.appender.flume.UnsafeMode = true
        将日志写入到flume,代码如下
            package com.mengyao.flume;
            import java.io.File;
            import java.io.IOException;
            import java.util.Collection;
            import java.util.List;
            import org.apache.commons.io.FileUtils;
            import org.apache.log4j.Logger;
            public class FlumeProducer {
                private static List<String> getLines() {
                    List<String> lines = null;
                    try {
                        final Collection<File> listFiles = FileUtils.listFiles(new File("D:/"), null, false);
                        for (File file : listFiles) {
                            lines = FileUtils.readLines(file);
                            break;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    return lines;
                }
               
                public static void main(String[] args) throws Exception {
                    final List<String> lines = getLines();
                    final Logger logger = Logger.getLogger(FlumeProducer.class);
                    for (String line : lines) {
                        logger.info(line+"\t"+System.currentTimeMillis());
                        Thread.sleep(1000);            
                    }
                }   
            }
            必须加入flume-ng-log4jappender-1.5.0-cdh5.1.3-jar-with-dependencies.jar这个依赖jar
    5、使用kafka消费者消费flume(自定义kafka sink中使用了kafka的生产者)生产的数据
        1、消费者shell代码
            [iyunv@h7 kafka]# bin/kafka-console-consumer.sh --zookeeper h7:2181 --topic test111 --from-beginning        ##kafka集群是h5、h6、h7;zookeeper集群是h5、h6、h7。在任意kafka节点上使用消费者都一样
        
        2、消费者java代码
            package storm.test.kafka;
            import java.util.HashMap;
            import java.util.List;
            import java.util.Map;
            import java.util.Properties;
            import kafka.consumer.Consumer;
            import kafka.consumer.ConsumerConfig;
            import kafka.consumer.ConsumerIterator;
            import kafka.consumer.KafkaStream;
            import kafka.javaapi.consumer.ConsumerConnector;
            import kafka.serializer.StringEncoder;
            public class TestConsumer {
                static final String topic = "test111";
               
                public static void main(String[] args) {
                    Properties prop = new Properties();
                    prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
                    prop.put("serializer.class", StringEncoder.class.getName());
                    prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
                    prop.put("group.id", "group1");
                    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
                    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                    topicCountMap.put(topic, 1);
                    Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
                    final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
                    ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
                    while (iterator.hasNext()) {
                        String msg = new String(iterator.next().message());
                        System.out.println("收到消息:"+msg);
                    }
                }
            }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-114559-1-1.html 上篇帖子: Flume-NG之KafkaChannel 下篇帖子: 开源日志系统比较:scribe、chukwa、kafka、flume
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表