pengjunling 发表于 2017-7-3 06:00:33

RabbitMQ(四)

准备:
  这节主要讲解Rabbit的发布/订阅。前面我们所讲的是生产者将消息直接放到队列,然后消费者再从队列中获取消息。但实际上,RabbitMQ中消息传递模型的核心思想是:生产者不直接发送消息到队列。实际的运行环境中,生产者是不知道消息会发送到那个队列上,它只会将消息发送到一个交换器,交换器也像一个生产线,一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。交换器必须知道它所接收的消息是什么?它应该被放到哪个队列中?它应该被添加到多个队列吗?还是应该丢弃?这些规则都是按照交换器的规则来确定的。

  交换机的规则有:
  direct(直连)、topic(主题)、headers(标题)、fanout(分发/扇出)
  匿名交换器:
    在之前的教程中,我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。这是因为RabbitMQ选择了一个空“”字符串的默认交换器。
      channel.basicPublish("", "task_queue", null, message.getBytes());
        第一个参数就是交换器的名称。如果输入""空字符串,表示使用默认的匿名交换器。
  第二个参数是【routingKey】路由线索
  根据匿名交换器规则: 发送到routingKey名称对应的队列。
  临时队列:
  如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:


[*]
[*]首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。
[*]  一旦我们断开消费者,队列应该立即被删除。

  在JAVA客户端,提供queuedeclare()为我们创建一个非持久化、独立、自动删除的队列名称。
  String queueName = channel.queueDeclare().getQueue();//获取到一个随机队列名称。

下面我们就讲解不同交换机的使用:

一:fanout(分发/扇出):每个消费者得到相同的消息,与队列绑定时,不需要绑定值routingKey(即queueBind的第三个参数),有也会忽略

     
    首先创建一个生产类EmitLog



package com.exchange;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建一个交换机,并设置名称和类型
//fanout表示分发,所以的消费者得到同样的队列信息
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//分发信息
for(int i=0; i<5; i++){
String message = "Hello World:" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("EmitLog Send:" + message);
}
channel.close();
connection.close();
}
}
  然后创建两个消费者ReceiveLogs1和ReceiveLogs2



package com.exchange;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//产生一个随机队列的名称
String queueName = channel.queueDeclare().getQueue();
//将交换机和随机队列进行绑定
channel.queueBind(queueName, EXCHANGE_NAME,"");
System.out.println("ReceiveLogs1 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogs1 Received:" + message);
}
};
channel.basicConsume(queueName, true,consumer);
}
}


package com.exchange;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogs2 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//产生一个随机队列的名称
String queueName = channel.queueDeclare().getQueue();
//将交换机和随机队列进行绑定
channel.queueBind(queueName, EXCHANGE_NAME,"");
System.out.println("ReceiveLogs2 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogs2 Received:" + message);
}
};
channel.basicConsume(queueName, true,consumer);
}
}
  然后先运行两个消费者,等待接收队列的消息,最后运行生产者,最终结果如下:



  可以看出,两个消费者都得到了队列中的消息。

二:direct(直连):采用路由的方式对不同的消息进行过滤,与队列绑定时,根据绑定值routingKey(即queueBind的第三个参数)来分发消息。同时,多个队列可以以相同的路由关键字绑定到同一个交换器中。
    
  首先创建生产端的代码。



package com.mq.exchange.direct;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RoutingSendDirect {
private static final String EXCHANGE_NAME = "direct_logs";
//路由关键字routing key
private static final String[] routingKeys = new String[]{"info","warning","error"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机,类型是dierct直连
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息
for(String routingKey : routingKeys){
String message = "RoutingSendDirect Send the Message level:" + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("RoutingSendDirect Send " + routingKey + ":" + message);
}
channel.close();
connection.close();
}
}
  接着分别创建两个消费者:



