设为首页 收藏本站
查看: 3211|回复: 0

[经验分享] kafka安装和使用远程代码进行访问

[复制链接]

尚未签到

发表于 2019-1-31 11:38:13 | 显示全部楼层 |阅读模式
  kafka安装和使用java连接远程服务器进行消息的生成与消费
首先要使用kafka,要有jdk和zookeeper的环境
本文在阿里云的centos7环境上进行
jdk版本选择的是1.8.0_181
zookeeper的版本是3.4.12
kafka的版本是2.12-1.1.1
关于kafka命令的介绍 本文不介绍了 只介绍怎么搭建一个kafka单点服务器 以及怎么使用代码 远程连接kafka服务器
下载地址
kafka下载地址 :http://kafka.apache.org/downloads
  zookeeper下载地址:https://zookeeper.apache.org/
  jdk下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
  操作步骤

1、首先 使用tar命令对jdk进行解压
tar -zxvf tar -zxvf jdk-8u181-linux-x64.tar.gz
目录下面会多出一个jdk1.8.0_181  进入里面去  使用pwd命令查看绝对路径  并且复制找个路径
最后进行jdk环境变量的配置
编辑 vim /etc/profile文件
在文件后面加上:
export JAVA_HOME=(刚才pwd命令看到的路径)
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/            lib/    tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
最后使用source /etc/profile 刷新文件
使用java -version 查看环境变量是否配置成功
2、成功之后进行zookeeper的安装
使用 tar -zxvf zookeeper-3.4.12.tar.gz 接下下载好的zookeeper安装包
将zookeeper下的/conf/zookeeper.example改名成zoo.cfg
使用mv 和cp命令都可以  然后vim这个文件 加上下面两行
dataLogDir=/tmp/zookeeper-log #日志路径
quorumListenOnAllIPs=true #在阿里云的服务器上保证外网可以访问到  刚开始没设置这个折腾了好久
3、最后,安装kafka
使用 tar -zxvf kafka_2.12-1.1.1.tgz 解压下载好的kafka
cd 到解压后的文件里面去  编辑配置文件  vim config/server.properties
加上下面几行
listeners=PLAINTEXT://:9092
advertised.host.name=阿里云服务器公网ip #
advertised.port=9092
将zookeeper.connect的值改为阿里云的公网ip
至此,所有的环境的安装已经完成,下面使用kafka的命令进行消息的生成和消费
```
首先cd到zookeeper的bin目录下  使用 ./zkServer.sh start 启动zookeeper
再cd到kafka的bin目录下 使用 ./kafka-server-start.sh ../config/server.properties 启动kafka
新建一个会话或者打开一个新的终端  
这时候使用jps命令  可以看到 Kafka和QuorumPeerMain表示启动全部成功,下面创建一个主题
cd到kafka的bin目录下面,执行
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --     partitions 1 --topic Hello-world
输出Created topic "Hello-world".  表示topic创建成功
使用./kafka-topics.sh --list --zookeeper localhost:2181 查看主题的列表
输出里面会含有Hello-world
下面进行消息的生产和消费
先启动生产者 ./kafka-console-producer.sh --broker-list 阿里云公网ip:9092 --topic Hello-        world
会出现一个 >  类似于交互界面 这时候就可以生产消息了
启动消费者 ./kafka-console-consumer.sh --zookeeper 阿里云公网ip:2181 --topic Hello-       world --from-beginning
这时候当生产者生产消息的时候  消费者这边就可以看到了  
在服务器上面进行消息的生产和消费就完成了 下面介绍怎么使用java代码进行远程连接kafka服务器
这个地方真的踩了好多好多坑、有次晚上下班搞到了快两点 百度、谷歌、维基、Stack Overflow 能找解决问题的地方都找了浪费了好多不必要的时间

  首先、新建一个Maven工程(此处不再多描述),在pom文件中加入kafka的依赖

org.apache.kafka
kafka_2.10
0.8.2.0


新建一个KafkaProducerDemo和KafkaConsumerDemo类(名字可以自定义):
话不多说  上代码
KafkaProducerDemo类:
public class KafkaProducerDemo {
public static void main(String[] args) {
//创建properties文件
Properties properties = new Properties();
//设置kafka服务器地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云公网ip:9092");
//设置key进行序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//设置value进行序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//创建消息生产者
KafkaProducer producer = new KafkaProducer(properties);
//创建消息实体 制定主题、key、value
ProducerRecord record = new ProducerRecord("Hello-world","haha","from java client");
//发送消息
producer.send(record);
System.out.println("消息发送成功");
//关闭生产者
producer.close();
}
}
KafkaConsumerDemo类:
public class KafkaConsumerDemo {
public static void main(String[] args) {
//新建配置文件
Properties properties = new Properties();
//设置kafka服务器地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"阿里云公网ip:9092");
//设置key的反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//设置value的反序列化
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//设置groupid
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
//创建消费者对象
KafkaConsumer consumer = new KafkaConsumer(properties);
//订阅主题
consumer.subscribe(Arrays.asList("Hello-world"));
while (true) {
//消费消息
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records)
System.out.println("消息的主题是:" + record.topic()+",消息的key是:" + record.key()+",消息的value是:"+record.value());
}
}
}
上面就是连接kafka远程服务器代码

但是上述过程做完之后还是不能正确运行、这个地方折腾了好久、最后在哪里看到解决的办法记不大清了
就是要阿里云服务器服务安全设置里面加个规则 将2181和9092端口开放就可以,但是我中间也使用命令的方式
关闭了防火墙、没什么用,不知道什么鬼。 搞得我头皮发麻



运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-670002-1-1.html 上篇帖子: 【3分钟读懂Kafka原理系列】细数MQ那些不得不说的8大好处 下篇帖子: 史上最全、最详细的 kafka 学习笔记!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表