qiantin posted on 2015-11-27 20:32:51

Kafka study, part 4: Flume + Kafka + Storm

  Reposted from: http://blog.csdn.net/desilting/article/details/23194039
  
Configure Flume:
       http://blog.csdn.net/desilting/article/details/22811593
The conf/flume-conf.properties file:

producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.channels = c
producer.sources.s.type = netcat
producer.sources.s.bind = 192.168.40.134
producer.sources.s.port = 44444

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list = 192.168.40.134:9092
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 1
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.custom.topic.name = mykafka
producer.sinks.r.channel = c

producer.channels.c.type = memory
producer.channels.c.capacity = 1000
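Note that producer.sinks.r.type refers to a custom sink (its source is given below), so the compiled class and the Kafka client jars it depends on must be on Flume's classpath before the agent starts. A minimal sketch, assuming the sink is packaged as flume-kafka-sink.jar (a hypothetical name; adjust to your build):

       cp flume-kafka-sink.jar $FLUME_HOME/lib/
       cp $KAFKA_HOME/libs/kafka_*.jar $KAFKA_HOME/libs/scala-library-*.jar $FLUME_HOME/lib/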



Configure Kafka:
       http://blog.csdn.net/desilting/article/details/22872839

Start ZooKeeper, Kafka, and Storm.
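For reference, a typical startup sequence looks like the following (the ZooKeeper script bundled with Kafka is used here; a standalone ZooKeeper works the same way):

       bin/zookeeper-server-start.sh config/zookeeper.properties &
       bin/kafka-server-start.sh config/server.properties &
       bin/storm nimbus &
       bin/storm supervisor &
       bin/storm ui &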

Create the topic:
      bin/kafka-topics.sh --create --zookeeper 192.168.40.132:2181 --replication-factor 3 --partitions 1 --topic mykafka
Inspect the topic:
       bin/kafka-topics.sh --describe --zookeeper 192.168.40.132:2181
       Topic:mykafka    PartitionCount:1    ReplicationFactor:3    Configs:
       Topic: mykafka    Partition: 0    Leader: 134    Replicas: 133,134,132    Isr: 134,133,132

Notes on the describe output:

       partition: a topic can be split into multiple partitions, and its messages are spread across them to increase parallelism (a replication factor of 3 requires at least three brokers, here 132, 133, and 134).
       leader: the broker that handles all reads and writes for the partition; any broker may become the leader of a given partition.
       replicas: the brokers that hold a copy of this partition, listed regardless of whether they are alive.
       isr: the subset of replicas that are currently alive ("in-sync replicas").

Start Flume:
       bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console




The KafkaSink class:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Context;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class KafkaSink extends AbstractSink implements Configurable {

    private Context context;
    private Properties parameters;
    private Producer<String, String> producer;

    private static final String PARTITION_KEY_NAME = "custom.partition.key";
    private static final String CUSTOM_TOPIC_KEY_NAME = "custom.topic.name";
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);

    public void configure(Context context) {
        // Copy every property of this sink (producer.sinks.r.*) into a Properties
        // object; the Kafka producer picks out the keys it recognizes.
        this.context = context;
        ImmutableMap<String, String> props = context.getParameters();
        this.parameters = new Properties();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            this.parameters.put(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public synchronized void start() {
        super.start();
        ProducerConfig config = new ProducerConfig(this.parameters);
        this.producer = new Producer<String, String>(config);
    }

    public Status process() {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();

        try {
            transaction.begin();
            Event event = channel.take();
            if (event != null) {
                String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);
                String topic = Preconditions.checkNotNull((String) this.parameters.get(CUSTOM_TOPIC_KEY_NAME),
                        "topic name is required");
                String eventData = new String(event.getBody(), DEFAULT_ENCODING);
                // Without a partition key, Kafka chooses the partition itself.
                KeyedMessage<String, String> data = (partitionKey == null || partitionKey.isEmpty())
                        ? new KeyedMessage<String, String>(topic, eventData)
                        : new KeyedMessage<String, String>(topic, partitionKey, eventData);
                LOGGER.info("Sending Message to Kafka : [" + topic + ":" + eventData + "]");
                producer.send(data);
                transaction.commit();
                LOGGER.info("Send message success");
                status = Status.READY;
            } else {
                // Channel is empty; back off so the sink runner sleeps a while.
                transaction.rollback();
                status = Status.BACKOFF;
            }
        } catch (Exception e) {
            LOGGER.error("Send message failed!", e);
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public synchronized void stop() {
        producer.close();
        super.stop();
    }
}



The KafkaSpout is adapted from another author's code:
https://github.com/HolmesNL/kafka-spout (it needs a few modifications).
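For illustration, here is a minimal sketch of a spout with the three-argument constructor used in the test program below. It is built on Kafka 0.8's high-level consumer and is an assumption of what the modified spout might look like, not the HolmesNL code itself:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaSpout extends BaseRichSpout {

    private final String topic;
    private final String groupId;
    private final String zkConnect;
    private transient ConsumerConnector consumer;
    private transient LinkedBlockingQueue<String> queue;
    private transient SpoutOutputCollector collector;

    public KafkaSpout(String topic, String groupId, String zkConnect) {
        this.topic = topic;
        this.groupId = groupId;
        this.zkConnect = zkConnect;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.queue = new LinkedBlockingQueue<String>();

        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream for the topic, drained by a background thread so that
        // nextTuple() never blocks on the Kafka iterator.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        final KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(topicCountMap).get(topic).get(0);
        new Thread(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    queue.offer(new String(it.next().message()));
                }
            }
        }).start();
    }

    public void nextTuple() {
        String message = queue.poll();
        if (message != null) {
            collector.emit(new Values(message));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // The field name matches what ExclamationBolt reads via tuple.getString(0).
        declarer.declare(new Fields("word"));
    }
}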


The Storm test program:

public class ExclamationTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;
        transient CountMetric _countMetric;
        transient MultiCountMetric _wordCountMetric;
        transient ReducedMetric _wordLengthMeanMetric;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            initMetrics(context);
        }

        public void execute(Tuple tuple) {
            // Append "!!!" to the incoming word, anchoring and acking the tuple.
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
            updateMetrics(tuple.getString(0));
        }

        void updateMetrics(String word) {
            _countMetric.incr();
            _wordCountMetric.scope(word).incr();
            _wordLengthMeanMetric.update(word.length());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        void initMetrics(TopologyContext context) {
            _countMetric = new CountMetric();
            _wordCountMetric = new MultiCountMetric();
            _wordLengthMeanMetric = new ReducedMetric(new MeanReducer());

            // Report execute_count every 5 seconds, the other metrics every 60.
            context.registerMetric("execute_count", _countMetric, 5);
            context.registerMetric("word_count", _wordCountMetric, 60);
            context.registerMetric("word_length", _wordLengthMeanMetric, 60);
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        String topic = args.length == 2 ? args[1] : args[0];
        KafkaSpout kafkaSpout = new KafkaSpout(topic, "testKafkaGroup", "192.168.40.132:2181");

        builder.setSpout("word", kafkaSpout, 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

        if (args != null && args.length == 2) {
            // Two arguments: submit to the cluster as args[0], reading topic args[1].
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // One argument (the topic): run in local mode for ten seconds.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
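With two arguments the topology is submitted to the cluster under the name given as the first argument; with a single argument (the topic) it runs in local mode for ten seconds. A submission sketch, assuming the topology is packaged as storm-test.jar (a hypothetical name):

       bin/storm jar storm-test.jar ExclamationTopology mytopology mykafka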

Test results:
telnet 192.168.40.134 44444

At the telnet prompt, type some arbitrary strings (ASDF and the like):

ASDF
OK
ASD
OK
F
OK
ASDF
OK


The Flume side logs:

2014-04-08 01:30:44,379 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka :
2014-04-08 01:30:44,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:44,604 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka :
2014-04-08 01:30:44,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:44,794 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka :
2014-04-08 01:30:44,799 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - KafkaSink.process(KafkaSink.java:80)] Send message success
2014-04-08 01:30:45,038 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - KafkaSink.process(KafkaSink.java:77)] Sending Message to Kafka :

Finally, in Storm's logs/metrics.log file you will find records like this one:

2014-04-08 01:30:28,446 495106 1396945828 ubun