设为首页 收藏本站
查看: 747|回复: 0

[经验分享] RabbitMQ(四)

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2017-7-3 06:00:33 | 显示全部楼层 |阅读模式
准备:
  这节主要讲解Rabbit的发布/订阅。前面我们所讲的是生产者将消息直接放到队列,然后消费者再从队列中获取消息。但实际上,RabbitMQ中消息传递模型的核心思想是:生产者不直接发送消息到队列。实际的运行环境中,生产者是不知道消息会发送到那个队列上,它只会将消息发送到一个交换器,交换器也像一个生产线,一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。交换器必须知道它所接收的消息是什么?它应该被放到哪个队列中?它应该被添加到多个队列吗?还是应该丢弃?这些规则都是按照交换器的规则来确定的。
DSC0000.png

  交换机的规则有:
  direct(直连)、topic(主题)、headers(标题)、fanout(分发/扇出)
  匿名交换器:
    在之前的教程中,我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。这是因为RabbitMQ选择了一个空“”字符串的默认交换器。
      channel.basicPublish("", "task_queue", null, message.getBytes());
        第一个参数就是交换器的名称。如果输入""空字符串,表示使用默认的匿名交换器。
  第二个参数是【routingKey】路由线索
  根据匿名交换器规则: 发送到routingKey名称对应的队列
  临时队列:
  如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:



    • 首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。
    •   一旦我们断开消费者,队列应该立即被删除。

  在JAVA客户端,提供queuedeclare()为我们创建一个非持久化、独立、自动删除的队列名称。
  String queueName = channel.queueDeclare().getQueue();//获取到一个随机队列名称。

下面我们就讲解不同交换机的使用:

一:fanout(分发/扇出):每个消费者得到相同的消息,与队列绑定时,不需要绑定值routingKey(即queueBind的第三个参数),有也会忽略

      DSC0001.png
    首先创建一个生产类EmitLog



package com.exchange;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建一个交换机,并设置名称和类型
//fanout表示分发,所以的消费者得到同样的队列信息
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//分发信息
for(int i=0; i<5; i++){
String message = "Hello World:" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("EmitLog Send:" + message);
}
channel.close();
connection.close();
}
}
  然后创建两个消费者ReceiveLogs1和ReceiveLogs2



package com.exchange;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP;
public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//产生一个随机队列的名称
String queueName = channel.queueDeclare().getQueue();
//将交换机和随机队列进行绑定
channel.queueBind(queueName, EXCHANGE_NAME,"");
System.out.println("ReceiveLogs1 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogs1 Received:" + message);
}
};
channel.basicConsume(queueName, true,consumer);
}
}


package com.exchange;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogs2 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//产生一个随机队列的名称
String queueName = channel.queueDeclare().getQueue();
//将交换机和随机队列进行绑定
channel.queueBind(queueName, EXCHANGE_NAME,"");
System.out.println("ReceiveLogs2 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogs2 Received:" + message);
}
};
channel.basicConsume(queueName, true,consumer);
}
}
  然后先运行两个消费者,等待接收队列的消息,最后运行生产者,最终结果如下:
DSC0002.png

DSC0003.png

DSC0004.png

  可以看出,两个消费者都得到了队列中的消息。

二:direct(直连):采用路由的方式对不同的消息进行过滤,与队列绑定时,根据绑定值routingKey(即queueBind的第三个参数)来分发消息。同时,多个队列可以以相同的路由关键字绑定到同一个交换器中。
     DSC0005.png
  首先创建生产端的代码。



package com.mq.exchange.direct;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RoutingSendDirect {
private static final String EXCHANGE_NAME = "direct_logs";
//路由关键字routing key
private static final String[] routingKeys = new String[]{"info","warning","error"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明交换机,类型是dierct直连
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息
for(String routingKey : routingKeys){
String message = "RoutingSendDirect Send the Message level:" + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("RoutingSendDirect Send " + routingKey + ":" + message);
}
channel.close();
connection.close();
}
}
  接着分别创建两个消费者:



