RabbitMQ学习笔记1-hello world
安装过程略过,一搜一大把。rabbitmq管理控制台:http://localhost:15672/ 默认账户:guest/guest
RabbitMQ默认监听端口:5672
JAVA API地址:http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.6.5/rabbitmq-java-client-javadoc-3.6.5/
引入rabbitmq-java-client-bin-3.6.5下的jar包:
简单的基本流程是:
生产者:
1、连接到RabbitMQ
2、获取channel
3、声明exchange
4、创建并发布消息
5、关闭信道和连接
消费者:
1、连接到RabbitMQ
2、获取channel
3、声明exchange
4、声明Queue
5、使用路由键将exchange和queue绑定
6、消费消息
7、关闭信道和连接
1、消息生产者SenderWithExchange.java
1 package com.yzl.test1;
2
3 import com.rabbitmq.client.Channel;
4 import com.rabbitmq.client.Connection;
5 import com.rabbitmq.client.ConnectionFactory;
6
7 /**
8* 使用交换器的生产者
9* @author: yzl
10* @date: 2016-10-16
11*/
12 public class SenderWithExchange {
13 //路由键名称
14 private final static String ROUTING_KEY = "rk";
15
16 public static void main(String[] args) throws Exception {
17 //连接到rabbitmq服务器
18 ConnectionFactory factory = new ConnectionFactory();
19 factory.setHost("localhost");
20 Connection connection = factory.newConnection();
21 //创建一个信道
22 Channel channel = connection.createChannel();
23 //声明direct类型的交换器
24 channel.exchangeDeclare("myDirectExchange","direct");
25
26 String msg = null;
27 for(int i=1; i<100; i++){
28 msg = "hello world" + i;
29 //发送消息给myDirectExchange交换器,并且路由键是rk
30 channel.basicPublish("myDirectExchange", ROUTING_KEY, null, msg.getBytes());
31 System.out.println("Sender send new msg:" + msg);
32 Thread.sleep(2000);
33 }
34
35 //关闭信道
36 channel.close();
37 //关闭连接
38 connection.close();
39 }
40 }
2、消息消费者ReceiverWithExchange.java
1 package com.yzl.test1;
2
3 import java.io.IOException;
4
5 import com.rabbitmq.client.AMQP.BasicProperties;
6 import com.rabbitmq.client.Channel;
7 import com.rabbitmq.client.Connection;
8 import com.rabbitmq.client.ConnectionFactory;
9 import com.rabbitmq.client.Consumer;
10 import com.rabbitmq.client.DefaultConsumer;
11 import com.rabbitmq.client.Envelope;
12
13 /**
14* 消息消费者
15* @author: yzl
16* @date: 2016-10-16
17*/
18 public class ReceiverWithExchange {
19 //消息队列名称
20 private final static String QUEUE_NAME = "hello";
21 //路由键名称
22 private final static String ROUTING_KEY = "rk";
23 /**
24 * @param args
25 */
26 public static void main(String[] args) throws Exception{
27 ConnectionFactory factory = new ConnectionFactory();
28 factory.setHost("localhost");
29 Connection connection = factory.newConnection();
30 Channel channel = connection.createChannel();
31 //定义队列
32 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
33 //定义direct类型的交换器
34 channel.exchangeDeclare("myDirectExchange","direct");
35 //通过路由键 将 队列和交换器绑定起来
36 channel.queueBind(QUEUE_NAME, "myDirectExchange", ROUTING_KEY);
37 System.out.println("Receiver waiting to accept msg");
38
39 //消息消费处理器
40 Consumer consumer = new DefaultConsumer(channel){
41 @Override
42 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
43 throws IOException {
44 String msg = new String(body);
45 System.out.println("accept a new msg :" + msg);
46 }
47 };
48 //绑定消息处理器
49 channel.basicConsume(QUEUE_NAME, true, consumer);
50 }
51
52 }
如果先运行SenderWithExchange后运行ReceiverWithExchange将会丢失一部分时间内的数据。反之则没问题,具体原因可以看RabbitMQ学习笔记2-理解消息通信
direct交换器是使用for循环的形式一个一个交替发给所有绑定的队列上去
页:
[1]