设为首页 收藏本站
查看: 1460|回复: 0

[经验分享] RabbitMQ】三种Exchange模式——订阅、路由、通配符模式

[复制链接]

尚未签到

发表于 2018-7-5 06:08:17 | 显示全部楼层 |阅读模式
  这篇博客介绍订阅、路由和通配符模式,之所以放在一起介绍,是因为这三种模式都是用了Exchange交换机,消息没有直接发送到队列,而是发送到了交换机,经过队列绑定交换机到达队列。
  性能排序:fanout > direct >> topic。比例大约为11:10:6
一、订阅模式(Fanout Exchange):
  一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的exchange上面,那么该消息将会丢失,这是因为在rabbitMQ中exchange不具备存储消息的能力,只有队列具备存储消息的能力。


  任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
  1.可以理解为路由表的模式
  2.这种模式不需要RouteKey
  3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
  4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
  示例代码:
  生产者:
  [java] view plain copy

  •   public class Send {

  •   private final static String EXCHANGE_NAME = "test_exchange_fanout";

  •   public static void main(String[] argv) throws Exception {
  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   //从连接中创建通道
  •   Channel channel = connection.createChannel();

  •   // 声明exchange
  •   channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

  •   // 消息内容
  •   String message = "商品已经新增,id = 1000";
  •   channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

  •   System.out.println(" [x] Sent '" + message + "'");

  •   channel.close();
  •   connection.close();
  •   }
  •   }
  消费者1:
  [java] view plain copy

  •   public class Recv {

  •   private final static String QUEUE_NAME = "test_queue_fanout_1";

  •   private final static String EXCHANGE_NAME = "test_exchange_fanout";

  •   public static void main(String[] argv) throws Exception {

  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   Channel channel = connection.createChannel();

  •   // 声明队列
  •   channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  •   // 绑定队列到交换机
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

  •   // 同一时刻服务器只会发一条消息给消费者
  •   channel.basicQos(1);

  •   // 定义队列的消费者
  •   QueueingConsumer consumer = new QueueingConsumer(channel);
  •   // 监听队列,手动返回完成
  •   channel.basicConsume(QUEUE_NAME, true, consumer);

  •   // 获取消息
  •   while (true) {
  •   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •   String message = new String(delivery.getBody());
  •   System.out.println(" 前台系统: '" + message + "'");
  •   Thread.sleep(10);

  •   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  •   }
  •   }
  •   }
  消费者2的代码和消费者1的代码大致相同,只是队列的名称不一样,这样两个消费者有自己的队列,都可以接收到生产者发送的消息
  但是如果生产者有新增商品,修改商品,删除商品的消息,消费者包快前台系统和搜索系统,要求前台系统接收修改和删除商品的消息,搜索系统接收新增商品、修改商品和删除商品的消息。所以使用这种订阅模式实现商品数据的同步并不合理。因此我们介绍下一种模式:路由模式。
二、路由模式(Direct Exchange)
  这种模式添加了一个路由键,生产者发布消息的时候添加路由键,消费者绑定队列到交换机时添加键值,这样就可以接收到需要接收的消息。


  任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue
  1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
  2.这种模式下不需要将Exchange进行任何绑定(binding)操作
  3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
  4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。
  示例代码:
  生产者:
  [java] view plain copy

  •   public class Send {

  •   private final static String EXCHANGE_NAME = "test_exchange_direct";

  •   public static void main(String[] argv) throws Exception {
  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   Channel channel = connection.createChannel();

  •   // 声明exchange
  •   channel.exchangeDeclare(EXCHANGE_NAME, "direct");

  •   // 消息内容
  •   String message = "删除商品, id = 1001";
  •   channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
  •   System.out.println(" [x] Sent '" + message + "'");

  •   channel.close();
  •   connection.close();
  •   }
  •   }
  消费者1:接收更新和删除消息
  [java] view plain copy

  •   public class Recv {

  •   private final static String QUEUE_NAME = "test_queue_direct_1";

  •   private final static String EXCHANGE_NAME = "test_exchange_direct";

  •   public static void main(String[] argv) throws Exception {

  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   Channel channel = connection.createChannel();

  •   // 声明队列
  •   channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  •   // 绑定队列到交换机
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

  •   // 同一时刻服务器只会发一条消息给消费者
  •   channel.basicQos(1);

  •   // 定义队列的消费者
  •   QueueingConsumer consumer = new QueueingConsumer(channel);
  •   // 监听队列,手动返回完成
  •   channel.basicConsume(QUEUE_NAME, false, consumer);

  •   // 获取消息
  •   while (true) {
  •   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •   String message = new String(delivery.getBody());
  •   System.out.println(" 前台系统: '" + message + "'");
  •   Thread.sleep(10);

  •   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  •   }
  •   }
  •   }
  消费者2:接收insert,update,delete的消息
  [java] view plain copy

  •   public class Recv2 {

  •   private final static String QUEUE_NAME = "test_queue_direct_2";

  •   private final static String EXCHANGE_NAME = "test_exchange_direct";

  •   public static void main(String[] argv) throws Exception {

  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   Channel channel = connection.createChannel();

  •   // 声明队列
  •   channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  •   // 绑定队列到交换机
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");

  •   // 同一时刻服务器只会发一条消息给消费者
  •   channel.basicQos(1);

  •   // 定义队列的消费者
  •   QueueingConsumer consumer = new QueueingConsumer(channel);
  •   // 监听队列,手动返回完成
  •   channel.basicConsume(QUEUE_NAME, false, consumer);

  •   // 获取消息
  •   while (true) {
  •   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •   String message = new String(delivery.getBody());
  •   System.out.println(" 搜索系统: '" + message + "'");
  •   Thread.sleep(10);

  •   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  •   }
  •   }
  •   }
  如果生产者发布了insert消息,那么消费者2可以收到,消费者 1收不到,如果发布了update或者delete消息,两个消费者都可以收到。如果发布ABC消息两个消费者都收不到,因为没有绑定这个键值。这种模式基本满足了我们的需求,但是还不够灵活,下面介绍另外一个模式。