package com.mq.exchange.direct;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsDirect1 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] routingKeys = new String[]{"info","warning"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//产生一个随机队列名称
String queueName = channel.queueDeclare().getQueue();
//根据路由关键字来进行队列和交换机的绑定
//这里一个队列绑定两个路由关键字(info,warning),即此队列可以获取交换机中routingKey为info和warning的消息
for(String routingKey : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("ReceiveLogsDirect1 exchange:" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRoutingKey:" +routingKey );
}
System.out.println("ReceiveLogsDirect1 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsDirect1 Received " + envelope.getRoutingKey() +":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}


package com.mq.exchange.direct;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsDirect2 {
private static final String EXCHANGE_NAME = "direct_logs";
private static final String[] routingKeys = new String[]{"error"};
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//产生一个随机队列名称
String queueName = channel.queueDeclare().getQueue();
//根据路由关键字来进行队列和交换机的绑定
//这里一个队列绑定两个路由关键字(info,warning),即此队列可以获取交换机中routingKey为info和warning的消息
for(String routingKey : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println("ReceiveLogsDirect2 exchange:" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRoutingKey:" +routingKey );
}
System.out.println("ReceiveLogsDirect2 Waiting for message");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsDirect2 Received " + envelope.getRoutingKey() + ":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
  接着先运行两个消费者,等待接收队列的消息,最后运行生产者,最终结果如下:
DSC0006.png

DSC0007.png

DSC0008.png


三:headers(标题):此类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。
    发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。
  实例:
    创建一个生产者类:



package com.mq.exchange.headers;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String EXCHANGE_NAME = "header-exchange";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建一个交换机,并设置名称和类型
//参数1:交换机名称
//参数2:交换机类型
//参数3:是否持久化
//参数4:是否自动删除
//参数5:交换机的其他参数
channel.exchangeDeclare(EXCHANGE_NAME,"headers",false,true,null);  
Map<String, Object> map =new Hashtable<String, Object>();
map.put("ABC", "12345");
//创建properties
Builder properties = new Builder();
//把map放入headers中
            properties.headers(map);
//持久化,参数:1表示不持久话,2表示持久化
properties.deliveryMode(2);
//指定消息发送到交换机,绑定键值对headers
String message = "Hello RabbitMQ:headers";
channel.basicPublish(EXCHANGE_NAME, "", properties.build(),message.getBytes());
System.out.println("Producer Sent message :'" + message + "'");
channel.close();
connection.close();
}
}
  创建一个消费者类:



package com.mq.exchange.headers;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class Consumer {
private final static String EXCHANGE_NAME = "header-exchange";  
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"headers",false,true,null);  
Map<String, Object> getMap = new Hashtable<String, Object>();  
//all和any,any有一个键值对匹配即可
getMap.put("x-match", "any");
getMap.put("ABC", "12345");  
getMap.put("abc", "54321");  
//产生一个随机队列的名称
String queueName = channel.queueDeclare().getQueue();
//将交换机和随机队列进行绑定,并指定绑定的header键值对
channel.queueBind(queueName, EXCHANGE_NAME,"",getMap);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("Consumer Received:" + message);
}
};
channel.basicConsume(queueName, true,consumer);
}
}
  先运行消费者,然后运行生产者,得到结果如下:
DSC0009.png

DSC00010.png


四:topic(主题):这种应该属于模糊匹配  *:可以替代一个词   #:可以替代0或者更多的词
DSC00011.png

  下面我们看下实例来了解了解。
  首先创建一个发送类TopicSend



package com.mq.exchange.topic;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TopicSend {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//待发送的消息
String[] routingKeys=new String[]{
"quick.orange.rabbit",
"lazy.orange.elephant",
"quick.orange.fox",
"lazy.brown.fox",
"quick.brown.fox",
"quick.orange.male.rabbit",
"lazy.orange.male.rabbit"
};
//发送消息
for(String routing : routingKeys){
String message = "From " + routing + "routingKeys message" ;
channel.basicPublish(EXCHANGE_NAME, routing, null, message.getBytes());
System.out.println("TopicSend Send " + routing + ":" + message);
}
channel.close();
connection.close();
}
}
  创建两个接收类:



package com.mq.exchange.topic;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = channel.queueDeclare().getQueue();
//路由关键字
String[] routingKeys = new String[]{"*.orange.*"};
//绑定路由
for(String routing : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routing);
System.out.println("ReceiveLogsTopic1 exchange :" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRountingKey:" + routing );
}
System.out.println("ReceiveLogsTopic1 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsTopic1 Received " + envelope.getRoutingKey() + ":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}


package com.mq.exchange.topic;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName = channel.queueDeclare().getQueue();
//路由关键字
String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
//绑定路由
for(String routing : routingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, routing);
System.out.println("ReceiveLogsTopic2 exchange :" + EXCHANGE_NAME + ",queue:"
+ queueName +",BindRountingKey:" + routing );
}
System.out.println("ReceiveLogsTopic2 Waiting for messages");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws UnsupportedEncodingException{
String message = new String(body,"UTF-8");
System.out.println("ReceiveLogsTopic2 Received " + envelope.getRoutingKey() + ":" + message);
}
};
channel.basicConsume(queueName, true, consumer);
}
}
  最后先运行两个接收类,在运行发送类,结果如下所示:
DSC00012.png

DSC00013.png

DSC00014.png

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390571-1-1.html 上篇帖子: RabbitMQ二----' helllo world ' 下篇帖子: 菜鸟前端学算法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表