忧郁者 posted on 2019-1-31 09:56:17

Kafka Partition Backup in Practice

The consumer below uses the legacy high-level consumer API to read back the messages written to each partition of the ke_test topic and prints the partition each message came from.

package cn.hadoop.hdfs.kafka.partition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * @Date Nov 3, 2016
 *
 * @Author dengjie
 *
 * @Note Verify the messages stored in each partition via the Kafka consumer API
 */
public class PartitionerConsumer {

    public static void main(String[] args) {
        String topic = "ke_test";
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());

        // Request a single stream (consumer thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // Print each message together with the partition it was read from
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println("consume: Partition [" + mam.partition() + "] Message: ["
                    + new String(mam.message()) + "] ..");
        }
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("group.id", "group1");
        props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }
}
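
For completeness, the producing side that this consumer verifies might look like the following minimal sketch. It is written against the newer org.apache.kafka.clients.producer client rather than the legacy Scala client used above; the topic name ke_test matches the consumer, while the broker address, partition count, and message contents are assumptions made purely for illustration.

package cn.hadoop.hdfs.kafka.partition;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Sketch of a producer that writes messages to explicit partitions of ke_test,
 * so that PartitionerConsumer above can print which partition each one landed in.
 * Broker address and partition count are assumed values.
 */
public class PartitionerProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        int numPartitions = 3; // assumed partition count of ke_test
        for (int i = 0; i < 10; i++) {
            int partition = i % numPartitions; // spread messages round-robin across partitions
            producer.send(new ProducerRecord<String, String>("ke_test", partition, "key-" + i, "message-" + i));
        }
        producer.close();
    }
}

Running a producer like this first and then PartitionerConsumer should print each message together with the partition number it was assigned.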

