_00022 Flume-1.5.0 + Kafka_2.9.2-0.8.1.1 + Storm-0.9.2 Distributed Environment Integration
Author: 妳那伊抹微笑itdog8 | Site: http://www.itdog8.com (personal link)
Blog: http://blog.csdn.net/u012185296
Post title: _00022 Flume-1.5.0+Kafka_2.9.2-0.8.1.1+Storm-0.9.2 Distributed Environment Integration
Signature: The greatest distance in the world is neither the ends of the earth nor the farthest cape; it is that I stand in front of you, and you cannot feel that I am here
Technical focus: Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... cloud computing technologies
Repost policy: reposting is allowed, but you must credit the original source and author with a hyperlink and keep this copyright notice. Thanks for your cooperation!
QQ group: 214293307 (looking forward to learning and improving together with you)
# Flume-1.5.0+Kafka_2.9.2-0.8.1.1+Storm-0.9.2 Distributed Environment Integration
# Preface
All the project code and jars used in this integration have been uploaded to the shared files of QQ group 214293307; download them from there if you want to study them.
The Eclipse project code for this Flume+Kafka+Storm integration can be downloaded from http://download.csdn.net/detail/u012185296/7633405
# For Flume, see the post _00016 Flume architecture and getting-started example (uploading data to HDFS)
# For Kafka, see the post _00017 Kafka architecture and getting-started example (basic example + using the Java API)
# For Storm, see the post _00019 Storm architecture and getting-started example (the simple Java example from the official site)
Work through the three posts above before attempting this integration, you know why, no explanation needed ... (Nani? You can't even walk yet and you already want to run? That deserves a slap 、、、)
# Integration Scenario
Flume monitors a specified directory; when a new log file appears there, its data is shipped to Kafka, and finally Storm pulls the data out of Kafka and prints it 、、、
# Flume + Kafka Integration
# Flume's fks001.conf configuration file
Monitor the directory /usr/local/yting/flume/tdata/tdir1; when files arrive, a custom sink (com.yting.cloud.flume.sink.KafkaSink) pushes the data into Kafka.
[root@rs229 fks]# pwd
/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/conf/ytconf/fks
# vi fks001.conf
# fks : yting yousmile flume kafka storm integration
fks.sources=source1
fks.sinks=sink1
fks.channels=channel1
# configure source1
fks.sources.source1.type=spooldir
fks.sources.source1.spoolDir=/usr/local/yting/flume/tdata/tdir1
fks.sources.source1.fileHeader = false
# configure sink1
fks.sinks.sink1.type=com.yting.cloud.flume.sink.KafkaSink # (custom sink that forwards the monitored data into Kafka)
# configure channel1
fks.channels.channel1.type=file
fks.channels.channel1.checkpointDir=/usr/local/yting/flume/checkpointdir/tcpdir/example_fks_001
fks.channels.channel1.dataDirs=/usr/local/yting/flume/datadirs/tddirs/example_fks_001
# bind source and sink
fks.sources.source1.channels=channel1
fks.sinks.sink1.channel=channel1
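The source of the custom sink itself is not reproduced in this post (it is included in the project download from the preface), but to make the configuration above concrete, here is a minimal sketch of what a Flume-to-Kafka sink can look like. This is an illustration under stated assumptions, not the actual com.yting.cloud.flume.sink.KafkaSink: the broker list and topic are hard-coded only for brevity, and it uses the Kafka 0.8 producer API.
package com.yting.cloud.flume.sink;

import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Sketch of a custom Flume sink that forwards each event body to Kafka.
 * The broker list and topic below are assumptions for illustration only.
 */
public class KafkaSink extends AbstractSink implements Configurable {
    private Producer<String, String> producer;
    private String topic;

    @Override
    public void configure(Context context) {
        // A real sink would read these from the Flume context instead.
        this.topic = "flume-kafka-storm-001";
        Properties props = new Properties();
        props.put("metadata.broker.list", "rs229:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        this.producer = new Producer<String, String>(new ProducerConfig(props));
    }

    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        tx.begin();
        try {
            Event event = channel.take();
            if (event == null) { // channel empty, back off and retry later
                tx.commit();
                return Status.BACKOFF;
            }
            String body = new String(event.getBody(), "UTF-8");
            producer.send(new KeyedMessage<String, String>(topic, body));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            tx.rollback();
            throw new EventDeliveryException("Failed to deliver event to Kafka", e);
        } finally {
            tx.close();
        }
    }

    @Override
    public synchronized void stop() {
        if (producer != null) producer.close();
        super.stop();
    }
}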
# Kafka's server.properties configuration file
# pwd
/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/config/ytconf
# vi server.properties
# A comma separated list of directories under which to store log files
# log.dirs=/tmp/kafka-logs
log.dirs=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs
# root directory for all kafka znodes.
zookeeper.connect=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka
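Note the trailing /kafka on zookeeper.connect above: it is a ZooKeeper chroot. Every client of this cluster (the Flume sink's producer, the consumers below, the Storm spout) must use the same /kafka suffix, otherwise it will be looking at the wrong znodes. As a quick check that the broker registered under the chroot, something like this should work, using the zookeeper-shell.sh script shipped with Kafka (the znode path is the standard broker registry):
# bin/zookeeper-shell.sh rs229:2181 ls /kafka/brokers/ids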
# Copy the jars: copy Kafka's jars into Flume's lib directory, because the custom sink (com.yting.cloud.flume.sink.KafkaSink) depends on them. Skip this step and you will get exceptions!
# pwd
/usr/local/adsit/yting/apache/flume/apache-flume-1.5.0-bin/lib
# cp /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/libs/* .
# Use Eclipse to package the custom sink (com.yting.cloud.flume.sink.KafkaSink) into a jar and drop it into the $FLUME_HOME/lib directory
Nani? You can't do this part? Then you'd better come learn pig farming with me instead 、、、
# Starting Kafka
# pwd
/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1
# bin/kafka-server-start.sh config/ytconf/server.properties &
24672
# INFO Verifying properties (kafka.utils.VerifiableProperties)
INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
INFO Property log.dirs is overridden to /usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs (kafka.utils.VerifiableProperties)
INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)
INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)
INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)
INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)
INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)
INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)
INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)
INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connect is overridden to rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.utils.VerifiableProperties)
INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)
INFO , starting (kafka.server.KafkaServer)
INFO , Connecting to zookeeper on rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka (kafka.server.KafkaServer)
INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)
INFO Client environment:host.name=rs229 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.version=1.7.0_60 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.home=/usr/local/adsit/yting/jdk/jdk1.7.0_60/jre (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.class.path=:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-javadoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-scaladoc.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/kafka_2.9.2-0.8.1.1-sources.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/scala-library-2.9.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zkclient-0.3.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:os.version=2.6.32-279.el6.x86_64 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
INFO Client environment:user.dir=/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1 (org.apache.zookeeper.ZooKeeper)
INFO Initiating client connection, connectString=rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7c8b7ac9 (org.apache.zookeeper.ZooKeeper)
INFO Opening socket connection to server rs198/116.255.234.198:2181 (org.apache.zookeeper.ClientCnxn)
INFO Socket connection established to rs198/116.255.234.198:2181, initiating session (org.apache.zookeeper.ClientCnxn)
INFO Session establishment complete on server rs198/116.255.234.198:2181, sessionid = 0xc6472c07f50b0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
INFO Found clean shutdown file. Skipping recovery for all logs in data directory '/usr/local/adsit/yting/apache/kafka/kafka_2.9.2-0.8.1.1/kafka-logs' (kafka.log.LogManager)
INFO Loading log 'flume-kafka-storm-001-0' (kafka.log.LogManager)
INFO Completed load of log flume-kafka-storm-001-0 with log end offset 18 (kafka.log.Log)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
INFO Loading log 'flume-kafka-storm-001-1' (kafka.log.LogManager)
INFO Completed load of log flume-kafka-storm-001-1 with log end offset 7 (kafka.log.Log)
INFO Loading log 'test001-1' (kafka.log.LogManager)
INFO Completed load of log test001-1 with log end offset 0 (kafka.log.Log)
INFO Loading log 'test003-1' (kafka.log.LogManager)
INFO Completed load of log test003-1 with log end offset 47 (kafka.log.Log)
INFO Loading log 'test004-0' (kafka.log.LogManager)
INFO Completed load of log test004-0 with log end offset 51 (kafka.log.Log)
INFO Loading log 'test004-1' (kafka.log.LogManager)
INFO Completed load of log test004-1 with log end offset 49 (kafka.log.Log)
INFO Loading log 'test002-0' (kafka.log.LogManager)
INFO Completed load of log test002-0 with log end offset 0 (kafka.log.Log)
INFO Loading log 'test001-0' (kafka.log.LogManager)
INFO Completed load of log test001-0 with log end offset 0 (kafka.log.Log)
INFO Loading log 'test002-1' (kafka.log.LogManager)
INFO Completed load of log test002-1 with log end offset 0 (kafka.log.Log)
INFO Loading log 'test003-0' (kafka.log.LogManager)
INFO Completed load of log test003-0 with log end offset 53 (kafka.log.Log)
INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)
INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
INFO , Started (kafka.network.SocketServer)
INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
INFO Registered broker 0 at path /brokers/ids/0 with address rs229:9092. (kafka.utils.ZkUtils$)
INFO , started (kafka.server.KafkaServer)
INFO Removed fetcher for partitions ,,,,,,,,, (kafka.server.ReplicaFetcherManager)
INFO Removed fetcher for partitions ,,,,,,,,, (kafka.server.ReplicaFetcherManager)
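The startup log above shows both partitions of flume-kafka-storm-001 being loaded, so the topic already exists on this cluster. Starting from scratch, the topic would either be auto-created on the first write (with 2 partitions, per the num.partitions line in the log) or created up front; a sketch using the kafka-topics.sh script from this Kafka release, with a replication factor of 1 chosen purely for illustration:
# bin/kafka-topics.sh --create --zookeeper rs229:2181/kafka --replication-factor 1 --partitions 2 --topic flume-kafka-storm-001
# bin/kafka-topics.sh --describe --zookeeper rs229:2181/kafka --topic flume-kafka-storm-001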
# Starting Flume
# bin/flume-ng agent -n fks -c conf/ -f conf/ytconf/fks/fks001.conf -Dflume.root.logger=INFO,console &
2014-07-14 11:50:13,882 (lifecycleSupervisor-1-0) SpoolDirectorySource source starting with directory: /usr/local/yting/flume/tdata/tdir1
2014-07-14 11:50:13,912 (lifecycleSupervisor-1-0) Monitored counter group for type: SOURCE, name: source1: Successfully registered new MBean.
2014-07-14 11:50:13,916 (lifecycleSupervisor-1-0) Component type: SOURCE, name: source1 started
2014-07-14 11:50:13,916 (pool-4-thread-1) Spooling Directory Source runner has shutdown.
2014-07-14 11:50:14,417 (pool-4-thread-1) Spooling Directory Source runner has shutdown.
Flume has now started successfully; whenever a new log file appears in the monitored directory, its contents will be forwarded to Kafka, you know how it goes!
# Create a new file in Flume's monitored directory to test it
# cd /usr/local/yting/flume/tdata/tdir1/
# ll
total 0
# vi yousmile.log
The you smile until forever .....................
# ll
total 1
-rw-r--r-- 1 root root 50 Jul 14 13:57 yousmile.log.COMPLETED (the .COMPLETED suffix means Flume has already processed the file)
#
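Before going through Java, a quicker sanity check is the console consumer that ships with Kafka. Assuming the chrooted connect string from server.properties, a command along these lines should print the line Flume just shipped:
# bin/kafka-console-consumer.sh --zookeeper rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka --topic flume-kafka-storm-001 --from-beginning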
# Check from Eclipse (connected to the servers) whether Flume's custom sink delivered the data to Kafka
# MySimpleConsumer.java (run it in Eclipse to see the result)
package com.yting.cloud.kafa.consumer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * The SimpleConsumer example from the Kafka website; I modified some of the
 * code so it can connect to the servers from my local Eclipse.
 *
 * @Author 王扬庭
 * @Time 2014-07-14
 */
public class MySimpleConsumer {
    public static void main(String args[]) {
        MySimpleConsumer example = new MySimpleConsumer();
        // long maxReads = Long.parseLong(args[0]);
        // String topic = args[1];
        // int partition = Integer.parseInt(args[2]);
        // seeds.add(args[3]);
        // int port = Integer.parseInt(args[4]);
        long maxReads = 100;
        // String topic = "yting_page_visits";
        // String topic = "test003";
        String topic = "flume-kafka-storm-001";
        // int partition = 0;
        int partition = 1; // the "The you smile until forever ..." line from the log file landed in partition 1 (the topic has 2 partitions by default)
        List<String> seeds = new ArrayList<String>();
        // seeds.add("rs229");
        seeds.add("rs229");
        seeds.add("rs227");
        seeds.add("rs226");
        seeds.add("rs198");
        seeds.add("rs197");
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public MySimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;
        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
(Figure: the output of running MySimpleConsumer in Eclipse 、、、)
# Flume+Kafka integration is done; next, the Kafka+Storm integration
# Kafka + Storm Integration
# Starting Kafka (Kafka was already started during the Flume+Kafka integration, so there is no need to start it again)
# Starting Storm
# Starting the Storm nimbus
# bin/storm nimbus &
26815
# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp / /usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/lib/zkclient-0.3.jar:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx1024m -Dlogfile.name=nimbus.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.nimbus
# Starting the Storm UI
# bin/storm ui &
26912
# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp / /usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx768m -Dlogfile.name=ui.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.ui.core
# Starting the Storm supervisors
# Start a supervisor on rs226
# bin/storm supervisor &
15273
# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
# Start a supervisor on rs198
# bin/storm supervisor &
15274
# Running: /usr/local/adsit/yting/jdk/jdk1.7.0_60/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
# Start a supervisor on rs197
# bin/storm supervisor &
25262
# Running: /root/jdk1.6.0_26/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
# Start a supervisor on rs167
# bin/storm supervisor &
17330
# Running: /root/jdk1.6.0_26/bin/java -server -Dstorm.options= -Dstorm.home=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /…:/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/conf -Xmx256m -Dlogfile.name=supervisor.log -Dlogback.configurationFile=/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/logback/cluster.xml backtype.storm.daemon.supervisor
# The three classes below need to be packaged into one jar and placed under the $STORM_HOME/lib directory (a packaging sketch follows)
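Assuming Eclipse compiles the classes into the project's bin/ folder, packaging could look like this sketch (the jar name matches the one run later; the paths are hypothetical and depend on your workspace):
# jar cvf fks-storm-high-001.jar -C bin .
# cp fks-storm-high-001.jar /usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/yjar/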
# MyHighLevelConsumer.java
package com.yting.cloud.kafa.consumer;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Kafka high-level consumer (based on the official example)
 *
 * @Author 王扬庭(妳那伊抹微笑)
 * @Time 2014-07-14
 * @Problem you should run this consumer class before the producer
 */
public class MyHighLevelConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;

    public MyHighLevelConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);
        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }

    class ConsumerTest implements Runnable {
        private KafkaStream<byte[], byte[]> m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }

    public static void main(String[] args) {
        // String zooKeeper = args[0];
        // String groupId = args[1];
        // String topic = args[2];
        // int threads = Integer.parseInt(args[3]);
        // String zooKeeper = "116.255.224.229:2182";
        String zooKeeper = "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka";
        String groupId = "1";
        String topic = "flume-kafka-storm-001";
        int threads = 1;
        MyHighLevelConsumer example = new MyHighLevelConsumer(zooKeeper, groupId, topic);
        example.run(threads);
        // try {
        //     Thread.sleep(1000);
        // } catch (InterruptedException ie) {
        // }
        // example.shutdown();
    }
}
# HighLevelKafkaSpout.java
package com.yting.cloud.storm.spout;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.yting.cloud.kafa.consumer.MyHighLevelConsumer;

import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;

/**
 * Storm spout
 *
 * @Author 王扬庭(妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class HighLevelKafkaSpout implements IRichSpout {
    private static final Log log = LogFactory.getLog(HighLevelKafkaSpout.class);
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
    private int a_numThreads = 1;

    public HighLevelKafkaSpout() {
    }

    public HighLevelKafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void ack(Object msgId) {
        log.info("--------->ack");
    }

    @Override
    public void activate() {
        log.info("--------->activate start--------->");
        MyHighLevelConsumer.main(null);
        // The consumer logic here could be refactored out; code like
        // collector.emit(new Values("need to emit")) is not written yet either --
        // this is just to show the idea.
        log.info("--------->activate end--------->");
    }

    @Override
    public void close() {
        log.info("--------->close");
    }

    @Override
    public void deactivate() {
        log.info("--------->deactivate");
    }

    @Override
    public void fail(Object msgId) {
        log.info("--------->fail");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("highLevelKafkaSpout"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        log.info("--------->getComponentConfiguration");
        return null;
    }
}
# KafkaTopology.java
package com.yting.cloud.storm.topology;
import java.util.HashMap;
import java.util.Map;
import com.yting.cloud.storm.spout.HighLevelKafkaSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
/**
* Stormtopology
*
* @Author 王扬庭(妳那伊抹微笑)
* @Time2014-07-14
*
*/
public class KafkaTopology {
publicstatic void main(String[] args) {
TopologyBuilderbuilder = new TopologyBuilder();
builder.setSpout("1",new HighLevelKafkaSpout(""), 1);
Mapconf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS,1);
conf.put(Config.TOPOLOGY_DEBUG,true);
LocalClustercluster = new LocalCluster();
cluster.submitTopology("my-flume-kafka-storm-topology-integration",conf, builder.createTopology());
Utils.sleep(1000*60*5);// local cluster test ...
cluster.shutdown();
}
}
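KafkaTopology above runs in a LocalCluster: an in-process test cluster that sleeps five minutes and then shuts itself down, so it never touches the nimbus and supervisors started earlier. To run on the real cluster, submission would go through StormSubmitter instead; a sketch, where the class name KafkaTopologyCluster is hypothetical:
package com.yting.cloud.storm.topology;

import java.util.HashMap;
import java.util.Map;

import com.yting.cloud.storm.spout.HighLevelKafkaSpout;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

/**
 * Sketch: the same topology submitted to the real cluster.
 * The class name KafkaTopologyCluster is hypothetical.
 */
public class KafkaTopologyCluster {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("1", new HighLevelKafkaSpout(""), 1);
        Map conf = new HashMap();
        conf.put(Config.TOPOLOGY_WORKERS, 1);
        conf.put(Config.TOPOLOGY_DEBUG, true);
        // Submits to the nimbus configured in storm.yaml; the topology then
        // keeps running until it is killed explicitly.
        StormSubmitter.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());
    }
}
A topology submitted this way keeps running until it is killed, e.g. with: storm kill my-flume-kafka-storm-topology-integration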
# Running the Storm program with the storm jar command
# ll
total 72
-rw-r--r-- 1 root root 19826 Jul 14 10:27 fks-storm-0013.jar
-rw-r--r-- 1 root root 19833 Jul 14 10:31 fks-storm-high-001.jar
-rw-r--r-- 1 root root 15149 Jul  7 17:43 storm-wordcount-official-cluster.jar
-rw-r--r-- 1 root root 15192 Jul  7 17:47 storm-wordcount-official-local.jar
# pwd
/usr/local/adsit/yting/apache/storm/apache-storm-0.9.2-incubating/yjar
# storm jar fks-storm-high-001.jar com.yting.cloud.storm.topology.KafkaTopology
There is far too much log output to show it all, so here is just a sample!
14650 INFO com.yting.cloud.storm.spout.HighLevelKafkaSpout - --------->close
14650 INFO backtype.storm.daemon.executor - Shut down executor 1:
14650 INFO backtype.storm.daemon.worker - Shutdown executors
14650 INFO backtype.storm.daemon.worker - Shutting down transfer thread
14651 INFO backtype.storm.util - Async loop interrupted!
14651 INFO backtype.storm.daemon.worker - Shutdown transfer thread
14652 INFO backtype.storm.daemon.worker - Shutting down default resources
14653 INFO backtype.storm.daemon.worker - Shutdown default resources
14661 INFO backtype.storm.daemon.worker - Disconnecting from storm cluster state context
14664 INFO backtype.storm.daemon.worker - Shutdown worker my-flume-kafka-storm-topology-integration-1-1405320692 e0d44e3c-5b2a-4263-8dab-4aacf4215d2d 1024
14667 INFO backtype.storm.daemon.supervisor - Shut down e0d44e3c-5b2a-4263-8dab-4aacf4215d2d:14d31572-9e60-4a16-9638-56c22530826d
14667 INFO backtype.storm.daemon.supervisor - Shutting down supervisor e0d44e3c-5b2a-4263-8dab-4aacf4215d2d
14668 INFO backtype.storm.event - Event manager interrupted
14668 INFO backtype.storm.event - Event manager interrupted
14671 INFO backtype.storm.daemon.supervisor - Shutting down supervisor cd464efd-fa69-4566-8cba-7e10d51dae6c
14671 INFO backtype.storm.event - Event manager interrupted
14672 INFO backtype.storm.event - Event manager interrupted
14674 INFO backtype.storm.testing - Shutting down in process zookeeper
14675 INFO backtype.storm.testing - Done shutting down in process zookeeper
14675 INFO backtype.storm.testing - Deleting temporary path /tmp/c88e689e-c97e-4822-886d-ddcc1e7b9e9d
14677 INFO backtype.storm.testing - Deleting temporary path /tmp/88f3f052-cfaf-4d53-91ad-489965612e94
14678 INFO backtype.storm.testing - Deleting temporary path /tmp/17f2f7cf-7844-4077-8ad9-b3503fa21fb6
14681 INFO backtype.storm.testing - Deleting temporary path /tmp/548e5d3e-b05d-42a7-bd71-ac4ece8e93c4
# Create a new test file in Flume's monitored directory and see whether the Storm program prints its contents
# Create the file fks-yousmile-20140714.log
# vi fks-yousmile-20140714.log
So i miss the change to see you again .
# ll
-rw-r--r-- 1 root root 40 Jul 14 14:59 fks-yousmile-20140714.log.COMPLETED
# Storm console output
14675 INFO backtype.storm.testing - Deleting temporary path /tmp/c88e689e-c97e-4822-886d-ddcc1e7b9e9d
14677 INFO backtype.storm.testing - Deleting temporary path /tmp/88f3f052-cfaf-4d53-91ad-489965612e94
14678 INFO backtype.storm.testing - Deleting temporary path /tmp/17f2f7cf-7844-4077-8ad9-b3503fa21fb6
14681 INFO backtype.storm.testing - Deleting temporary path /tmp/548e5d3e-b05d-42a7-bd71-ac4ece8e93c4
Thread 0: So i miss the change to see you again . (the Storm program has printed the contents of the newly created file)
# Flume+Kafka+Storm integration complete
# The Storm part above is too simplistic; the Kafka high-level consumer can be pulled into the Storm spout itself for better control, for example with the code below:
package com.yting.cloud.storm.spout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Storm spout
 *
 * @Author 王扬庭(妳那伊抹微笑)
 * @Time 2014-07-14
 */
public class KafkaSpout implements IRichSpout {
    private static final Log log = LogFactory.getLog(KafkaSpout.class);
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private String topic;
    private int a_numThreads = 1;

    public KafkaSpout(String topic) {
        this.topic = topic;
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void ack(Object msgId) {
        log.info("--------->ack");
    }

    @Override
    public void activate() {
        log.info("--------->activate");
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> streams = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = streams.iterator();
        while (it.hasNext()) {
            String value = new String(it.next().message());
            log.info("(--------->Storm kafka consumer)------->" + value);
            collector.emit(new Values(value), value);
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181/kafka");
        // props.put("zookeeper.connect", "rs229:2181,rs227:2181,rs226:2181,rs198:2181,rs197:2181");
        // props.put("zookeeper.connect", "rs229");
        props.put("group.id", "2");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void close() {
        log.info("--------->close");
    }

    @Override
    public void deactivate() {
        log.info("--------->deactivate");
    }

    @Override
    public void fail(Object msgId) {
        log.info("--------->fail");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("kafkaSpout"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        log.info("--------->getComponentConfiguration");
        return null;
    }
}
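The spout above now emits a single field named kafkaSpout and anchors each tuple with the message itself as the message id, but no bolt is wired into the topology yet. A minimal printer bolt that would consume it, purely as an illustration (PrinterBolt is a hypothetical class, not part of the project download):
package com.yting.cloud.storm.bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

/**
 * Hypothetical PrinterBolt: prints every message the spout emits.
 * BaseBasicBolt acks each tuple automatically after execute() returns.
 */
public class PrinterBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("(--------->Storm bolt)------->" + input.getStringByField("kafkaSpout"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: nothing is emitted downstream
    }
}
Wiring it in takes one extra line in the topology: builder.setBolt("2", new PrinterBolt(), 1).shuffleGrouping("1");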
I'm being a bit lazy here and won't polish this any further; I'll clean it up when I get to the data-architecture work!
# Closing Remarks
All the project code and jars used in this integration have been uploaded to the shared files of QQ group 214293307; download them from there if you want to study them.
The Eclipse project code for this Flume+Kafka+Storm integration can be downloaded from http://download.csdn.net/detail/u012185296/7633405
Study hard and improve every day. This should have been finished long ago, but it got delayed for various reasons; plans really can't keep up with change!
Good good study , day day up high
The you smile until forever .....................
# Time 2014-07-14