|
package com.ganglia.kafka;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Sample Kafka producer that publishes an endless stream of string messages to the
 * {@code test} topic, pausing 100 ms after each send and echoing every message to
 * standard output together with a formatted timestamp.
 *
 * <p>Usage: {@code ProducerTest2 <tag>} - the tag is embedded in every message body so
 * that several producer instances can be told apart on the consumer side.
 */
public class ProducerTest2 {
    /** Topic every message is published to. */
    private static final String TOPIC = "test";

    /** Pause between two consecutive sends, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 100L;

    /**
     * Builds the producer, then sends messages until the thread is interrupted or a
     * send fails.
     *
     * @param args command-line arguments; {@code args[0]} is the tag placed in each message
     */
    public static void main(String[] args) {
        // Fail fast with a readable message instead of an ArrayIndexOutOfBoundsException
        // raised after the producer has already been created.
        if (args == null || args.length == 0) {
            System.err.println("Usage: ProducerTest2 <tag>");
            return;
        }
        String tag = args[0];

        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "bfdbjc1:9092,test1:9092,test2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            int i = 1;
            while (true) {
                // Incremented before use, so the first message carries the number 2.
                i++;
                // Message layout: <digits of i reversed>:test-kafka_<tag>_<i>
                String text = new StringBuilder(String.valueOf(i)).reverse()
                        + ":test-kafka_" + tag + "_" + i;
                producer.send(new KeyedMessage<String, String>(TOPIC, text));
                Thread.sleep(SEND_INTERVAL_MS);
                System.out.println(DateUtil.fmtDateToYMDHMS(new Date()) + "\t" + text);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller/JVM can observe the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Always release the producer's resources, whatever ended the loop.
            producer.close();
        }
    }
}
1.安装zookeeper.
2.启动zookeeper.
3.启动kafka服务, 在zk1,zk2,zk3上分别运行:
kafka-server-start.sh ../config/server.properties   # 启动kafka
4. 新建一个TOPIC(replication-factor=num of brokers)
kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
5.假设我们在zk2上,开一个终端,发送消息至kafka
kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
在发送消息的终端输入:Hello Kafka
6.假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning
package com.ganglia.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class TestConsumer extends Thread{
private final ConsumerConnector consumer;
private final String topic;
public static void main(String[] args) {
TestConsumer consumerThread = new TestConsumer("test");
consumerThread.start();
}
public TestConsumer(String topic) {
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
this.topic =topic;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect","test1:2181,test2:2181,bfdbjc1:2181");
props.put("group.id", "0");
props.put("zookeeper.session.timeout.ms","10000");
return new ConsumerConfig(props);
}
public void run(){
Map<String,Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);
Map<String, List<KafkaStream<byte[],byte[]>>> streamMap =consumer.createMessageStreams(topickMap);
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it =stream.iterator();
System.out.println("*********Results********");
while(true){
if(it.hasNext()){
/* MessageAndMetadata<byte[], byte[]> mm = it.next();
System.err.println("get data:" +new String(mm.message())); */
System.err.println("get data:" +new String(it.next().message()));
}
}
}
} |
|
|