lb20309 发表于 2017-5-23 15:37:58

kafka之java编程模型

package com.ganglia.kafka;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerTest2 {   
public static void main(String[] args) {   
Properties props = new Properties();   
props.setProperty("metadata.broker.list","bfdbjc1:9092,test1:9092,test2:9092");   
props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
ProducerConfig config = new ProducerConfig(props);   
Producer<String, String> producer = new Producer<String, String>(config);   
try {   
int i =1;
while(true){
i++;
String text = new StringBuffer((i+"")).reverse()+":test-kafka_"+args+"_"+i;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test",text);   
producer.send(data);   
Thread.sleep(100);
System.out.println(DateUtil.fmtDateToYMDHMS(new Date())+"\t"+text);
}
} catch (Exception e) {   
e.printStackTrace();   
}   
producer.close();   
}   
}

  1.安装zookeeper.
  2.启动zookeeper.
  3.启动kafka服务, 在zk1,zk2,zk3上分别运行:
   kafka-server-start.sh  ../config/server.properties /启动kafka
4. 新建一个TOPIC(replication-factor=num of brokers)
   kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
5.假设我们在zk2上,开一个终端,发送消息至kafka
   kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
  在发送消息的终端输入:Hello Kafka
6.假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
   kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning

package com.ganglia.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class TestConsumer extends Thread{   
private final ConsumerConnector consumer;   
private final String topic;   
public static void main(String[] args) {   
TestConsumer consumerThread = new TestConsumer("test");   
consumerThread.start();   
}   
public TestConsumer(String topic) {   
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());   
this.topic =topic;   
}   
private static ConsumerConfig createConsumerConfig() {   
Properties props = new Properties();   
props.put("zookeeper.connect","test1:2181,test2:2181,bfdbjc1:2181");   
props.put("group.id", "0");   
props.put("zookeeper.session.timeout.ms","10000");   
return new ConsumerConfig(props);   
}   
public void run(){   
Map<String,Integer> topickMap = new HashMap<String, Integer>();   
topickMap.put(topic, 1);   
Map<String, List<KafkaStream<byte[],byte[]>>>streamMap =consumer.createMessageStreams(topickMap);   
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   
ConsumerIterator<byte[],byte[]> it =stream.iterator();   
System.out.println("*********Results********");   
while(true){   
if(it.hasNext()){
/* MessageAndMetadata<byte[], byte[]> mm = it.next();
System.err.println("get data:" +new String(mm.message())); */
System.err.println("get data:" +new String(it.next().message()));
}
}   
}   
}
页: [1]
查看完整版本: kafka之java编程模型