设为首页 收藏本站
查看: 1481|回复: 0

[经验分享] flume 读取kafka 数据

[复制链接]

尚未签到

发表于 2015-11-27 19:40:21 | 显示全部楼层 |阅读模式
  本文介绍flume读取kafka数据的方法
  代码:
  

  /*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*  
* http://www.apache.org/licenses/LICENSE-2.0
*  
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.apache.flume.source.kafka;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;

import kafka.message.MessageAndMetadata;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.SyslogParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* A Source for Kafka which reads messages from kafka. I use this in company production environment
* and its performance is good. Over 100k messages per second can be read from kafka in one source.<p>
* <tt>zookeeper.connect: </tt> the zookeeper ip kafka use.<p>
* <tt>topic: </tt> the topic to read from kafka.<p>
* <tt>group.id: </tt> the groupid of consumer group.<p>
*/
public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
    private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> it;
    private String topic;
   
    public Status process() throws EventDeliveryException {
        List<Event> eventList = new ArrayList<Event>();
        MessageAndMetadata<byte[],byte[]> message;
        Event event;
        Map<String, String> headers;
        String strMessage;
        try {
            if(it.hasNext()) {
                message = it.next();
                event = new SimpleEvent();
                headers = new HashMap<String, String>();
                headers.put(&quot;timestamp&quot;, String.valueOf(System.currentTimeMillis()));

                strMessage =  String.valueOf(System.currentTimeMillis()) &#43; &quot;|&quot; &#43; new String(message.message());
                log.debug(&quot;Message: {}&quot;, strMessage);

                event.setBody(strMessage.getBytes());
                //event.setBody(message.message());
                event.setHeaders(headers);
                eventList.add(event);
            }
            getChannelProcessor().processEventBatch(eventList);
            return Status.READY;
        } catch (Exception e) {
            log.error(&quot;KafkaSource EXCEPTION, {}&quot;, e.getMessage());
            return Status.BACKOFF;
        }
    }

    public void configure(Context context) {
        topic = context.getString(&quot;topic&quot;);
        if(topic == null) {
            throw new ConfigurationException(&quot;Kafka topic must be specified.&quot;);
        }
        try {
            this.consumer = KafkaSourceUtil.getConsumer(context);
        } catch (IOException e) {
            log.error(&quot;IOException occur, {}&quot;, e.getMessage());
        } catch (InterruptedException e) {
            log.error(&quot;InterruptedException occur, {}&quot;, e.getMessage());
        }
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        if(consumerMap == null) {
            throw new ConfigurationException(&quot;topicCountMap is null&quot;);
        }
        List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
        if(topicList == null || topicList.isEmpty()) {
            throw new ConfigurationException(&quot;topicList is null or empty&quot;);
        }
        KafkaStream<byte[], byte[]> stream =  topicList.get(0);
        it = stream.iterator();
    }

    @Override
    public synchronized void stop() {
        consumer.shutdown();
        super.stop();
    }

}

/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*  
* http://www.apache.org/licenses/LICENSE-2.0
*  
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*******************************************************************************/
package org.apache.flume.source.kafka;


import java.io.IOException;
import java.util.Map;
import java.util.Properties;

import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KafkaSourceUtil {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class);

    public static Properties getKafkaConfigProperties(Context context) {
        log.info(&quot;context={}&quot;,context.toString());
        Properties props = new Properties();
        ImmutableMap<String, String> contextMap = context.getParameters();
        for (Map.Entry<String,String> entry : contextMap.entrySet()) {
            String key = entry.getKey();
            if (!key.equals(&quot;type&quot;) && !key.equals(&quot;channel&quot;)) {
                props.setProperty(entry.getKey(), entry.getValue());
                log.info(&quot;key={},value={}&quot;, entry.getKey(), entry.getValue());
            }
        }
        return props;
    }
    public static ConsumerConnector getConsumer(Context context) throws IOException, InterruptedException {
        ConsumerConfig consumerConfig = new ConsumerConfig(getKafkaConfigProperties(context));
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        return consumer;
    }
}



配置文件:( /etc/flume/conf/flume-kafka-file.properties)

  # Flume agent "agent_log": one Kafka source -> one memory channel -> one file_roll sink.
  agent_log.sources = kafka0
agent_log.channels = ch0
agent_log.sinks = sink0

# Wire the source and the sink to the same channel.
agent_log.sources.kafka0.channels = ch0
agent_log.sinks.sink0.channel = ch0



# Source: the custom KafkaSource class. "topic" is read by KafkaSource itself;
# the remaining keys are copied into the Kafka consumer configuration.
agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource
agent_log.sources.kafka0.zookeeper.connect = node3:2181,node4:2181,node5:2181
agent_log.sources.kafka0.topic = kkt-test-topic
agent_log.sources.kafka0.group.id= test

# Channel: in-memory buffer between source and sink.
agent_log.channels.ch0.type = memory
agent_log.channels.ch0.capacity = 2048
agent_log.channels.ch0.transactionCapacity = 1000


# Sink: writes events to files under sink.directory.
# NOTE(review): rollInterval is presumably in seconds (new file every 5 minutes) - confirm in the Flume user guide.
agent_log.sinks.sink0.type=file_roll
agent_log.sinks.sink0.sink.directory=/data/flumeng/data/test
agent_log.sinks.sink0.sink.rollInterval=300


  启动脚本:
  sudo su  -l -s /bin/bash  flume  -c '/usr/lib/flume/bin/flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/flume-kafka-file.properties -name agent_log -Dflume.root.logger=INFO,console '


  
注意: 原帖中以红色字体标出的代码(即 process() 方法中在消息体前拼接 System.currentTimeMillis() 和 "|" 的语句)的功能是在原始数据前加入时间戳
  版本: flume-1.4.0.2.1.1.0 + kafka_2.8.0-0.8.0
  参考资料:https://github.com/baniuyao/flume-kafka

  编译用到的库:
  flume-ng-configuration-1.4.0.2.1.1.0-385
  flume-ng-core-1.4.0.2.1.1.0-385
  flume-ng-sdk-1.4.0.2.1.1.0-385
  flume-tools-1.4.0.2.1.1.0-385
  guava-11.0.2
  kafka_2.8.0-0.8.0
  log4j-1.2.15
  scala-compiler
  scala-library
  slf4j-api-1.6.1
  slf4j-log4j12-1.6.1
  zkclient-0.3
  zookeeper-3.3.4

  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144345-1-1.html 上篇帖子: flume + Kafka采集数据 超简单 下篇帖子: flume读取日志数据写入kafka
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表