设为首页 收藏本站
查看: 921|回复: 0

[经验分享] Kafka+Storm+HDFS整合实践

[复制链接]

尚未签到

发表于 2019-1-31 10:16:14 | 显示全部楼层 |阅读模式
  在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:
  直接使用Storm的Topology对数据进行实时分析处理
  整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理
  实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软件包如下所示:
  zookeeper-3.4.5.tar.gz
  kafka_2.9.2-0.8.1.1.tgz
  apache-storm-0.9.2-incubating.tar.gz
  hadoop-2.2.0.tar.gz
  程序配置运行所基于的操作系统为CentOS 5.11。
  Kafka安装配置
  我们使用3台机器搭建Kafka集群:
  1
  192.168.4.142   h1
  2
  192.168.4.143   h2
  3
  192.168.4.144   h3
  在安装Kafka集群之前,这里没有使用Kafka自带的Zookeeper,而是独立安装了一个Zookeeper集群,也是使用这3台机器,保证Zookeeper集群正常运行。
  首先,在h1上准备Kafka安装文件,执行如下命令:
  1
  cd /usr/local/
  2
  wget http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
  3
  tar xvzf kafka_2.9.2-0.8.1.1.tgz
  4
  ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
  5
  chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
  修改配置文件/usr/local/kafka/config/server.properties,修改如下内容:
  1
  broker.id=0
  2
  zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
  这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:
  1
  zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka
  而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:
  1
  cd /usr/local/zookeeper
  2
  bin/zkCli.sh
  在ZooKeeper执行如下命令创建chroot路径:
  1
  create /kafka ''
  这样,每次连接Kafka集群的时候(使用--zookeeper选项),也必须使用带chroot路径的连接字符串,后面会看到。
  然后,将配置好的安装文件同步到其他的h2、h3节点上:
  1
  scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h2:/usr/local/
  2
  scp -r /usr/local/kafka_2.9.2-0.8.1.1/ h3:/usr/local/
  最后,在h2、h3节点上配置,执行如下命令:
  1
  cd /usr/local/
  2
  ln -s /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
  3
  chown -R kafka:kafka /usr/local/kafka_2.9.2-0.8.1.1 /usr/local/kafka
  并修改配置文件/usr/local/kafka/config/server.properties内容如下所示:
  1
  broker.id=1  # 在h1修改
  2
  3
  broker.id=2  # 在h2修改
  因为Kafka集群需要保证各个Broker的id在整个集群中必须唯一,需要调整这个配置项的值(如果在单机上,可以通过建立多个Broker进程来模拟分布式的Kafka集群,也需要Broker的id唯一,还需要修改一些配置目录的信息)。
  在集群中的h1、h2、h3这三个节点上分别启动Kafka,分别执行如下命令:
  1
  bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
  可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
  我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
  1
  bin/kafka-topics.sh --create --zookeeper h1:2181,h2:2181,h3:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
  查看创建的Topic,执行如下命令:
  1
  bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5
  结果信息如下所示:
  1
  Topic:my-replicated-topic5     PartitionCount:5     ReplicationFactor:3     Configs:
  2
  Topic: my-replicated-topic5     Partition: 0     Leader: 0     Replicas: 0,2,1     Isr: 0,2,1
  3
  Topic: my-replicated-topic5     Partition: 1     Leader: 0     Replicas: 1,0,2     Isr: 0,2,1
  4
  Topic: my-replicated-topic5     Partition: 2     Leader: 2     Replicas: 2,1,0     Isr: 2,0,1
  5
  Topic: my-replicated-topic5     Partition: 3     Leader: 0     Replicas: 0,1,2     Isr: 0,2,1
  6
  Topic: my-replicated-topic5     Partition: 4     Leader: 2     Replicas: 1,2,0     Isr: 2,0,1
  上面Leader、Replicas、Isr的含义如下:
  1
  Partition: 分区
  2
  Leader   : 负责读写指定分区的节点
  3
  Replicas : 复制该分区log的节点列表
  4
  Isr      : "in-sync" replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
  我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
  在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:
  1
  bin/kafka-console-producer.sh --broker-list h1:9092,h2:9092,h3:9092 --topic my-replicated-topic5
  在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:
  1
  bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5
  可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
  也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
  Storm安装配置
  Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:
  1
  192.168.4.142   h1
  2
  192.168.4.143   h2
  3
  192.168.4.144   h3
  首先,在h1节点上,执行如下命令安装:
  1
  cd /usr/local/
  2
  wget http://mirror.bit.edu.cn/apache/incubator/storm/apache-storm-0.9.2-incubating/apache-storm-0.9.2-incubating.tar.gz
  3
  tar xvzf apache-storm-0.9.2-incubating.tar.gz
  4
  ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
  5
  chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
  然后,修改配置文件conf/storm.yaml,内容如下所示:
  01
  storm.zookeeper.servers:
  02
  - "h1"
  03
  - "h2"
  04
  - "h3"
  05
  storm.zookeeper.port: 2181
  06
  #
  07
  nimbus.host: "h1"
  08
  09
  supervisor.slots.ports:
  10
  - 6700
  11
  - 6701
  12
  - 6702
  13
  - 6703
  14
  15
  storm.local.dir: "/tmp/storm"
  将配置好的安装文件,分发到其他节点上:
  1
  scp -r /usr/local/apache-storm-0.9.2-incubating/ h2:/usr/local/
  2
  scp -r /usr/local/apache-storm-0.9.2-incubating/ h3:/usr/local/
  最后,在h2、h3节点上配置,执行如下命令:
  1
  cd /usr/local/
  2
  ln -s /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
  3
  chown -R storm:storm /usr/local/apache-storm-0.9.2-incubating /usr/local/storm
  Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:
  1
  bin/storm nimbus &
  2
  bin/storm supervisor &
  为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:
  1
  bin/storm ui &
  这样可以通过访问http://h2:8080/来查看Topology的运行状况。
  整合Kafka+Storm
  消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:
  01
  
  02
  org.apache.storm
  03
  storm-core
  04
  0.9.2-incubating
  05
  provided
  06
  
  07
  
  08
  org.apache.storm
  09
  storm-kafka
  10
  0.9.2-incubating
  11
  
  12
  
  13
  org.apache.kafka
  14
  kafka_2.9.2
  15
  0.8.1.1
  16
  
  17
  
  18
  org.apache.zookeeper
  19
  zookeeper
  20
  
  21
  
  22
  log4j
  23
  log4j
  24
  
  25
  
  26
  
  下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:
  001
  package org.shirdrn.storm.examples;
  002
  003
  import java.util.Arrays;
  004
  import java.util.HashMap;
  005
  import java.util.Iterator;
  006
  import java.util.Map;
  007
  import java.util.Map.Entry;
  008
  import java.util.concurrent.atomic.AtomicInteger;
  009
  010
  import org.apache.commons.logging.Log;
  011
  import org.apache.commons.logging.LogFactory;
  012
  013
  import storm.kafka.BrokerHosts;
  014
  import storm.kafka.KafkaSpout;
  015
  import storm.kafka.SpoutConfig;
  016
  import storm.kafka.StringScheme;
  017
  import storm.kafka.ZkHosts;
  018
  import backtype.storm.Config;
  019
  import backtype.storm.LocalCluster;
  020
  import backtype.storm.StormSubmitter;
  021
  import backtype.storm.generated.AlreadyAliveException;
  022
  import backtype.storm.generated.InvalidTopologyException;
  023
  import backtype.storm.spout.SchemeAsMultiScheme;
  024
  import backtype.storm.task.OutputCollector;
  025
  import backtype.storm.task.TopologyContext;
  026
  import backtype.storm.topology.OutputFieldsDeclarer;
  027
  import backtype.storm.topology.TopologyBuilder;
  028
  import backtype.storm.topology.base.BaseRichBolt;
  029
  import backtype.storm.tuple.Fields;
  030
  import backtype.storm.tuple.Tuple;
  031
  import backtype.storm.tuple.Values;
  032
  033

  public>  034
  035

  public static>  036
  037
  private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);
  038
  private static final long serialVersionUID = 886149197481637894L;
  039
  private OutputCollector collector;
  040
  041
  @Override
  042
  public void prepare(Map stormConf, TopologyContext context,
  043
  OutputCollector collector) {
  044
  this.collector = collector;
  045
  }
  046
  047
  @Override
  048
  public void execute(Tuple input) {
  049
  String line = input.getString(0);
  050
  LOG.info("RECV[kafka -> splitter] " + line);
  051
  String[] words = line.split("\\s+");
  052
  for(String word : words) {
  053
  LOG.info("EMIT[splitter -> counter] " + word);
  054
  collector.emit(input, new Values(word, 1));
  055
  }
  056
  collector.ack(input);
  057
  }
  058
  059
  @Override
  060
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  061
  declarer.declare(new Fields("word", "count"));
  062
  }
  063
  064
  }
  065
  066

  public static>  067
  068
  private static final Log LOG = LogFactory.getLog(WordCounter.class);
  069
  private static final long serialVersionUID = 886149197481637894L;
  070
  private OutputCollector collector;
  071
  private Map counterMap;
  072
  073
  @Override
  074
  public void prepare(Map stormConf, TopologyContext context,
  075
  OutputCollector collector) {
  076
  this.collector = collector;
  077
  this.counterMap = new HashMap();
  078
  }
  079
  080
  @Override
  081
  public void execute(Tuple input) {
  082
  String word = input.getString(0);
  083
  int count = input.getInteger(1);
  084
  LOG.info("RECV[splitter -> counter] " + word + " : " + count);
  085
  AtomicInteger ai = this.counterMap.get(word);
  086
  if(ai == null) {
  087
  ai = new AtomicInteger();
  088
  this.counterMap.put(word, ai);
  089
  }
  090
  ai.addAndGet(count);
  091
  collector.ack(input);
  092
  LOG.info("CHECK statistics map: " + this.counterMap);
  093
  }
  094
  095
  @Override
  096
  public void cleanup() {
  097
  LOG.info("The final result:");
  098
  Iterator iter = this.counterMap.entrySet().iterator();
  099
  while(iter.hasNext()) {
  100
  Entry entry = iter.next();
  101
  LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());
  102
  }
  103
  104
  }
  105
  106
  @Override
  107
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  108
  declarer.declare(new Fields("word", "count"));
  109
  }
  110
  }
  111
  112
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
  113
  String zks = "h1:2181,h2:2181,h3:2181";
  114
  String topic = "my-replicated-topic5";
  115
  String zkRoot = "/storm"; // default zookeeper root configuration for storm
  116

  String>  117
  118
  BrokerHosts brokerHosts = new ZkHosts(zks);
  119

  SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot,>  120
  spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
  121
  spoutConf.forceFromStart = false;
  122
  spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
  123
  spoutConf.zkPort = 2181;
  124
  125
  TopologyBuilder builder = new TopologyBuilder();
  126
  builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
  127
  builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");
  128
  builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));
  129
  130
  Config conf = new Config();
  131
  132
  String name = MyKafkaTopology.class.getSimpleName();
  133
  if (args != null && args.length > 0) {
  134
  // Nimbus host name passed from command line
  135
  conf.put(Config.NIMBUS_HOST, args[0]);
  136
  conf.setNumWorkers(3);
  137
  StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
  138
  } else {
  139
  conf.setMaxTaskParallelism(3);
  140
  LocalCluster cluster = new LocalCluster();
  141
  cluster.submitTopology(name, conf, builder.createTopology());
  142
  Thread.sleep(60000);
  143
  cluster.shutdown();
  144
  }
  145
  }
  146
  }
  上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
  通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:
  1
  cp /usr/local/kafka/libs/kafka_2.9.2-0.8.1.1.jar /usr/local/storm/lib/
  2
  cp /usr/local/kafka/libs/scala-library-2.9.2.jar /usr/local/storm/lib/
  3
  cp /usr/local/kafka/libs/metrics-core-2.2.0.jar /usr/local/storm/lib/
  4
  cp /usr/local/kafka/libs/snappy-java-1.0.5.jar /usr/local/storm/lib/
  5
  cp /usr/local/kafka/libs/zkclient-0.3.jar /usr/local/storm/lib/
  6
  cp /usr/local/kafka/libs/log4j-1.2.15.jar /usr/local/storm/lib/
  7
  cp /usr/local/kafka/libs/slf4j-api-1.7.2.jar /usr/local/storm/lib/
  8
  cp /usr/local/kafka/libs/jopt-simple-3.2.jar /usr/local/storm/lib/
  然后,就可以提交我们开发的Topology程序了:
  1
  bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
  可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
  上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:
  1
  spoutConf.forceFromStart = false;
  该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录。
  整合Storm+HDFS
  Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:
  001
  package org.shirdrn.storm.examples;
  002
  003
  import java.text.DateFormat;
  004
  import java.text.SimpleDateFormat;
  005
  import java.util.Date;
  006
  import java.util.Map;
  007
  import java.util.Random;
  008
  009
  import org.apache.commons.logging.Log;
  010
  import org.apache.commons.logging.LogFactory;
  011
  import org.apache.storm.hdfs.bolt.HdfsBolt;
  012
  import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
  013
  import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
  014
  import org.apache.storm.hdfs.bolt.format.FileNameFormat;
  015
  import org.apache.storm.hdfs.bolt.format.RecordFormat;
  016
  import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
  017
  import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
  018
  import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
  019
  import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
  020
  import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
  021
  022
  import backtype.storm.Config;
  023
  import backtype.storm.LocalCluster;
  024
  import backtype.storm.StormSubmitter;
  025
  import backtype.storm.generated.AlreadyAliveException;
  026
  import backtype.storm.generated.InvalidTopologyException;
  027
  import backtype.storm.spout.SpoutOutputCollector;
  028
  import backtype.storm.task.TopologyContext;
  029
  import backtype.storm.topology.OutputFieldsDeclarer;
  030
  import backtype.storm.topology.TopologyBuilder;
  031
  import backtype.storm.topology.base.BaseRichSpout;
  032
  import backtype.storm.tuple.Fields;
  033
  import backtype.storm.tuple.Values;
  034
  import backtype.storm.utils.Utils;
  035
  036

  public>  037
  038

  public static>  039
  040
  private static final Log LOG = LogFactory.getLog(EventSpout.class);
  041
  private static final long serialVersionUID = 886149197481637894L;
  042
  private SpoutOutputCollector collector;
  043
  private Random rand;
  044
  private String[] records;
  045
  046
  @Override
  047
  public void open(Map conf, TopologyContext context,
  048
  SpoutOutputCollector collector) {
  049
  this.collector = collector;
  050
  rand = new Random();
  051
  records = new String[] {
  052
  "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",
  053
  "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",
  054
  "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
  055
  };
  056
  }
  057
  058
  059
  @Override
  060
  public void nextTuple() {
  061
  Utils.sleep(1000);
  062
  DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
  063
  Date d = new Date(System.currentTimeMillis());
  064
  String minute = df.format(d);
  065
  String record = records[rand.nextInt(records.length)];
  066
  LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
  067
  collector.emit(new Values(minute, record));
  068
  }
  069
  070
  @Override
  071
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  072
  declarer.declare(new Fields("minute", "record"));
  073
  }
  074
  075
  076
  }
  077
  078
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
  079
  // use "|" instead of "," for field delimiter
  080
  RecordFormat format = new DelimitedRecordFormat()
  081
  .withFieldDelimiter(" : ");
  082
  083
  // sync the filesystem after every 1k tuples
  084
  SyncPolicy syncPolicy = new CountSyncPolicy(1000);
  085
  086
  // rotate files
  087
  FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
  088
  089
  FileNameFormat fileNameFormat = new DefaultFileNameFormat()
  090
  .withPath("/storm/").withPrefix("app_").withExtension(".log");
  091
  092
  HdfsBolt hdfsBolt = new HdfsBolt()
  093
  .withFsUrl("hdfs://h1:8020")
  094
  .withFileNameFormat(fileNameFormat)
  095
  .withRecordFormat(format)
  096
  .withRotationPolicy(rotationPolicy)
  097
  .withSyncPolicy(syncPolicy);
  098
  099
  TopologyBuilder builder = new TopologyBuilder();
  100
  builder.setSpout("event-spout", new EventSpout(), 3);
  101
  builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));
  102
  103
  Config conf = new Config();
  104
  105
  String name = StormToHDFSTopology.class.getSimpleName();
  106
  if (args != null && args.length > 0) {
  107
  conf.put(Config.NIMBUS_HOST, args[0]);
  108
  conf.setNumWorkers(3);
  109
  StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
  110
  } else {
  111
  conf.setMaxTaskParallelism(3);
  112
  LocalCluster cluster = new LocalCluster();
  113
  cluster.submitTopology(name, conf, builder.createTopology());
  114
  Thread.sleep(60000);
  115
  cluster.shutdown();
  116
  }
  117
  }
  118
  119
  }
  上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
  上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:
  01
  
  02
  org.apache.maven.plugins
  03
  maven-shade-plugin
  04
  1.4
  05
  
  06
  true
  07
  
  08
  
  09
  
  10
  package
  11
  
  12
  shade
  13
  
  14
  
  15
  
  16
  
  18
  
  20
  
  21
  
  22
  
  23
  
  24
  
  25
  
  26
  
  整合Kafka+Storm+HDFS
  上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:
  001
  package org.shirdrn.storm.examples;
  002
  003
  import java.util.Arrays;
  004
  import java.util.Map;
  005
  006
  import org.apache.commons.logging.Log;
  007
  import org.apache.commons.logging.LogFactory;
  008
  import org.apache.storm.hdfs.bolt.HdfsBolt;
  009
  import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
  010
  import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
  011
  import org.apache.storm.hdfs.bolt.format.FileNameFormat;
  012
  import org.apache.storm.hdfs.bolt.format.RecordFormat;
  013
  import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
  014
  import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
  015
  import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
  016
  import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
  017
  import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
  018
  019
  import storm.kafka.BrokerHosts;
  020
  import storm.kafka.KafkaSpout;
  021
  import storm.kafka.SpoutConfig;
  022
  import storm.kafka.StringScheme;
  023
  import storm.kafka.ZkHosts;
  024
  import backtype.storm.Config;
  025
  import backtype.storm.LocalCluster;
  026
  import backtype.storm.StormSubmitter;
  027
  import backtype.storm.generated.AlreadyAliveException;
  028
  import backtype.storm.generated.InvalidTopologyException;
  029
  import backtype.storm.spout.SchemeAsMultiScheme;
  030
  import backtype.storm.task.OutputCollector;
  031
  import backtype.storm.task.TopologyContext;
  032
  import backtype.storm.topology.OutputFieldsDeclarer;
  033
  import backtype.storm.topology.TopologyBuilder;
  034
  import backtype.storm.topology.base.BaseRichBolt;
  035
  import backtype.storm.tuple.Fields;
  036
  import backtype.storm.tuple.Tuple;
  037
  import backtype.storm.tuple.Values;
  038
  039

  public>  040
  041

  public static>  042
  043
  private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
  044
  private static final long serialVersionUID = -5207232012035109026L;
  045
  private OutputCollector collector;
  046
  047
  @Override
  048
  public void prepare(Map stormConf, TopologyContext context,
  049
  OutputCollector collector) {
  050
  this.collector = collector;
  051
  }
  052
  053
  @Override
  054
  public void execute(Tuple input) {
  055
  String line = input.getString(0).trim();
  056
  LOG.info("RECV[kafka -> splitter] " + line);
  057
  if(!line.isEmpty()) {
  058
  String upperLine = line.toUpperCase();
  059
  LOG.info("EMIT[splitter -> counter] " + upperLine);
  060
  collector.emit(input, new Values(upperLine, upperLine.length()));
  061
  }
  062
  collector.ack(input);
  063
  }
  064
  065
  @Override
  066
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  067
  declarer.declare(new Fields("line", "len"));
  068
  }
  069
  070
  }
  071
  072

  public static>  073
  074
  private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
  075
  private static final long serialVersionUID = -4115132557403913367L;
  076
  private OutputCollector collector;
  077
  078
  @Override
  079
  public void prepare(Map stormConf, TopologyContext context,
  080
  OutputCollector collector) {
  081
  this.collector = collector;
  082
  }
  083
  084
  @Override
  085
  public void execute(Tuple input) {
  086
  String line = input.getString(0).trim();
  087
  LOG.info("REALTIME: " + line);
  088
  collector.ack(input);
  089
  }
  090
  091
  @Override
  092
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
  093
  094
  }
  095
  096
  }
  097
  098
  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
  099
  100
  // Configure Kafka
  101
  String zks = "h1:2181,h2:2181,h3:2181";
  102
  String topic = "my-replicated-topic5";
  103
  String zkRoot = "/storm"; // default zookeeper root configuration for storm
  104

  String>  105
  BrokerHosts brokerHosts = new ZkHosts(zks);
  106

  SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot,>  107
  spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
  108
  spoutConf.forceFromStart = false;
  109
  spoutConf.zkServers = Arrays.asList(new String[] {"h1", "h2", "h3"});
  110
  spoutConf.zkPort = 2181;
  111
  112
  // Configure HDFS bolt
  113
  RecordFormat format = new DelimitedRecordFormat()
  114
  .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
  115
  SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
  116
  FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
  117
  FileNameFormat fileNameFormat = new DefaultFileNameFormat()
  118
  .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
  119
  HdfsBolt hdfsBolt = new HdfsBolt()
  120
  .withFsUrl("hdfs://h1:8020")
  121
  .withFileNameFormat(fileNameFormat)
  122
  .withRecordFormat(format)
  123
  .withRotationPolicy(rotationPolicy)
  124
  .withSyncPolicy(syncPolicy);
  125
  126
  // configure & build topology
  127
  TopologyBuilder builder = new TopologyBuilder();
  128
  builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
  129
  builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
  130
  builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
  131
  builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");
  132
  133
  // submit topology
  134
  Config conf = new Config();
  135
  String name = DistributeWordTopology.class.getSimpleName();
  136
  if (args != null && args.length > 0) {
  137
  String nimbus = args[0];
  138
  conf.put(Config.NIMBUS_HOST, nimbus);
  139
  conf.setNumWorkers(3);
  140
  StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
  141
  } else {
  142
  conf.setMaxTaskParallelism(3);
  143
  LocalCluster cluster = new LocalCluster();
  144
  cluster.submitTopology(name, conf, builder.createTopology());
  145
  Thread.sleep(60000);
  146
  cluster.shutdown();
  147
  }
  148
  }
  149
  150
  }
  上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
  打包后,在Storm集群上部署并运行这个Topology:
  1
  bin/storm jar ~/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.DistributeWordTopology h1
  可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。
  参考链接
  http://kafka.apache.org/
  http://kafka.apache.org/documentation.html
  https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
  http://storm.apache.org/
  http://storm.apache.org/documentation/Tutorial.html
  http://storm.apache.org/documentation/FAQ.html
  https://github.com/ptgoetz/storm-hdfs
  转载:http://shiyanjun.cn/archives/934.html


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669934-1-1.html 上篇帖子: kafka 一些基本知识 下篇帖子: Idea下Kafka源码阅读编译环境搭建
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表