设为首页 收藏本站
查看: 1212|回复: 0

[经验分享] kafka学习四: flume+kafka+storm

[复制链接]

尚未签到

发表于 2015-11-27 20:32:51 | 显示全部楼层 |阅读模式
  转自:http://blog.csdn.net/desilting/article/details/23194039
  
配置flume:
       http://blog.csdn.net/desilting/article/details/22811593
conf/flume-conf.properties文件:

[html] viewplaincopyprint?

  • producer.sources = s  
  • producer.channels = c  
  • producer.sinks = r  
  •   
  • producer.sources.s.channels = c  
  • producer.sources.s.type= netcat  
  • producer.sources.s.bind= 192.168.40.134  
  • producer.sources.s.port= 44444  
  •   
  • producer.sinks.r.type = org.apache.flume.plugins.KafkaSink  
  • producer.sinks.r.metadata.broker.list=192.168.40.134:9092  
  • producer.sinks.r.serializer.class=kafka.serializer.StringEncoder  
  • producer.sinks.r.request.required.acks=1  
  • producer.sinks.r.max.message.size=1000000  
  • producer.sinks.r.custom.topic.name=mykafka  
  • producer.sinks.r.channel = c  
  •   
  • producer.channels.c.type = memory  
  • producer.channels.c.capacity = 1000  



配置kafka:
       http://blog.csdn.net/desilting/article/details/22872839

启动zookeeper、kafka及storm

创建topic:
        bin/kafka-topics.sh --create --zookeeper 192.168.40.132:2181 --replication-factor 3 --partitions 1 --topic  mykafka
查看topic:
       bin/kafka-topics.sh --describe --zookeeper 192.168.40.132:2181
       Topic:mykafka PartitionCount:1ReplicationFactor:3Configs:
       Topic: mykafka Partition: 0Leader: 134Replicas: 133,134,132Isr: 134,133,132

partition同一个topic下可以设置多个partition,将topic下的message存储到不同的partition下,目的是为了提高并行性leader负责此partition的读写操作,每个broker都有可能成为某partition的leaderreplicas副本,即此partition在哪几个broker上有备份,不管broker是否存活isr存活的replicas    启动flume:
       bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console




KafkaSink类:

[java] viewplaincopyprint?

  • import org.slf4j.Logger;  
  • import org.slf4j.LoggerFactory;  
  •   
  • import java.util.Map;  
  • import java.util.Properties;  
  • import kafka.javaapi.producer.Producer;  
  • import kafka.producer.KeyedMessage;  
  • import kafka.producer.ProducerConfig;  
  •   
  • import org.apache.flume.Context;  
  • import org.apache.flume.Channel;  
  • import org.apache.flume.Event;  
  • import org.apache.flume.Transaction;  
  • import org.apache.flume.conf.Configurable;  
  • import org.apache.flume.sink.AbstractSink;  
  • import com.google.common.base.Preconditions;  
  • import com.google.common.collect.ImmutableMap;  
  •   
  • public class KafkaSink extends AbstractSink implements Configurable {  
  •   
  •     private Context context;  
  •     private Properties parameters;  
  •     private Producer<String, String> producer;  
  •   
  •     private static final String PARTITION_KEY_NAME = &quot;custom.partition.key&quot;;  
  •     private static final String CUSTOME_TOPIC_KEY_NAME = &quot;custom.topic.name&quot;;  
  •     private static final String DEFAULT_ENCODING = &quot;UTF-8&quot;;  
  •     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);  
  •   
  •     public void configure(Context context) {  
  •         this.context = context;  
  •         ImmutableMap<String, String> props = context.getParameters();  
  •         this.parameters = new Properties();  
  •         for (Map.Entry<String,String> entry : props.entrySet()) {  
  •             this.parameters.put(entry.getKey(), entry.getValue());  
  •         }  
  •     }  
  •   
  •     @Override  
  •     public synchronized void start() {  
  •         super.start();  
  •         ProducerConfig config = new ProducerConfig(this.parameters);  
  •         this.producer = new Producer<String, String>(config);  
  •     }  
  •   
  •     public Status process() {  
  •         Status status = null;  
  •         Channel channel = getChannel();  
  •         Transaction transaction = channel.getTransaction();  
  •   
  •         try {  
  •             transaction.begin();  
  •             Event event = channel.take();  
  •             if (event != null) {  
  •                 String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);  
  •                 String topic = Preconditions.checkNotNull((String) this.parameters.get(CUSTOME_TOPIC_KEY_NAME),  
  •                         &quot;topic name is required&quot;);  
  •                 String eventData = new String(event.getBody(), DEFAULT_ENCODING);  
  •                 KeyedMessage<String, String> data = (partitionKey.isEmpty()) ? new KeyedMessage<String, String>(topic,  
  •                         eventData) : new KeyedMessage<String, String>(topic, partitionKey, eventData);  
  •                 LOGGER.info(&quot;Sending Message to Kafka : [&quot; &#43; topic &#43; &quot;:&quot; &#43; eventData &#43; &quot;]&quot;);  
  •                 producer.send(data);  
  •                 transaction.commit();  
  •                 LOGGER.info(&quot;Send message success&quot;);  
  •                 status = Status.READY;  
  •             } else {  
  •                 transaction.rollback();  
  •                 status = Status.BACKOFF;  
  •             }  
  •         } catch (Exception e) {  
  •             e.printStackTrace();  
  •             LOGGER.info(&quot;Send message failed!&quot;);  
  •             transaction.rollback();  
  •             status = Status.BACKOFF;  
  •         } finally {  
  •             transaction.close();  
  •         }  
  •         return status;  
  •     }  
  •   
  •     @Override  
  •     public void stop() {  
  •         producer.close();  
  •     }  
  • }  