package com.mq.exchange.direct;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsDirect1 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] routingKeys = new String[]{"info","warning"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//产生一个随机队列名称
String queueName = channel.queueDeclare().getQueue();
//根据路由关键字来进行队列和交换机的绑定
//这里一个队列绑定两个路由关键字(info,warning),即此队列可以获取交换机中routingKey为info和warning的消息
for(String routingKey : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRoutingKey:" +routingKey );
}
System.out.println("ReceiveLogsDirect1 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsDirect1 Received " + envelope.getRoutingKey() +":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}


package com.mq.exchange.direct;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsDirect2 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] routingKeys = new String[]{"error"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//产生一个随机队列名称
String queueName = channel.queueDeclare().getQueue();
//根据路由关键字来进行队列和交换机的绑定
//这里一个队列绑定两个路由关键字(info,warning),即此队列可以获取交换机中routingKey为info和warning的消息
for(String routingKey : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRoutingKey:" +routingKey );
}
System.out.println("ReceiveLogsDirect2 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsDirect2 Received " + envelope.getRoutingKey() + ":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
  接着先运行两个消费者,等待接收队列的消息,最后运行生产者,最终结果如下:




三:headers(标题):此类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。
    发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。
  实例:
    创建一个生产者类:



package com.mq.exchange.headers;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String EXCHANGE_NAME = "header-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建一个交换机,并设置名称和类型
//参数1:交换机名称
//参数2:交换机类型
//参数3:是否持久化
//参数4:是否自动删除
//参数5:交换机的其他参数
channel.exchangeDeclare(EXCHANGE_NAME,"headers",false,true,null);
Map<String, Object> map =new Hashtable<String, Object>();
map.put("ABC", "12345");
//创建properties
Builder properties = new Builder();
//把map放入headers中
            properties.headers(map);
//持久化,参数:1表示不持久话,2表示持久化
properties.deliveryMode(2);
//指定消息发送到交换机,绑定键值对headers
String message = "Hello RabbitMQ:headers";
channel.basicPublish(EXCHANGE_NAME, "", properties.build(),message.getBytes());
System.out.println("Producer Sent message :'" + message + "'");
channel.close();
connection.close();
}
}
  创建一个消费者类:



package com.mq.exchange.headers;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Consumer {
private final static String EXCHANGE_NAME = "header-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"headers",false,true,null);
Map<String, Object> getMap = new Hashtable<String, Object>();
//all和any,any有一个键值对匹配即可
getMap.put("x-match", "any");
getMap.put("ABC", "12345");
getMap.put("abc", "54321");
//产生一个随机队列的名称
String queueName = channel.queueDeclare().getQueue();
//将交换机和随机队列进行绑定,并指定绑定的header键值对
channel.queueBind(queueName, EXCHANGE_NAME,"",getMap);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("Consumer Received:" + message);
}
};
channel.basicConsume(queueName, true,consumer);
}
}
  先运行消费者,然后运行生产者,得到结果如下:



四:topic(主题):这种应该属于模糊匹配*:可以替代一个词   #:可以替代0或者更多的词

  下面我们看下实例来了解了解。
  首先创建一个发送类TopicSend



package com.mq.exchange.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicSend {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//待发送的消息
String[] routingKeys=new String[]{
"quick.orange.rabbit",
"lazy.orange.elephant",
"quick.orange.fox",
"lazy.brown.fox",
"quick.brown.fox",
"quick.orange.male.rabbit",
"lazy.orange.male.rabbit"
};
//发送消息
for(String routing : routingKeys){
String message = "From " + routing + "routingKeys message" ;
channel.basicPublish(EXCHANGE_NAME, routing, null, message.getBytes());
System.out.println("TopicSend Send " + routing + ":" + message);
}
channel.close();
connection.close();
}
}
  创建两个接收类:



package com.mq.exchange.topic;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = channel.queueDeclare().getQueue();
//路由关键字
String[] routingKeys = new String[]{"*.orange.*"};
//绑定路由
for(String routing : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routing);
System.out.println("ReceiveLogsTopic1 exchange :" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRountingKey:" + routing );
}
System.out.println("ReceiveLogsTopic1 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsTopic1 Received " + envelope.getRoutingKey() + ":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}


package com.mq.exchange.topic;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = channel.queueDeclare().getQueue();
//路由关键字
String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
//绑定路由
for(String routing : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routing);
System.out.println("ReceiveLogsTopic2 exchange :" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRountingKey:" + routing );
}
System.out.println("ReceiveLogsTopic2 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsTopic2 Received " + envelope.getRoutingKey() + ":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
  最后先运行两个接收类,在运行发送类,结果如下所示:


页: [1]
查看完整版本: RabbitMQ(四)