三、通配符模式(Topic Exchange)
  基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词



  任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
  1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
  2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
  3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
  4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
  5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
示例代码:
  生产者:
  [java] view plain copy

  •   public class Send {

  •   private final static String EXCHANGE_NAME = "test_exchange_topic";

  •   public static void main(String[] argv) throws Exception {
  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   Channel channel = connection.createChannel();

  •   // 声明exchange
  •   channel.exchangeDeclare(EXCHANGE_NAME, "topic");

  •   // 消息内容
  •   String message = "删除商品,id = 1001";
  •   channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
  •   System.out.println(" [x] Sent '" + message + "'");

  •   channel.close();
  •   connection.close();
  •   }
  •   }
  消费者1:
  [java] view plain copy

  •   public class Recv {

  •   private final static String QUEUE_NAME = "test_queue_topic_1";

  •   private final static String EXCHANGE_NAME = "test_exchange_topic";

  •   public static void main(String[] argv) throws Exception {

  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   Channel channel = connection.createChannel();

  •   // 声明队列
  •   channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  •   // 绑定队列到交换机
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");

  •   // 同一时刻服务器只会发一条消息给消费者
  •   channel.basicQos(1);

  •   // 定义队列的消费者
  •   QueueingConsumer consumer = new QueueingConsumer(channel);
  •   // 监听队列,手动返回完成
  •   channel.basicConsume(QUEUE_NAME, false, consumer);

  •   // 获取消息
  •   while (true) {
  •   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •   String message = new String(delivery.getBody());
  •   System.out.println(" 前台系统: '" + message + "'");
  •   Thread.sleep(10);

  •   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  •   }
  •   }
  •   }
  消费者2:
  [java] view plain copy

  •   public class Recv2 {

  •   private final static String QUEUE_NAME = "test_queue_topic_2";

  •   private final static String EXCHANGE_NAME = "test_exchange_topic";

  •   public static void main(String[] argv) throws Exception {

  •   // 获取到连接以及mq通道
  •   Connection connection = ConnectionUtil.getConnection();
  •   Channel channel = connection.createChannel();

  •   // 声明队列
  •   channel.queueDeclare(QUEUE_NAME, false, false, false, null);

  •   // 绑定队列到交换机
  •   channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#");

  •   // 同一时刻服务器只会发一条消息给消费者
  •   channel.basicQos(1);

  •   // 定义队列的消费者
  •   QueueingConsumer consumer = new QueueingConsumer(channel);
  •   // 监听队列,手动返回完成
  •   channel.basicConsume(QUEUE_NAME, false, consumer);

  •   // 获取消息
  •   while (true) {
  •   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  •   String message = new String(delivery.getBody());
  •   System.out.println(" 搜索系统: '" + message + "'");
  •   Thread.sleep(10);

  •   channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  •   }
  •   }
  •   }
  消费者1是按需索取,并没有使用通配符模式,而是用的完全匹配,消费者2使用通配符模式,这样以item.开头的消息都会全部接收。
小结:
  1.与简单模式和work模式对比,前面两种同一个消息只能被一个消费者获取,而今天的这三种模式,可以实现一个消息被多个消费者 获取。
  2.fanout这种模式没有加入路由器,队列与exchange绑定后,就会接收到所有的消息,其余两种增加了路由键,并且第三种增加通配符,更加便利。
  本文出自https://blog.csdn.net/ww130929/article/details/72842234

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-533796-1-1.html 上篇帖子: 通过测试发现的Exchange 2013 CU16存在的一个小bug-jialt的博客 下篇帖子: exchange控制台没有重置用户密码选项解决办法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表