Kafka Study, Part 4: Flume + Kafka + Storm
Reposted from: http://blog.csdn.net/desilting/article/details/23194039

Configuring Flume:
http://blog.csdn.net/desilting/article/details/22811593
The conf/flume-conf.properties file:
producer.sources = s
producer.channels = c
producer.sinks = r

producer.sources.s.channels = c
producer.sources.s.type = netcat
producer.sources.s.bind = 192.168.40.134
producer.sources.s.port = 44444

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list = 192.168.40.134:9092
producer.sinks.r.serializer.class = kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks = 1
producer.sinks.r.max.message.size = 1000000
producer.sinks.r.custom.topic.name = mykafka
producer.sinks.r.channel = c

producer.channels.c.type = memory
producer.channels.c.capacity = 1000
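Note that org.apache.flume.plugins.KafkaSink is not one of Flume's stock sinks; it is the custom class listed later in this post, so its compiled jar (together with the Kafka and Scala client jars it depends on) must be on the agent's classpath. Assuming the sink is packaged as flume-kafka-sink.jar (a name made up for illustration), copying it into Flume's lib directory is enough:

cp flume-kafka-sink.jar $FLUME_HOME/lib/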
Configuring Kafka:
http://blog.csdn.net/desilting/article/details/22872839
Start ZooKeeper, Kafka, and Storm.
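See the earlier posts for the detailed setup; for orientation, the start commands look roughly like this (installation paths and config file names are assumptions, adjust to your environment):

bin/zkServer.sh start                                # ZooKeeper (standalone install)
bin/kafka-server-start.sh config/server.properties   # on each Kafka broker
bin/storm nimbus &                                   # Storm master
bin/storm supervisor &                               # Storm worker node(s)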
Create the topic:
bin/kafka-topics.sh --create --zookeeper 192.168.40.132:2181 --replication-factor 3 --partitions 1 --topic mykafka
Describe the topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.40.132:2181
Topic:mykafka    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: mykafka    Partition: 0    Leader: 134    Replicas: 133,134,132    Isr: 134,133,132
partition: a single topic can have multiple partitions, and the topic's messages are spread across them; the point is to increase parallelism.
leader: the broker that handles all reads and writes for a given partition; any broker may become the leader for some partition.
replicas: the brokers on which this partition is replicated, whether or not those brokers are alive.
isr: the in-sync replicas, i.e. the subset of replicas that are currently alive and caught up.
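Before putting Flume in front of the topic, it can be sanity-checked with Kafka's console tools (0.8-era CLI, matching the commands above); messages typed into the producer should appear in the consumer:

bin/kafka-console-producer.sh --broker-list 192.168.40.134:9092 --topic mykafka
bin/kafka-console-consumer.sh --zookeeper 192.168.40.132:2181 --topic mykafka --from-beginning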
Start Flume:
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
The KafkaSink class:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Context;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class KafkaSink extends AbstractSink implements Configurable {

    private Context context;
    private Properties parameters;
    private Producer<String, String> producer;

    private static final String PARTITION_KEY_NAME = "custom.partition.key";
    private static final String CUSTOM_TOPIC_KEY_NAME = "custom.topic.name";
    private static final String DEFAULT_ENCODING = "UTF-8";
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);

    public void configure(Context context) {
        // Copy every sink property from the Flume context into a Properties
        // object that is later handed to the Kafka producer.
        this.context = context;
        ImmutableMap<String, String> props = context.getParameters();
        this.parameters = new Properties();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            this.parameters.put(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public synchronized void start() {
        super.start();
        ProducerConfig config = new ProducerConfig(this.parameters);
        this.producer = new Producer<String, String>(config);
    }

    public Status process() {
        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();

        try {
            transaction.begin();
            Event event = channel.take();
            if (event != null) {
                String partitionKey = (String) parameters.get(PARTITION_KEY_NAME);
                String topic = Preconditions.checkNotNull((String) this.parameters.get(CUSTOM_TOPIC_KEY_NAME),
                        "topic name is required");
                String eventData = new String(event.getBody(), DEFAULT_ENCODING);
                // The partition key is optional; guard against it being unset
                // (the original called partitionKey.isEmpty() directly, which
                // throws a NullPointerException when the property is missing).
                KeyedMessage<String, String> data = (partitionKey == null || partitionKey.isEmpty())
                        ? new KeyedMessage<String, String>(topic, eventData)
                        : new KeyedMessage<String, String>(topic, partitionKey, eventData);
                LOGGER.info("Sending Message to Kafka : [" + topic + ":" + eventData + "]");
                producer.send(data);
                transaction.commit();
                LOGGER.info("Send message success");
                status = Status.READY;
            } else {
                // Channel is empty; back off so the sink runner sleeps briefly.
                transaction.rollback();
                status = Status.BACKOFF;
            }
        } catch (Exception e) {
            LOGGER.error("Send message failed!", e);
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            transaction.close();
        }
        return status;
    }

    @Override
    public void stop() {
        producer.close();
        super.stop();
    }
}
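The producer properties the sink passes through (metadata.broker.list, serializer.class, request.required.acks) are exactly the ones the plain Kafka 0.8 producer API expects, so the Kafka side can be verified without Flume at all. A minimal standalone sketch (ProducerSmokeTest is a name made up here, not part of the original post):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Minimal standalone producer using the same 0.8 API as the sink above;
// handy for verifying the topic without going through Flume.
public class ProducerSmokeTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.40.134:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("mykafka", "hello from ProducerSmokeTest"));
        producer.close();
    }
}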
The KafkaSpout is adapted from someone else's code:
https://github.com/HolmesNL/kafka-spout (some modifications are needed).
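The linked project is the real implementation; as a rough sketch of what such a spout does internally, something like the following would work against Kafka 0.8's high-level consumer (SimpleKafkaSpout and all details below are my own illustration, not the HolmesNL code):

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Rough skeleton of a spout built on Kafka 0.8's high-level consumer.
// Tuples are emitted unanchored, so there is no replay on failure.
public class SimpleKafkaSpout extends BaseRichSpout {

    private final String topic;
    private final String groupId;
    private final String zkConnect;

    private transient ConsumerConnector consumer;
    private transient ConsumerIterator<byte[], byte[]> iterator;
    private transient SpoutOutputCollector collector;

    public SimpleKafkaSpout(String topic, String groupId, String zkConnect) {
        this.topic = topic;
        this.groupId = groupId;
        this.zkConnect = zkConnect;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Make hasNext() time out instead of blocking nextTuple() forever.
        props.put("consumer.timeout.ms", "100");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreams(topicCountMap).get(topic);
        iterator = streams.get(0).iterator();
    }

    public void nextTuple() {
        try {
            if (iterator.hasNext()) {
                collector.emit(new Values(new String(iterator.next().message(), "UTF-8")));
            }
        } catch (ConsumerTimeoutException e) {
            // No message available right now; nextTuple must return quickly.
        } catch (java.io.UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void close() {
        consumer.shutdown();
    }
}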
The Storm test program:
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.metric.LoggingMetricsConsumer;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.MultiCountMetric;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ExclamationTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;
        transient CountMetric _countMetric;
        transient MultiCountMetric _wordCountMetric;
        transient ReducedMetric _wordLengthMeanMetric;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
            initMetrics(context);
        }

        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
            updateMetrics(tuple.getString(0));
        }

        void updateMetrics(String word) {
            _countMetric.incr();
            _wordCountMetric.scope(word).incr();
            _wordLengthMeanMetric.update(word.length());
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        void initMetrics(TopologyContext context) {
            _countMetric = new CountMetric();
            _wordCountMetric = new MultiCountMetric();
            _wordLengthMeanMetric = new ReducedMetric(new MeanReducer());

            // Report to the registered metrics consumer every 5 / 60 seconds.
            context.registerMetric("execute_count", _countMetric, 5);
            context.registerMetric("word_count", _wordCountMetric, 60);
            context.registerMetric("word_length", _wordLengthMeanMetric, 60);
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // Two args: <topologyName> <topic> (cluster mode); one arg: <topic> (local mode).
        String topic = args.length == 2 ? args[1] : args[0];
        KafkaSpout kafkaSpout = new KafkaSpout(topic, "testKafkaGroup", "192.168.40.132:2181");

        builder.setSpout("word", kafkaSpout, 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);

        if (args != null && args.length == 2) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
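Given the argument handling in main, a single argument runs the topology in a LocalCluster with that argument as the topic, while two arguments submit it to a real cluster as <topologyName> <topic>. Assuming the compiled topology is packaged as storm-test.jar (a hypothetical name):

storm jar storm-test.jar ExclamationTopology mytopology mykafka    # cluster mode
storm jar storm-test.jar ExclamationTopology mykafka               # local mode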
Test results:
telnet 192.168.40.134 44444
Type some arbitrary characters (e.g. ASDF) at the telnet prompt:
ASDF
OK
ASD
OK
F
OK
ASDF
OK
The Flume console shows:
2014-04-08 01:30:44,379 (SinkRunner-PollingRunner-DefaultSinkProcessor) Sending Message to Kafka :
2014-04-08 01:30:44,387 (SinkRunner-PollingRunner-DefaultSinkProcessor) Send message success
2014-04-08 01:30:44,604 (SinkRunner-PollingRunner-DefaultSinkProcessor) Sending Message to Kafka :
2014-04-08 01:30:44,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) Send message success
2014-04-08 01:30:44,794 (SinkRunner-PollingRunner-DefaultSinkProcessor) Sending Message to Kafka :
2014-04-08 01:30:44,799 (SinkRunner-PollingRunner-DefaultSinkProcessor) Send message success
2014-04-08 01:30:45,038 (SinkRunner-PollingRunner-DefaultSinkProcessor) Sending Message to Kafka :
Finally, in Storm's logs/metrics.log file you will find records like this:
2014-04-08 01:30:28,446 495106 1396945828 ubun