
[Experience Share] _00022 Flume-1.5.0 + Kafka_2.9.2-0.8.1.1 + Storm-0.9.2 Distributed Environment Integration



Posted on 2015-11-27 17:39:26
  Post author: 妳那伊抹微笑
  itdog8 link: http://www.itdog8.com (personal link)

Blog: http://blog.csdn.net/u012185296
Post title: _00022 Flume-1.5.0+Kafka_2.9.2-0.8.1.1+Storm-0.9.2 Distributed Environment Integration
Signature: The greatest distance in the world is not the ends of the earth; it is that I stand right in front of you, and you cannot tell that I am here
Tech stack: Flume+Kafka+Storm+Redis/HBase+Hadoop+Hive+Mahout+Spark ... cloud-computing technologies
Repost policy: reposting is allowed, but you must link back to the original article and credit the author and copyright notice. Thanks!
QQ group: 214293307 (looking forward to learning and improving together)


# Flume-1.5.0+Kafka_2.9.2-0.8.1.1+Storm-0.9.2 Distributed Environment Integration
# Before You Start
All the project code and jars used in this integration have been uploaded to the shared files of QQ group 214293307; download them from there if you want to study them.

The Eclipse project for this Flume+Kafka+Storm integration can be downloaded from http://download.csdn.net/detail/u012185296/7633405
# For Flume, see the post _00016 Flume architecture overview and a getting-started example (uploading data to HDFS)
# For Kafka, see the post _00017 Kafka architecture overview and a getting-started example (basic example + using the Java API)
# For Storm, see the post _00019 Storm architecture overview and a getting-started example (the simple Java example from the official site)
Work through the posts above before attempting this integration. (What, you want to run before you can walk?)
# Integration Scenario
  Flume monitors a given directory; when a new log file appears there, its contents are sent to Kafka, and finally Storm pulls the data out of Kafka and prints it.
# Flume+Kafka的整合
# The Flume configuration file fks001.conf
  Monitor the directory /usr/local/yting/flume/tdata/tdir1, and use a custom Sink (com.yting.cloud.flume.sink.KafkaSink) to push the data into Kafka.
  
  [iyunv@rs229 fks]# pwd
  /usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/conf/ytconf/fks
  [iyunv@rs229 fks]# vi fks001.conf
  # fks : yting yousmile flume kafka storm integration
  fks.sources=source1
  fks.sinks=sink1
  fks.channels=channel1
  
  # configure source1
  fks.sources.source1.type=spooldir
  fks.sources.source1.spoolDir=/usr/local/yting/flume/tdata/tdir1
  fks.sources.source1.fileHeader = false
  
  # configure sink1
  # custom Sink that forwards the data Flume collects into Kafka
  fks.sinks.sink1.type=com.yting.cloud.flume.sink.KafkaSink
  
  # configure channel1
  fks.channels.channel1.type=file
  fks.channels.channel1.checkpointDir=/usr/local/yting/flume/checkpointdir/tcpdir/example_fks_001
  fks.channels.channel1.dataDirs=/usr/local/yting/flume/datadirs/tddirs/example_fks_001
  
  # bind source and sink
  fks.sources.source1.channels=channel1
  fks.sinks.sink1.channel=channel1
# The Kafka server.properties configuration file
  [iyunv@rs229 ytconf]# pwd
  /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/config/ytconf
  [iyunv@rs229 ytconf]# vi server.properties
  
  # A comma separated list of directories under which to store log files
  # log.dirs=/tmp/kafka-logs
  log.dirs=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs
  # root directory for all kafka znodes.
  zookeeper.connect=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka
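The custom sink and the consumer code in this walkthrough assume a topic named flume-kafka-storm-001 already exists. If it does not, it can be created against the chrooted ZooKeeper path with Kafka's own CLI. This is a sketch: the replication factor is an assumption, and the partition count of 2 simply matches the num.partitions=2 default visible in the broker startup log.

```shell
# Sketch: create the topic used in this walkthrough (adjust counts as needed).
bin/kafka-topics.sh --create \
  --zookeeper rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka \
  --replication-factor 1 --partitions 2 \
  --topic flume-kafka-storm-001
```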
# Copy the Kafka jars into Flume; the custom Sink (com.yting.cloud.flume.sink.KafkaSink) depends on them, and skipping this step will cause exceptions at runtime!
  [iyunv@rs229 lib]# pwd
  /usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/lib
  [iyunv@rs229 lib]# cp /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/libs/* .
# Use Eclipse to package the custom Sink (com.yting.cloud.flume.sink.KafkaSink) into a jar and put it under $FLUME_HOME/lib
  What, stuck on this step? Then you had better go back to the basics first ...
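If you prefer the command line to Eclipse for this step, the packaging amounts to something like the following sketch. The workspace path and jar name are hypothetical; only the Flume lib directory comes from the transcript above.

```shell
# Sketch: package the compiled custom sink and drop it into Flume's lib dir.
# ~/workspace/fks/bin and the jar name are made-up examples.
cd ~/workspace/fks/bin
jar cf yting-kafka-sink.jar com/yting/cloud/flume/sink/KafkaSink.class
cp yting-kafka-sink.jar /usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/lib/
```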
# Starting Kafka
  [iyunv@rs229 kafka_2.9.2-0.8.1.1]# pwd
  /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1
  [iyunv@rs229 kafka_2.9.2-0.8.1.1]# bin/kafka-server-start.sh config/ytconf/server.properties &
  [1] 24672
  [iyunv@rs229 kafka_2.9.2-0.8.1.1]# [2014-07-14 11:48:24,533] INFO Verifying properties (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,572] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,572] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,572] INFO Property log.dirs is overridden to /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,572] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,572] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,573] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,573] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,573] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,573] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,573] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,573] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,574] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,574] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,574] INFO Property zookeeper.connect is overridden to rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,574] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
  [2014-07-14 11:48:24,590] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
  [2014-07-14 11:48:24,592] INFO [Kafka Server 0], Connecting to zookeeper on rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.server.KafkaServer)
  [2014-07-14 11:48:24,603] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
  [2014-07-14 11:48:24,610] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:java.class.path=:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,610] INFO Client environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,611] INFO Initiating client connection, connectString=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)
  [2014-07-14 11:48:24,625] INFO Opening socket connection to server rs198/116.255.234.198:2181 (org.apache.zookeeper.ClientCnxn)
  [2014-07-14 11:48:24,631] INFO Socket connection established to rs198/116.255.234.198:2181, initiating session (org.apache.zookeeper.ClientCnxn)
  [2014-07-14 11:48:24,642] INFO Session establishment complete on server rs198/116.255.234.198:2181, sessionid = 0xc6472c07f50b0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
  [2014-07-14 11:48:24,645] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
  [2014-07-14 11:48:24,892] INFO Found clean shutdown file. Skipping recovery for all logs in data directory '/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs' (kafka.log.LogManager)
  [2014-07-14 11:48:24,894] INFO Loading log 'flume-kafka-storm-001-0' (kafka.log.LogManager)
  [2014-07-14 11:48:24,945] INFO Completed load of log flume-kafka-storm-001-0 with log end offset 18 (kafka.log.Log)
  SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
  SLF4J: Defaulting to no-operation (NOP) logger implementation
  SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
  [2014-07-14 11:48:24,966] INFO Loading log 'flume-kafka-storm-001-1' (kafka.log.LogManager)
  [2014-07-14 11:48:24,969] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 7 (kafka.log.Log)
  [2014-07-14 11:48:24,970] INFO Loading log 'test001-1' (kafka.log.LogManager)
  [2014-07-14 11:48:24,973] INFO Completed load of log test001-1 with log end offset 0 (kafka.log.Log)
  [2014-07-14 11:48:24,974] INFO Loading log 'test003-1' (kafka.log.LogManager)
  [2014-07-14 11:48:24,976] INFO Completed load of log test003-1 with log end offset 47 (kafka.log.Log)
  [2014-07-14 11:48:24,977] INFO Loading log 'test004-0' (kafka.log.LogManager)
  [2014-07-14 11:48:24,980] INFO Completed load of log test004-0 with log end offset 51 (kafka.log.Log)
  [2014-07-14 11:48:24,981] INFO Loading log 'test004-1' (kafka.log.LogManager)
  [2014-07-14 11:48:24,984] INFO Completed load of log test004-1 with log end offset 49 (kafka.log.Log)
  [2014-07-14 11:48:24,985] INFO Loading log 'test002-0' (kafka.log.LogManager)
  [2014-07-14 11:48:24,987] INFO Completed load of log test002-0 with log end offset 0 (kafka.log.Log)
  [2014-07-14 11:48:24,987] INFO Loading log 'test001-0' (kafka.log.LogManager)
  [2014-07-14 11:48:24,991] INFO Completed load of log test001-0 with log end offset 0 (kafka.log.Log)
  [2014-07-14 11:48:24,991] INFO Loading log 'test002-1' (kafka.log.LogManager)
  [2014-07-14 11:48:24,993] INFO Completed load of log test002-1 with log end offset 0 (kafka.log.Log)
  [2014-07-14 11:48:24,994] INFO Loading log 'test003-0' (kafka.log.LogManager)
  [2014-07-14 11:48:24,997] INFO Completed load of log test003-0 with log end offset 53 (kafka.log.Log)
  [2014-07-14 11:48:24,999] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
  [2014-07-14 11:48:25,003] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
  [2014-07-14 11:48:25,031] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
  [2014-07-14 11:48:25,032] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
  [2014-07-14 11:48:25,143] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
  [2014-07-14 11:48:25,163] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
  [2014-07-14 11:48:25,639] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
  [2014-07-14 11:48:25,645] INFO Registered broker 0 at path /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)
  [2014-07-14 11:48:25,660] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
  [2014-07-14 11:48:25,942] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test001,0],[test003,1],[test003,0],[flume-kafka-storm-001,1],[flume-kafka-storm-001,0],[test004,1],[test004,0],[test001,1],[test002,0],[test002,1] (kafka.server.ReplicaFetcherManager)
  [2014-07-14 11:48:26,045] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [test001,0],[test003,1],[test003,0],[flume-kafka-storm-001,1],[flume-kafka-storm-001,0],[test004,1],[test004,0],[test001,1],[test002,0],[test002,1] (kafka.server.ReplicaFetcherManager)
# Starting Flume
  [iyunv@rs229 apache-flume-1.5.0-bin]# bin/flume-ng agent -n fks -c conf/ -f conf/ytconf/fks/fks001.conf -Dflume.root.logger=INFO,console &
  2014-07-14 11:50:13,882 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /usr/local/yting/flume/tdata/tdir1
  2014-07-14 11:50:13,912 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
  2014-07-14 11:50:13,916 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: source1 started
  2014-07-14 11:50:13,916 (pool-4-thread-1) [INFO - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)] Spooling Directory Source runner has shutdown.
  2014-07-14 11:50:14,417 (pool-4-thread-1) [INFO - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:254)] Spooling Directory Source runner has shutdown.
  At this point Flume is up, and whenever a new log file appears in the monitored directory, its contents will be forwarded to Kafka.
# Create a new file in Flume's monitored directory to test it
  [iyunv@rs229 ytconf]# cd /usr/local/yting/flume/tdata/tdir1/
  [iyunv@rs229 tdir1]# ll
  total 0
  [iyunv@rs229 tdir1]# vi yousmile.log
  The you smile until forever .....................
  [iyunv@rs229 tdir1]# ll
  total 1
  -rw-r--r-- 1 root root   50 Jul 14 13:57 yousmile.log.COMPLETED (the suffix shows Flume has already processed the file)
  [iyunv@rs229 tdir1]#
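The .COMPLETED suffix is how Flume's spooldir source marks files it has fully ingested, so they are never re-read. A minimal plain-Java sketch of that rename contract (an illustration only, not Flume's actual implementation; the file name, contents, and suffix are taken from the transcript above, everything else is made up):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustration of the spooldir contract: read a dropped file fully,
// then rename it with a .COMPLETED suffix so it is never re-ingested.
public class SpoolDirContract {
    static final String DONE_SUFFIX = ".COMPLETED";

    // Returns the file's contents and marks the file as processed.
    static String ingest(Path file) throws IOException {
        String data = new String(Files.readAllBytes(file), "UTF-8");
        Files.move(file, file.resolveSibling(file.getFileName() + DONE_SUFFIX));
        return data;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("tdir1");
        Path log = dir.resolve("yousmile.log");
        Files.write(log, "The you smile until forever .....".getBytes("UTF-8"));
        System.out.println(ingest(log));
        // The original file is gone; only the .COMPLETED copy remains.
        System.out.println(Files.exists(dir.resolve("yousmile.log" + DONE_SUFFIX)));
    }
}
```

Running it prints the file contents followed by `true`, mirroring what the `ll` listing above shows after Flume has picked the file up.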
# Connect to the servers from Eclipse to verify that the custom Flume Sink delivered the data into Kafka
# MySimpleConsumer.java (run it in Eclipse to see the result)
  package com.yting.cloud.kafa.consumer;
  
  import kafka.api.FetchRequest;
  import kafka.api.FetchRequestBuilder;
  import kafka.api.PartitionOffsetRequestInfo;
  import kafka.common.ErrorMapping;
  import kafka.common.TopicAndPartition;
  import kafka.javaapi.*;
  import kafka.javaapi.consumer.SimpleConsumer;
  import kafka.message.MessageAndOffset;
  
  import java.nio.ByteBuffer;
  import java.util.ArrayList;
  import java.util.Collections;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  
  /**
   * The SimpleConsumer example from the official Kafka docs; I modified it
   * slightly so it can be run locally from Eclipse against the remote servers.
   *
   * @Author 王扬庭
   * @Time 2014-07-14
   */
  public class MySimpleConsumer {
      public static void main(String args[]) {
          MySimpleConsumer example = new MySimpleConsumer();
          // long maxReads = Long.parseLong(args[0]);
          // String topic = args[1];
          // int partition = Integer.parseInt(args[2]);
          // seeds.add(args[3]);
          // int port = Integer.parseInt(args[4]);
          long maxReads = 100;
  //        String topic = "yting_page_visits";
  //        String topic = "test003";
          String topic = "flume-kafka-storm-001";
  //        int partition = 0;
          int partition = 1; // the log line "The you smile until forever ....." landed in partition 1 (the topic defaults to 2 partitions)
          List<String> seeds = new ArrayList<String>();
  //        seeds.add("rs229");
          seeds.add("rs229");
          seeds.add("rs227");
          seeds.add("rs226");
          seeds.add("rs198");
          seeds.add("rs197");
          int port = Integer.parseInt("9092");
          try {
              example.run(maxReads, topic, partition, seeds, port);
          } catch (Exception e) {
              System.out.println("Oops:" + e);
              e.printStackTrace();
          }
      }
  
      private List<String> m_replicaBrokers = new ArrayList<String>();
  
      public MySimpleConsumer() {
          m_replicaBrokers = new ArrayList<String>();
      }
  
      public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
          // find the metadata about the topic and partition we are interested in
          PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                  a_partition);
          if (metadata == null) {
              System.out.println("Can't find metadata for Topic and Partition. Exiting");
              return;
          }
          if (metadata.leader() == null) {
              System.out.println("Can't find Leader for Topic and Partition. Exiting");
              return;
          }
          String leadBroker = metadata.leader().host();
          String clientName = "Client_" + a_topic + "_" + a_partition;
  
          SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
          long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
  
          int numErrors = 0;
          while (a_maxReads > 0) {
              if (consumer == null) {
                  consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
              }
              FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                      .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                      .build();
              FetchResponse fetchResponse = consumer.fetch(req);
  
              if (fetchResponse.hasError()) {
                  numErrors++;
                  // Something went wrong!
                  short code = fetchResponse.errorCode(a_topic, a_partition);
                  System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                  if (numErrors > 5)
                      break;
                  if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                      // We asked for an invalid offset. For simple case ask for
                      // the last element to reset
                      readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                      continue;
                  }
                  consumer.close();
                  consumer = null;
                  leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                  continue;
              }
              numErrors = 0;
  
              long numRead = 0;
              for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                  long currentOffset = messageAndOffset.offset();
                  if (currentOffset < readOffset) {
                      System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                      continue;
                  }
                  readOffset = messageAndOffset.nextOffset();
                  ByteBuffer payload = messageAndOffset.message().payload();
  
                  byte[] bytes = new byte[payload.limit()];
                  payload.get(bytes);
                  System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                  numRead++;
                  a_maxReads--;
              }
  
              if (numRead == 0) {
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException ie) {
                  }
              }
          }
          if (consumer != null)
              consumer.close();
      }
  
      public static long getLastOffset(SimpleConsumer consumer, String topic,
              int partition, long whichTime, String clientName) {
          TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
          Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
          requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
          kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
          OffsetResponse response = consumer.getOffsetsBefore(request);
  
          if (response.hasError()) {
              System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
              return 0;
          }
          long[] offsets = response.offsets(topic, partition);
          return offsets[0];
      }
  
      private String findNewLeader(String a_oldLeader, String a_topic,
              int a_partition, int a_port) throws Exception {
          for (int i = 0; i < 3; i++) {
              boolean goToSleep = false;
              PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
              if (metadata == null) {
                  goToSleep = true;
              } else if (metadata.leader() == null) {
                  goToSleep = true;
              } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                      && i == 0) {
                  // first time through if the leader hasn't changed give
                  // ZooKeeper a second to recover
                  // second time, assume the broker did recover before failover,
                  // or it was a non-Broker issue
                  goToSleep = true;
              } else {
                  return metadata.leader().host();
              }
              if (goToSleep) {
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException ie) {
                  }
              }
          }
          System.out.println("Unable to find new leader after Broker failure. Exiting");
          throw new Exception("Unable to find new leader after Broker failure. Exiting");
      }
  
      private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
          PartitionMetadata returnMetaData = null;
          loop: for (String seed : a_seedBrokers) {
              SimpleConsumer consumer = null;
              try {
                  consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                  List<String> topics = Collections.singletonList(a_topic);
                  TopicMetadataRequest req = new TopicMetadataRequest(topics);
                  kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
  
                  List<TopicMetadata> metaData = resp.topicsMetadata();
                  for (TopicMetadata item : metaData) {
                      for (PartitionMetadata part : item.partitionsMetadata()) {
                          if (part.partitionId() == a_partition) {
                              returnMetaData = part;
                              break loop;
                          }
                      }
                  }
              } catch (Exception e) {
                  System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
              } finally {
                  if (consumer != null)
                      consumer.close();
              }
          }
          if (returnMetaData != null) {
              m_replicaBrokers.clear();
              for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                  m_replicaBrokers.add(replica.host());
              }
          }
          return returnMetaData;
      }
  }
  (Screenshot: the output of running the consumer in Eclipse.)
# Flume+Kafka integration works; next up, Kafka+Storm
# Kafka+Storm Integration
# Starting Kafka (already running from the Flume+Kafka step above, so nothing to do here)
# Starting Storm
# Starting the Storm nimbus
  [iyunv@rs229 apache-storm-0.9.2-incubating]# bin/storm nimbus &
  [1] 26815
  [iyunv@rs229 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp / /usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/lib/zkclient-0.3.jar:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.nimbus
# Starting the Storm UI
  [iyunv@rs229 apache-storm-0.9.2-incubating]# bin/storm ui &
  [2] 26912
  [iyunv@rs229 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp / /usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx768m -Dlogfile.name=ui.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.ui.core
# Starting the Storm supervisors
# Starting a Supervisor on rs226
  [iyunv@rs226 apache-storm-0.9.2-incubating]# bin/storm supervisor &
  [1] 15273
  [iyunv@rs226 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
  
# Starting a Supervisor on rs198
  [iyunv@rs198 apache-storm-0.9.2-incubating]# bin/storm supervisor &
  [1] 15274
  [iyunv@RS198 apache-storm-0.9.2-incubating]# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
# Starting a Supervisor on rs197
  [iyunv@RS197 apache-storm-0.9.2-incubating]# bin/storm supervisor &
  [1] 25262
  [iyunv@RS197 apache-storm-0.9.2-incubating]# Running: /root/jdk1.6.0_26/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
# Starting a Supervisor on rs196
  [iyunv@RS196 apache-storm-0.9.2-incubating]# bin/storm supervisor &
  [1] 17330
  [iyunv@RS196 apache-storm-0.9.2-incubating]# Running: /root/jdk1.6.0_26/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
# The three classes that need to be packaged into a jar and placed under $STORM_HOME/lib
# MyHighLevelConsumer.java
package com.yting.cloud.kafa.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Kafka high-level consumer (paired with the custom KafkaSink)
 *
 * @Author 王扬庭 (妳那伊抹微笑)
 * @Time 2014-07-14
 * @Problem you should run this consumer class before the producer
 */
public class MyHighLevelConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public MyHighLevelConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
//      props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }

    class ConsumerTest implements Runnable {
        private KafkaStream<byte[], byte[]> m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

    public static void main(String[] args) {
//      String zooKeeper = args[0];
//      String groupId = args[1];
//      String topic = args[2];
//      int threads = Integer.parseInt(args[3]);

//      String zooKeeper = "116.255.224.229:2182";
        String zooKeeper = "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka";
        String groupId = "1";
        String topic = "flume-kafka-storm-001";
        int threads = 1;

        MyHighLevelConsumer example = new MyHighLevelConsumer(zooKeeper, groupId, topic);
        example.run(threads);

//      try {
//          Thread.sleep(1000);
//      } catch (InterruptedException ie) {
//      }
//      example.shutdown();
    }
}
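The thread-per-stream layout in `run(int a_numThreads)` above can be illustrated with plain JDK types. In this sketch a `BlockingQueue` stands in for `KafkaStream` and an `"EOF"` poison pill stands in for shutdown; both are assumptions of the sketch, not part of the Kafka API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch of MyHighLevelConsumer's pattern: each "stream" (here a
// BlockingQueue standing in for KafkaStream) is drained by its own worker
// submitted to a fixed-size thread pool.
public class StreamPoolSketch {
    static List<String> consumeAll(List<BlockingQueue<String>> streams) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        final List<String> seen = new CopyOnWriteArrayList<String>();
        for (final BlockingQueue<String> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        String msg;
                        // "EOF" is a poison pill ending the stream, mimicking shutdown()
                        while (!(msg = stream.take()).equals("EOF")) {
                            seen.add(msg); // stands in for processing one Kafka message
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> s1 = new LinkedBlockingQueue<String>(Arrays.asList("a", "b", "EOF"));
        BlockingQueue<String> s2 = new LinkedBlockingQueue<String>(Arrays.asList("c", "EOF"));
        System.out.println(consumeAll(Arrays.asList(s1, s2)));
    }
}
```

Sizing the pool to the stream count matters because each `KafkaStream` iterator blocks; fewer threads than streams would leave some partitions unread.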
# HighLevelKafkaSpout.java
package com.yting.cloud.storm.spout;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.yting.cloud.kafa.consumer.MyHighLevelConsumer;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

/**
 * Storm spout
 *
 * @Author 王扬庭 (妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class HighLevelKafkaSpout implements IRichSpout {
    private static final Log log = LogFactory.getLog(HighLevelKafkaSpout.class);
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
    private int a_numThreads = 1;

    public HighLevelKafkaSpout() {
    }

    public HighLevelKafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void nextTuple() {

    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void ack(Object msgId) {
        log.info("--------->ack");
    }

    @Override
    public void activate() {
        log.info("--------->activate start--------->");
        MyHighLevelConsumer.main(null);
        // This logic could be refactored out; code such as
        // collector.emit(new Values("need to emit")) is not written yet --
        // this is just a placeholder to show the idea.
        log.info("--------->activate end--------->");
    }

    @Override
    public void close() {
        log.info("--------->close");
    }

    @Override
    public void deactivate() {
        log.info("--------->deactivate");
    }

    @Override
    public void fail(Object msgId) {
        log.info("--------->fail");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("highLevelKafkaSpout"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        log.info("--------->getComponentConfiguration");
        return null;
    }

}
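The placeholder in activate() above blocks inside the consumer loop instead of emitting. A common alternative is to have the consumer thread push messages onto an in-memory queue that nextTuple() drains one message at a time. This is a sketch under assumed structure (a mock `emitted` list stands in for `collector.emit(new Values(msg), msg)`); it is not Storm's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Queue;

// Sketch: the Kafka consumer thread enqueues messages; the spout's
// nextTuple() polls the queue and emits at most one tuple per call,
// which is the contract Storm's executor loop expects.
public class SpoutHandoffSketch {
    final Queue<String> pending = new ConcurrentLinkedQueue<String>(); // filled by consumer thread
    final List<String> emitted = new ArrayList<String>();              // stand-in for collector.emit

    void onKafkaMessage(String msg) {   // called from the consumer thread
        pending.add(msg);
    }

    void nextTuple() {                  // called repeatedly by Storm's executor loop
        String msg = pending.poll();
        if (msg != null) {
            emitted.add(msg);           // i.e. collector.emit(new Values(msg), msg)
        }
    }

    public static void main(String[] args) {
        SpoutHandoffSketch spout = new SpoutHandoffSketch();
        spout.onKafkaMessage("hello");
        spout.onKafkaMessage("world");
        spout.nextTuple();
        spout.nextTuple();
        spout.nextTuple(); // queue empty: emits nothing
        System.out.println(spout.emitted);
    }
}
```

The hand-off keeps the blocking consumer out of Storm's calling thread, so deactivate() and close() stay responsive.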
# KafkaTopology.java
package com.yting.cloud.storm.topology;

import java.util.HashMap;
import java.util.Map;

import com.yting.cloud.storm.spout.HighLevelKafkaSpout;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * Storm topology
 *
 * @Author 王扬庭 (妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class KafkaTopology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("1", new HighLevelKafkaSpout(""), 1);

        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());

        Utils.sleep(1000 * 60 * 5); // local cluster test ...
        cluster.shutdown();
    }
}
# Run the Storm program with the storm jar command
  [iyunv@rs229 yjar]# ll
  total 72
  -rw-r--r-- 1 root root 19826 Jul 14 10:27fks-storm-0013.jar
  -rw-r--r-- 1 root root 19833 Jul 14 10:31fks-storm-high-001.jar
  -rw-r--r-- 1 root root 15149 Jul  7 17:43 storm-wordcount-official-cluster.jar
  -rw-r--r-- 1 root root 15192 Jul  7 17:47 storm-wordcount-official-local.jar
  [iyunv@rs229 yjar]# pwd
  /usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/yjar
  [iyunv@rs229 yjar]# storm jar fks-storm-high-001.jar com.yting.cloud.storm.topology.KafkaTopology
  The log output is too long to show in full; here is an excerpt:
  14650 [main] INFO  com.yting.cloud.storm.spout.HighLevelKafkaSpout - --------->close
  14650 [main] INFO  backtype.storm.daemon.executor - Shut down executor 1:[1 1]
  14650 [main] INFO  backtype.storm.daemon.worker - Shutdown executors
  14650 [main] INFO  backtype.storm.daemon.worker - Shutting down transfer thread
  14651 [Thread-14-disruptor-worker-transfer-queue] INFO  backtype.storm.util - Async loop interrupted!
  14651 [main] INFO  backtype.storm.daemon.worker - Shutdown transfer thread
  14652 [main] INFO  backtype.storm.daemon.worker - Shutting down default resources
  14653 [main] INFO  backtype.storm.daemon.worker - Shutdown default resources
  14661 [main] INFO  backtype.storm.daemon.worker - Disconnecting from storm cluster state context
  14664 [main] INFO  backtype.storm.daemon.worker - Shutdown worker my-flume-kafka-storm-topology-integration-1-1405320692 e0d44e3c-5b2a-4263-8dab-4aacf4215d2d 1024
  14667 [main] INFO  backtype.storm.daemon.supervisor - Shut down e0d44e3c-5b2a-4263-8dab-4aacf4215d2d:14d31572-9e60-4a16-9638-56c22530826d
  14667 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor e0d44e3c-5b2a-4263-8dab-4aacf4215d2d
  14668 [Thread-3] INFO  backtype.storm.event - Event manager interrupted
  14668 [Thread-4] INFO  backtype.storm.event - Event manager interrupted
  14671 [main] INFO  backtype.storm.daemon.supervisor - Shutting down supervisor cd464efd-fa69-4566-8cba-7e10d51dae6c
  14671 [Thread-5] INFO  backtype.storm.event - Event manager interrupted
  14672 [Thread-6] INFO  backtype.storm.event - Event manager interrupted
  14674 [main] INFO  backtype.storm.testing - Shutting down in process zookeeper
  14675 [main] INFO  backtype.storm.testing - Done shutting down in process zookeeper
  14675 [main] INFO  backtype.storm.testing - Deleting temporary path /tmp/c88e689e-c97e-4822-886d-ddcc1e7b9e9d
  14677 [main] INFO  backtype.storm.testing - Deleting temporary path /tmp/88f3f052-cfaf-4d53-91ad-489965612e94
  14678 [main] INFO  backtype.storm.testing - Deleting temporary path /tmp/17f2f7cf-7844-4077-8ad9-b3503fa21fb6
  14681 [main] INFO  backtype.storm.testing - Deleting temporary path /tmp/548e5d3e-b05d-42a7-bd71-ac4ece8e93c4
# Create a test file in the directory Flume monitors and see whether the Storm program prints the file's contents
# Create the file fks-yousmile-20140714.log
  [iyunv@rs229 tdir1]# vi fks-yousmile-20140714.log
  So i miss the change to see you again .
  [iyunv@rs229 tdir1]# ll
  -rw-r--r-- 1 root root   40 Jul 14 14:59 fks-yousmile-20140714.log.COMPLETED
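The .COMPLETED suffix in the listing above is how Flume's spooling-directory source marks files it has finished ingesting, so they are never read twice. A minimal plain-JDK sketch of that read-then-rename behavior (illustrative only, not Flume's actual implementation):

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Sketch of spooldir-style completion marking: read each regular file in the
// directory, then rename it with a ".COMPLETED" suffix; already-marked files
// are skipped on later passes.
public class SpoolDirSketch {
    static List<String> ingest(Path dir) throws IOException {
        List<String> lines = new ArrayList<String>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path f : files) {
                if (!Files.isRegularFile(f) || f.toString().endsWith(".COMPLETED")) continue;
                lines.addAll(Files.readAllLines(f)); // hand the lines to the sink
                Files.move(f, f.resolveSibling(f.getFileName() + ".COMPLETED"));
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("tdir1");
        Files.write(dir.resolve("fks-test.log"), java.util.Arrays.asList("So i miss the change to see you again ."));
        System.out.println(ingest(dir));        // first pass reads the file
        System.out.println(ingest(dir));        // second pass finds nothing new
    }
}
```

One consequence of this design, which the real source shares, is that files must not be modified after being dropped into the spool directory.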
# Storm console output
  14675 [main]INFO  backtype.storm.testing - Deletingtemporary path /tmp/c88e689e-c97e-4822-886d-ddcc1e7b9e9d
  14677 [main]INFO  backtype.storm.testing - Deletingtemporary path /tmp/88f3f052-cfaf-4d53-91ad-489965612e94
  14678 [main]INFO  backtype.storm.testing - Deletingtemporary path /tmp/17f2f7cf-7844-4077-8ad9-b3503fa21fb6
  14681 [main]INFO  backtype.storm.testing - Deletingtemporary path /tmp/548e5d3e-b05d-42a7-bd71-ac4ece8e93c4
  Thread 0: So i miss the change to see you again . (the Storm program has printed the contents of the newly created file)
# The Flume+Kafka+Storm integration is complete
# The Storm part above is overly simplistic; Kafka's high-level consumer can be pulled into the Storm spout for finer control, for example the code below:
package com.yting.cloud.storm.spout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Storm spout
 *
 * @Author 王扬庭 (妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class KafkaSpout implements IRichSpout {
    private static final Log log = LogFactory.getLog(KafkaSpout.class);
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
    private int a_numThreads = 1;

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void nextTuple() {

    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void ack(Object msgId) {
        log.info("--------->ack");
    }

    @Override
    public void activate() {
        log.info("--------->activate");
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> streams = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = streams.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            log.info("(--------->Storm kafka consumer)------->" + value);
            collector.emit(new Values(value), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka");
//      props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181");
//      props.put("zookeeper.connect", "rs229");
        props.put("group.id", "2");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);
    }

    @Override
    public void close() {
        log.info("--------->close");
    }

    @Override
    public void deactivate() {
        log.info("--------->deactivate");
    }

    @Override
    public void fail(Object msgId) {
        log.info("--------->fail");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("kafkaSpout"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        log.info("--------->getComponentConfiguration");
        return null;
    }

}
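Emitting with a message id, as `collector.emit(new Values(value), value)` does in KafkaSpout above, is what lets Storm deliver ack/fail callbacks for that tuple. A plain-JDK sketch of the bookkeeping a spout typically does with those callbacks (assumed structure for illustration, not Storm's internals):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch of anchored-emit bookkeeping: track in-flight tuples by message id,
// forget them on ack, and re-queue them on fail so they can be re-emitted.
public class AckFailSketch {
    final Map<Object, String> inFlight = new HashMap<Object, String>();
    final Deque<String> toEmit = new ArrayDeque<String>();

    void emit(String value) {           // emit(values, msgId) with msgId == value
        inFlight.put(value, value);
    }

    void ack(Object msgId) {            // fully processed: drop the record
        inFlight.remove(msgId);
    }

    void fail(Object msgId) {           // processing failed: schedule a replay
        String v = inFlight.remove(msgId);
        if (v != null) toEmit.addLast(v);
    }

    public static void main(String[] args) {
        AckFailSketch s = new AckFailSketch();
        s.emit("m1");
        s.emit("m2");
        s.ack("m1");
        s.fail("m2");
        System.out.println(s.inFlight.isEmpty() + " " + s.toEmit);
    }
}
```

Without a message id (an unanchored emit), Storm skips this tracking entirely and a lost tuple is simply lost.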

  I am keeping this a bit rough for now; it will be polished further when the data architecture is built out.
# Closing remarks
  All project code and jar packages used in this integration have been uploaded to the shared files of QQ group 214293307; download them there if you need them.
  The Eclipse project code for this Flume+Kafka+Storm integration can be downloaded at http://download.csdn.net/detail/u012185296/7633405

  Study hard and make progress every day. This should have been finished much earlier but was delayed for various reasons; plans really do fall behind changes!
  Good good study , day day up high
  The you smile until forever .....................
  

# Time 2014-07-14
