设为首页 收藏本站
查看: 597|回复: 0

[经验分享] kafka之java编程模型

[复制链接]

尚未签到

发表于 2017-5-23 15:37:58 | 显示全部楼层 |阅读模式
package com.ganglia.kafka;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerTest2 {   
public static void main(String[] args) {   
Properties props = new Properties();   
props.setProperty("metadata.broker.list","bfdbjc1:9092,test1:9092,test2:9092");   
props.setProperty("serializer.class","kafka.serializer.StringEncoder");   
ProducerConfig config = new ProducerConfig(props);   
Producer<String, String> producer = new Producer<String, String>(config);   
try {   
int i =1;
while(true){
i++;
String text = new StringBuffer((i+"")).reverse()+":test-kafka_"+args[0]+"_"+i;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("test",text);   
producer.send(data);   
Thread.sleep(100);
System.out.println(DateUtil.fmtDateToYMDHMS(new Date())+"\t"+text);
}
} catch (Exception e) {   
e.printStackTrace();   
}   
producer.close();   
}   
}

  1.安装zookeeper.
  2.启动zookeeper.
  3.启动kafka服务, 在zk1,zk2,zk3上分别运行:
   kafka-server-start.sh  ../config/server.properties /启动kafka
4. 新建一个TOPIC(replication-factor=num of brokers)
   kafka-topics.sh --create --topic test --replication-factor 3 --partitions 2 --zookeeper zk1:2181
5.假设我们在zk2上,开一个终端,发送消息至kafka
   kafka-console-producer.sh --broker-list zk1:9092 --sync --topic test
  在发送消息的终端输入:Hello Kafka
6.假设我们在zk3上,开一个终端,显示消息的消费(zk3模拟consumer)
   kafka-console-consumer.sh --zookeeper zk1:2181 --topic test --from-beginning

package com.ganglia.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class TestConsumer extends Thread{   
private final ConsumerConnector consumer;   
private final String topic;   
public static void main(String[] args) {   
TestConsumer consumerThread = new TestConsumer("test");   
consumerThread.start();   
}   
public TestConsumer(String topic) {   
consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());   
this.topic =topic;   
}   
private static ConsumerConfig createConsumerConfig() {   
Properties props = new Properties();   
props.put("zookeeper.connect","test1:2181,test2:2181,bfdbjc1:2181");   
props.put("group.id", "0");   
props.put("zookeeper.session.timeout.ms","10000");   
return new ConsumerConfig(props);   
}   
public void run(){   
Map<String,Integer> topickMap = new HashMap<String, Integer>();   
topickMap.put(topic, 1);   
Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);   
KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);   
ConsumerIterator<byte[],byte[]> it =stream.iterator();   
System.out.println("*********Results********");   
while(true){   
if(it.hasNext()){
/* MessageAndMetadata<byte[], byte[]> mm = it.next();
System.err.println("get data:" +new String(mm.message())); */  
System.err.println("get data:" +new String(it.next().message()));
}
}   
}   
}

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-379826-1-1.html 上篇帖子: kafka简单介绍 下篇帖子: 【Kafka十】关于Kafka的offset管理
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表