设为首页 收藏本站
查看: 1165|回复: 0

[经验分享] RabbitMQ(二):mandatory标志的作用

[复制链接]

尚未签到

发表于 2017-7-2 12:51:15 | 显示全部楼层 |阅读模式
  本文转自:http://m.blog.csdn.net/article/details?id=54311277
  在生产者通过channel的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看Channel接口,会发现存在3个重载的basicPublish方法



void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
  他们共有的参数分别是:
        exchange:交换机名称
        routingKey:路由键
        props:消息属性字段,比如消息头部信息等等
        body:消息主体部分
        除此之外,还有mandatory和immediate这两个参数,鉴于RabbitMQ3.0不再支持immediate标志,因此我们重点讨论mandatory标志
        mandatory的作用:
  当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
  下面我们通过几个实例测试下mandatory标志的作用:
        测试1:设置mandatory标志,且exchange未绑定队列



public class ProducerTest {
public static void main(String[] args) {
String exchangeName = "confirmExchange";
String queueName = "confirmQueue";
String routingKey = "confirmRoutingKey";
String bindingKey = "confirmBindingKey";
int count = 3;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.151.74");
factory.setUsername("test");
factory.setPassword("test");
factory.setPort(5672);
//创建生产者
Sender producer = new Sender(factory, count, exchangeName, routingKey);
producer.run();
}
}
class Sender
{
private ConnectionFactory factory;
private int count;
private String exchangeName;
private String routingKey;
public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
this.factory = factory;
this.count = count;
this.exchangeName = exchangeName;
this.routingKey = routingKey;
}
public void run() {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建exchange
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
//发送持久化消息
for(int i = 0;i < count;i++)
{
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
  第45行我们将basicPublish的第三个参数mandatory设置成了true,表示开启了mandatory标志,但我们没有为当前exchange绑定任何队列;
  通过wireshark抓包看到下面输出:   DSC0000.jpg
  可以看到最后执行了basic.return方法,将发布者发出的消息返还给了发布者,查看协议的Arguments参数部分可以看到,Reply-Text字段值为:NO_ROUTE,表示消息并没有路由到合适的队列中;
  那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为channel信道设置ReturnListener监听器来实现,具体实现代码见下:



public class ProducerTest {
public static void main(String[] args) {
String exchangeName = "confirmExchange";
String queueName = "confirmQueue";
String routingKey = "confirmRoutingKey";
String bindingKey = "confirmBindingKey";
int count = 3;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.151.74");
factory.setUsername("test");
factory.setPassword("test");
factory.setPort(5672);
//创建生产者
Sender producer = new Sender(factory, count, exchangeName, routingKey);
producer.run();
}
}
class Sender
{
private ConnectionFactory factory;
private int count;
private String exchangeName;
private String routingKey;
public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
this.factory = factory;
this.count = count;
this.exchangeName = exchangeName;
this.routingKey = routingKey;
}
public void run() {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建exchange
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
//发送持久化消息
for(int i = 0;i < count;i++)
{
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
}
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
throws IOException {
//此处便是执行Basic.Return之后回调的地方
String message = new String(arg5);
System.out.println("Basic.Return返回的结果:  "+message);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
  在设置了ReturnListener监听器之后,broker(代理服务器)发出basic.return方法之后,就会回调第52行的handleReturn方法,在这个方法里面我们就可以进行消息的重新发布操作啦;
  测试2:设置mandatory标志,且为exchange绑定队列(路由键和绑定键一致)



public class ProducerTest {
public static void main(String[] args) {
String exchangeName = "confirmExchange";
String queueName = "confirmQueue";
String routingKey = "confirmRoutingKey";
String bindingKey = "confirmRoutingKey";
//String bindingKey = "confirmBindingKey";
int count = 3;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("172.16.151.74");
factory.setUsername("test");
factory.setPassword("test");
factory.setPort(5672);
//创建生产者
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
producer.run();
}
}
class Sender
{
private ConnectionFactory factory;
private int count;
private String exchangeName;
private String     queueName;
private String routingKey;
private String bindingKey;
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
this.factory = factory;
this.count = count;
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
this.bindingKey = bindingKey;
}
public void run() {
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建exchange
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
//创建队列
channel.queueDeclare(queueName, true, false, false, null);
//绑定exchange和queue
            channel.queueBind(queueName, exchangeName, bindingKey);
//发送持久化消息
for(int i = 0;i < count;i++)
{
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
}
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
throws IOException {
//此处便是执行Basic.Return之后回调的地方
String message = new String(arg5);
System.out.println("Basic.Return返回的结果:  "+message);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
  通过抓包发现并不会有basic.return方法被调用,查看RabbitMQ管理界面发现消息已经到达了队列;


DSC0001.jpg

  测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不一致)
  代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不一致的,此时我们运行程序,同时抓包得到下面截图:
DSC0002.jpg

  注意一点,我们发送了三条消息,那么相应的应该执行三次basic.return,其中第一次和第二次basic.return显示在一行上了,第三次是单独一行,不要误认为只执行了两次,从协议的具体返回内容里我们同样看到了Reply-Text字段值是NO_ROUTE,这种现象在测试1中已经见过了;
  到此,我们明白了mandatory标志的作用:在消息没有被路由到合适队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达合适的队列,测试1演示的是创建了exchange但是没有为他绑定队列导致的消息未到达合适队列,测试3演示的是创建了exchange同时创建了queue,但是在将两者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不一致导致的消息未到达合适队列;

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390337-1-1.html 上篇帖子: Outlook 客户端无法通过 MAPI over HTTP协议 连接 下篇帖子: AMQP 0-9-1模型简介
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表