设为首页 收藏本站
查看: 1999|回复: 0

[经验分享] 第四章 第一个rabbitmq程序

[复制链接]

尚未签到

发表于 2017-7-4 20:18:08 | 显示全部楼层 |阅读模式
  rabbitmq消息发送模型
DSC0000.png

  要素:


  • 生产者
  • 消费者
  • 交换器:生产者将消息发送到交换器
  • 队列:交换器通过某种路由规则绑定到指定队列,将消息加入队列,消费者从队列消费消息
  前提:
  引入rabbitmq的java客户端jar包



        <!-- import rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.6</version>
</dependency>
  一、消息生产者
  1、代码:



1 package com.xxx.producer;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6 import com.rabbitmq.client.Channel;
7 import com.rabbitmq.client.Connection;
8 import com.rabbitmq.client.ConnectionFactory;
9
10 /**
11  * 消息生产者
12  */
13 public class HelloWorldProducer {
14     private static final String QUEUE_NAME    = "helloQueue";
15     private static final String EXCHANGE_NAME = "helloExchange";
16
17     public static void main(String[] args) throws IOException, TimeoutException {
18         ConnectionFactory factory = new ConnectionFactory();// 建立连接工厂
19         factory.setHost("192.168.20.238");// 设置rabbitmq服务器地址
20         factory.setPort(5672);// 设置rabbitmq服务器端口
21         factory.setUsername("zhaojigang");
22         factory.setPassword("wangna");
23         factory.setVirtualHost("zhaojigangvhost");
24
25         Connection connection = factory.newConnection();// 建立连接
26         Channel channel = connection.createChannel();// 建立信道
27
28         /**
29          * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
30          * durable:队列是否持久化
31          * exclusive:当最后一个消费者取消订阅时,是否自动删除
32          * autoDelete:只有当前应用程序才能够消费队列消息(场景:限制一个队列只有一个消费者)
33          * arguments:other properties (construction arguments) for the queue
34          */
35         channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 创建队列(如果队列不存在,创建;如果存在,什么都不做)
36         /**
37          * exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
38          * exchange:交换器名字
39          * type:3种类型 direct/fanout/topic
40          */
41         channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
42
43         for (int i = 0; i < 10; i++) {
44             String msg = "helloworld_" + i;// 创建消息
45             /**
46              * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
47              * exchange:交换器
48              * routingKey:路由键
49              * props:other properties for the message - routing headers etc
50              * body:消息体
51              */
52             channel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, msg.getBytes());// 发布消息
53             System.out.println("发送消息:msg-->" + msg);
54         }
55
56         channel.close();// 关闭信道
57         connection.close();// 关闭连接
58     }
59 }
  2、步骤:


  • 创建并设置连接工厂

    • host、port、username、password、vhost
    • 值得注意的是,一定要现在rabbitmq server上把username和password设置好,并且开启该用户在指定vhost上的权限,才可以设置连接工厂成功


  • 创建连接
  • 创建信道
  • 创建队列
  • 创建交换器
  • 创建(创建之后也可以配置消息)并发送消息
  • 关闭信道
  • 关闭连接
  3、注意点:



  • queueDeclare方法:如果队列不存在,创建;如果存在,什么都不做


  • basicPublish:发布消息到指定的交换器,并制定路由规则(用于消费者部分的绑定操作)

  二、消息消费者
  1、代码:



1 package com.xxx.consumer;
2
3 import java.io.IOException;
4 import java.util.concurrent.TimeoutException;
5
6 import com.rabbitmq.client.Channel;
7 import com.rabbitmq.client.Connection;
8 import com.rabbitmq.client.ConnectionFactory;
9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.DefaultConsumer;
11 import com.rabbitmq.client.Envelope;
12 import com.rabbitmq.client.AMQP.BasicProperties;
13
14 /**
15  * 消息消费者
16  */
17 public class HelloWorldConsumer {
18     private final static String QUEUE_NAME = "helloQueue";
19     private static final String EXCHANGE_NAME = "helloExchange";
20
21     public static void main(String[] args) throws IOException, TimeoutException {
22         ConnectionFactory factory = new ConnectionFactory();// 建立连接工厂
23         factory.setHost("192.168.20.238");// 设置rabbitmq服务器地址
24         factory.setPort(5672);// 设置rabbitmq服务器端口
25         factory.setUsername("zhaojigang");
26         factory.setPassword("wangna");
27         factory.setVirtualHost("zhaojigangvhost");
28
29         Connection connection = factory.newConnection();// 建立连接
30         Channel channel = connection.createChannel();// 建立信道
31
32         /**
33          * Note that we declare the queue here, as well.
34          * Because we might start the receiver before the sender,
35          * we want to make sure the queue exists before we try to consume messages from it.
36          */
37         channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 创建队列(如果队列不存在,创建;如果存在,什么都不做)
38         channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
39         /**
40          * queueBind(String queue, String exchange, String routingKey)
41          */
42         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, QUEUE_NAME);
43         Consumer consumer = new DefaultConsumer(channel){
44             @Override
45             public void handleDelivery(String consumerTag,
46                                        Envelope envelope,
47                                        BasicProperties properties,
48                                        byte[] body) throws IOException {
49                 String msg = new String(body,"UTF-8");
50                 System.out.println("接收消息:msg-->" + msg);
51             }
52         };54             /**
55              * basicConsume(String queue, boolean autoAck, Consumer callback)
56              * autoAck true if the server should consider messages acknowledged once delivered;
57              * false if the server should expect explicit acknowledgements
          * 这里启动一个consume,该consume会不断的接收消息,如果此处用while(true)包起来的话,就会不断的启动consume
58              */
59           channel.basicConsume(QUEUE_NAME, true, consumer);61         
62 //        channel.close();// 关闭信道
63 //        connection.close();// 关闭连接
64     }
65 }
  2、步骤:


  • 创建并设置连接工厂

    • host、port、username、password、vhost


  • 创建连接
  • 创建信道
  • 创建队列
  • 创建交换器
  • 通过路由规则绑定队列和交换器
  • 创建消息处理函数
  • 从队列获取消息并消费消息(根据消息处理函数)
  三、测试
  1、启动rabbitmq服务器
  2、启动消费者进程
  3、启动生产者进程
  4、查看console即可或者查看rabbitmq的webUI

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390768-1-1.html 上篇帖子: nbtstat Linux版源码, 通过IP获取主机名 下篇帖子: 1,排序算法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表