Kafka 分区备份实战
package cn.hadoop.hdfs.kafka.partition;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;/**
* @Date Nov 3, 2016
*
* @Author dengjie
*
* @Note 通过 Kafka 的消费者 API 验证分区入库的消息 */public class PartitionerConsumer {
public static void main(String[] args) {
String topic = "ke_test";
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(1));
Map consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream stream = consumerMap.get(topic).get(0);
ConsumerIterator it = stream.iterator(); while (it.hasNext()) {
MessageAndMetadata mam = it.next();
System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message()) + "] ..");
}
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("group.id", "group1");
props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
return new ConsumerConfig(props);
}
}
页:
[1]