KafkaSpout参考其他人的代码:
https://github.com/HolmesNL/kafka-spout需要做一些修改。


storm测试程序:

[java] viewplaincopyprint?

  • public class ExclamationTopology {  
  •   
  •   public static class ExclamationBolt extends BaseRichBolt {  
  •     OutputCollector _collector;  
  •     transient CountMetric _countMetric;  
  •     transient MultiCountMetric _wordCountMetric;  
  •     transient ReducedMetric _wordLengthMeanMetric;  
  •     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {  
  •       _collector = collector;  
  •       initMetrics(context);  
  •     }  
  •   
  •     public void execute(Tuple tuple) {  
  •       _collector.emit(tuple, new Values(tuple.getString(0) &#43; &quot;!!!&quot;));  
  •       _collector.ack(tuple);  
  •         updateMetrics(tuple.getString(0));  
  •     }  
  •   
  •       void updateMetrics(String word)  
  •       {  
  •           _countMetric.incr();  
  •           _wordCountMetric.scope(word).incr();  
  •           _wordLengthMeanMetric.update(word.length());  
  •       }  
  •     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  •       declarer.declare(new Fields(&quot;word&quot;));  
  •     }  
  •   
  •       void initMetrics(TopologyContext context)  
  •       {  
  •           _countMetric = new CountMetric();  
  •           _wordCountMetric = new MultiCountMetric();  
  •           _wordLengthMeanMetric = new ReducedMetric(new MeanReducer());  
  •   
  •           context.registerMetric(&quot;execute_count&quot;, _countMetric, 5);  
  •           context.registerMetric(&quot;word_count&quot;, _wordCountMetric, 60);  
  •           context.registerMetric(&quot;word_length&quot;, _wordLengthMeanMetric, 60);  
  •       }  
  •   }  
  •   
  •   public static void main(String[] args) throws Exception {  
  •     TopologyBuilder builder = new TopologyBuilder();  
  •     String topic = args.length==2 ? args[1] : args[0];  
  •     KafkaSpout kafkaSpout = new KafkaSpout(topic,&quot;testKafkaGroup&quot;,&quot;192.168.40.132:2181&quot;);  
  •   
  •     builder.setSpout(&quot;word&quot;, kafkaSpout, 10);  
  •     builder.setBolt(&quot;exclaim1&quot;, new ExclamationBolt(), 3).shuffleGrouping(&quot;word&quot;);  
  •     builder.setBolt(&quot;exclaim2&quot;, new ExclamationBolt(), 2).shuffleGrouping(&quot;exclaim1&quot;);  
  •   
  •     Config conf = new Config();  
  •     conf.setDebug(true);  
  •     conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);  
  •   
  •     if (args != null && args.length == 2) {  
  •       conf.setNumWorkers(3);  
  •   
  •       StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
  •     }  
  •     else {  
  •   
  •       LocalCluster cluster = new LocalCluster();  
  •       cluster.submitTopology(&quot;test&quot;, conf, builder.createTopology());  
  •       Utils.sleep(10000);  
  •       cluster.killTopology(&quot;test&quot;);  
  •       cluster.shutdown();  
  •     }  
  •   }  
  • }  

测试结果:
telnet 192.168.40.134 44444

在telnet端随便输入ASDF字符:

[java] viewplaincopyprint?

  • ASDF  
  • OK  
  • ASD  
  • OK  
  • F  
  • OK  
  • ASDF  
  • OK  


flume端显示:

[java] viewplaincopyprint?

  • 2014-04-08 01:30:44,379 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message t] Kafka : [mykafka:ASDF  
  • 2014-04-08 01:30:44,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success  
  • 2014-04-08 01:30:44,604 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message t] Kafka : [mykafka:ASD  
  • 2014-04-08 01:30:44,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success  
  • 2014-04-08 01:30:44,794 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message t] Kafka : [mykafka:F  
  • 2014-04-08 01:30:44,799 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:80)] Send message success  
  • 2014-04-08 01:30:45,038 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.plugins.KafkaSink.process(KafkaSink.java:77)] Sending Message t] Kafka : [mykafka:ASDF  

最终在storm的logs/metrics.log文件中,会找到这样的记录:

[plain] viewplaincopyprint?

  • 2014-04-08 01:30:28,446 495106 1396945828 ubun

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-144372-1-1.html 上篇帖子: flume-ng+Kafka+Storm+HDFS+jdbc 实时系统搭建的完美整合 下篇帖子: flume kafka-sink high cpu
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表