RabbitMQ(二)
在学习RabbitMQ之前,我们先简单了解几个概念。RabbitMQ是什么:
RabbitMQ 是一个消息代理。主要的原理就是通过接受和转发消息。
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,消息中间件主要用于组件之间的解耦。服务器端用Erlang语言编写,
支持多种客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。
RabbitMQ优势是什么:
1、高并发2、负载均衡 3、消息持久4、耦合低 5、响应速度快
几个术语介绍:
Connection:是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。
ConnectionFactory:Connection的制造工厂。
Channel:是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、
定义 Exchange、绑定Queue与Exchange、发布消息等。
生产者:生产过程就像发送过程,发送消息的程序就是一个生产者,我们使用“P”来描述它。
消费者:消费过程与接收相似,一个消费者通常是一个等着接受消息的程序,我们使用"C"来描述。
Queue(队列):是RabbitMQ的内部对象,用于存储消息,用下图表示
生成者生产消息,发送消息到队列,消费者可以从队列获取消息并消费。
多个消费者订阅同一个队列,消息平摊:
Exchange(交换机):用下图表示
实际情况:生产者投递消息永远只会先经过交换机,交换机根据交换机类型或规则路由到队列。
如果没有路由匹配到队列,则消息会丢失。
Routing key:用于绑定交换机与队列间的key。
生产者发送消息给交换机时,一般都会指定routing key,用于绑定交换机与队列。
这还跟交换机类型(Exchange Type)有关:四种类型,fanout,direct,topic,headers
Binding:交换机与队列绑定,指向消息流向
使用流程:
1客户端连接到消息队列服务器,打开一个channel。
2客户端声明一个exchange,并设置相关属性。
3客户端声明一个queue,并设置相关属性。
4客户端使用routing key,在exchange和queue之间建立好绑定关系。
5客户端投递消息到exchange。exchange接收到消息后,就根据消息的key和已经设置的binding,
进行消息路由,将消息投递到一个或多个队列里。
下面我们就正式进入RabbitMQ的学习了。我们将带着下面的问题进行RabbitMQ系列的学习:
1:如果消费者连接中断,这期间我们应该怎么办
2:如何做到负载均衡
3:如何有效的将数据发送到相关的接收者?就是怎么样过滤
4:如何保证消费者收到完整正确的数据
5:如何让优先级高的接收者先收到数据
项目开始
1:创建一个Maven项目,然后引入jar包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
2:创建一个消息生产者类,Producer
package com.mq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
*
* <p>消息生产者</p>
*/
public class Producer {
public final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException{
//创建RabbitMQ的连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("localhost");//创建一个新连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//创建一个队列
//第一个参数:表示队列名称
//第二个参数:是否持久化,true表示是,队列将在服务器重启时生存
//第三个参数:是否是独占队列,创建者可以使用的私有队列,断开后自动删除
//第四个参数:当所有消费者客户端连接断开时是否自动删除队列
//第五个参数:队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ";
//发送消息到队列中
//第一个参数:交换机的名称
//第二个参数:队列映射的路由key
//第三个参数:消息的其他属性
//第四个参数:发送信息的主体
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("Producer Send:"+message);
//关闭通知和连接
channel.close();
connection.close();
}
}
3:创建一个Customer消费者类
package com.mq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class Customer {
private final static String QUEUE_NAME = "rabbitMQ.test";
public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ的地址
factory.setHost("localhost");
//创建一个新连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Customer Waiting Received message");
/*
//接收方式一:
//DefaultCoustomer类实现了Consumer接口,通过传入一个频道
//告诉服务器我们需要哪个频道的消息,如果频道到有消息,就会执行回调函数handleDeliver
//envelope主要存放生产者相关的信息,比如交换机,路由key等,body是消息实体
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("Customer Received:" + message);
}
};
//自动回复队列应答----RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true,consumer);
*/
//接收方式二:
QueueingConsumer consumer = new QueueingConsumer(channel);
//自动回复队列应答----RabbitMQ中的消息确认机制
//注册一个新消息到达的回调,第二个参数表示ack,默认是false,表示字段回复,不是自动回复则需要basicAck来手动回复
//显示的告诉MQ已经取得了消息,否则MQ会将该消息重新分配给另外一个绑定在该队列上的消费者
channel.basicConsume(QUEUE_NAME, true,consumer);
while(true){
//线程阻塞,挂起知道队列传输消息到来
Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Customer Received2:" + message);
}
}
}
4:在测试之前必须记得先运行sbin下面的rabbitmq-server.bat。
之后各自运行Producer和Customer,没有顺序之分,结果Producer和Customer控制台内容为:
接收方式一:
接收方式二:
注:
如果运行的时候保报错:com.rabbitmq.client.ShutdownSignalException
或com.rabbitmq.client.PossibleAuthenticationFailureException
你可以尝试一下几种操作:
一: rabbitmq服务通道是持久通道,该queue 已经存在, 而且通道属性跟最近修改后的属性不一致
而导致无法更新queue。你可以在客户端中清除队列缓存并删除。可参看:
https://www.oschina.net/question/190643_155714?sort=time
二: 一些权限的配置,可参看
http://www.iyunv.com/Linux/2014-10/107917.htm
注:由于本教程中rabbitmq是在本机安装,使用的是默认端口(5672)。
如果你的例子运行中的主机、端口不同,请进行必要设置,否则可能无法运行。
页:
[1]