设为首页 收藏本站
查看: 789|回复: 0

[经验分享] RabbitMQ入门教程——发布/订阅

[复制链接]

尚未签到

发表于 2017-7-2 17:36:15 | 显示全部楼层 |阅读模式
  什么是发布订阅
  发布订阅是一种设计模式定义了一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有的订阅者对象,使他们能够自动更新自己的状态。
  为了描述这种模式,我们将会构建一个简单的日志系统。它包括两个程序——第一个程序负责发送日志消息,第二个程序负责获取消息并输出内容。在我们的这个日志系统中,所有正在运行的接收方程序都会接受消息。我们用其中一个接收者(receiver)把日志写入硬盘中,另外一个接受者(receiver)把日志输出到屏幕上。最终,日志消息被广播给所有的接受者(receivers)。
  Exchanges
  RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,生产者只能发送消息给到exchange,exchange比较简单,一边从生产者就收消息,一边把消息推送到队列中。exchange必须清楚的知道消息应该按照什么规则路由到对应的队列中,而具体使用那种路由算法是由exchange type决定的。AMQP协议提供了四种交换机类型:

  Name(交换机类型)
  Default pre-declared names(预声明的默认名称)
  Direct exchange(直连交换机)
  (Empty string) and amq.direct
  Fanout exchange(扇型交换机)
  amq.fanout
  Topic exchange(主题交换机)
  amq.topic
  Headers exchange(头交换机)
  amq.match (and amq.headers in RabbitMQ)
  除交换机类型外,在声明交换机时还可以附带许多其他的属性,其中最重要的几个分别是:


  • Name

  • Durability (消息代理重启后,交换机是否还存在)

  • Auto-delete (当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它)

  • Arguments(依赖代理本身)  交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。然而并不是所有的应用场景都需要持久化的交换机。
      本文中具体讲解下以下两种交换机:直连交换机(前面几个例子中使用的交换机类型),扇形交换机(本文中要使用的交换机类型)
      直连交换机
      直连交换机(direct exchange)可以使用消息携带的路由键(routing key)将消息投递给对应的队列中。用来处理消息的单播路由(unicast routing),也可以处理多播路由。
      那么它具体是如何工作的呢


    • 将一个队列绑定到某个交换机上,同时给该绑定指定一个路由键(routing key)

    • 当一个携带路由键为R的消息被发送到直连交换机时,交换机会把它路由给绑定值同样为R的队列。
      直连交换机经常用来循环分发任务给多个工作者,当这样做时,一定要明白,这时消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)中。
      直连交换机图例:
    DSC0000.png

      扇形交换机
      扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key)。扇形交换机用来处理消息的广播路由(broadcast routing)。
      由于扇形交换机投递消息到所有绑定他的队列,以下几个场景比较适合使用扇形交换机:


    • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件

    • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端

    • 分发系统使用它来广播各种状态和配置更新

    • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)
      扇形交换机图例
    DSC0001.png

      创建exchange
                          channel.ExchangeDeclare(exchange: "log_exchange", //exchange 名称
                              type: ExchangeType.Fanout, //exchange 类型
                              durable: false,
                              autoDelete: false,
                              arguments: null);
      临时队列
      之前的几个示例中我们在为每一个声名的队列都指定了一个名字,因为我们希望consumer指向正确的队列。当我们希望在生产者和消费者之间共享队列时,为队列命名就非常的重要了。
      不过我们要实现的日志系统只是想要得到所有的消息,而且只对当前正在传递的消息感兴趣,并不关心队列的名称,所以为了满足我们的需求,要做两件事情:
      无论什么时间连接到RabbitMQ我们都需要一个新的空的队列。为了达到目的我们可以使用随机数创建队列,或让服务器给我们提供一个随机的名称。
      一旦消费者与RabbitMQ断开,消费者所接受的队列都应该被自动删除。
      创建临时队列
                          //创建一个未命名的新的消息队列,
                          QueueDeclareOk queue = channel.QueueDeclare(queue: "", //队列名称,为空时有系统自动分配
                              durable: false,
                              exclusive: false,
                              autoDelete: true,//自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
                              arguments: null);
                          //或
                          //queue = channel.QueueDeclare();
      绑定
      我们已经创建了一个扇型交换机(fanout)和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换器和队列之间的联系我们称之为绑定(binding)
      创建交换机与队列的关系
      //扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key)
                          //fanout exchange不需要指定routing key 指定了也没用
                          //通过绑定告诉exchange 需要发送消息到哪些消息队列
                          channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
      完整代码:
      生产者  Pub_SubProducer.cs
      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Text;
      using System.Threading.Tasks;
      using RabbitMQ.Client;
      namespace RabbitMQProducer
      {
          public class Pub_SubProducer
          {
              const string EXCHANGE_NAME = "log_exchange";
              const string ROUTING_KEY = "";
              //直接发送消息到交换机
              public static void Publish()
              {
                  var factory = new ConnectionFactory()
                  {
                      HostName = "127.0.0.1"
                  };
                  using (var connection = factory.CreateConnection())
                  {
                      using (IModel channel = connection.CreateModel())
                      {
                          channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名称
                              type: ExchangeType.Fanout, //exchange 类型
                              durable: false,
                              autoDelete: false,
                              arguments: null);
                          Parallel.For(1, 100, item =>
                          {
                              string message = $"日志内容{DateTime.Now.ToString()}";
                              channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
                              Console.WriteLine(message);
                          });
                          Console.WriteLine(" Press [enter] to exit.");
                          Console.ReadLine();
                      }
                  }
              }
          }
      }
      消费者 Pub_SubConsumer.cs
      using RabbitMQ.Client;
      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Text;
      using System.Threading.Tasks;
      using RabbitMQ.Client.Events;
      using System.IO;
      namespace RabbitMQConsumer
      {
          public class Pub_SubConsumer
          {
              const string EXCHANGE_NAME = "log_exchange";
              const string ROUTING_KEY = "";
              //输出到屏幕
              public static void Subscribe()
              {
                  var factory = new ConnectionFactory()
                  {
                      HostName = "127.0.0.1"
                  };
                  using (var connection = factory.CreateConnection())
                  {
                      using (IModel channel = connection.CreateModel())
                      {
                          channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名称
                              type: ExchangeType.Fanout, //exchange 类型
                              durable: false,
                              autoDelete: false,
                              arguments: null);
                          //创建一个未命名的新的消息队列,
                          QueueDeclareOk queue = channel.QueueDeclare(queue: "", //队列名称,为空时有系统自动分配
                              durable: false,
                              exclusive: false,
                              autoDelete: true,//自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
                              arguments: null);
                          //或
                          //queue = channel.QueueDeclare();
                          string queueName = queue.QueueName;
                          //扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key)
                          //fanout exchange不需要指定routing key 指定了也没用
                          //通过绑定告诉exchange 需要发送消息到哪些消息队列
                          channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
                          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                          consumer.Received += (sender, args) =>
                          {
                              string message = Encoding.UTF8.GetString(args.Body);
                              Console.WriteLine(message);
                          };
                          channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
                          Console.WriteLine(" Press [enter] to exit.");
                          Console.ReadLine();
                      }
                  }
              }
              /// <summary>
              /// 输出到文件
              /// </summary>
              public static void SubscribeFile()
              {
                  var factory = new ConnectionFactory()
                  {
                      HostName = "127.0.0.1"
                  };
                  using (var connection = factory.CreateConnection())
                  {
                      using (IModel channel = connection.CreateModel())
                      {
                          channel.ExchangeDeclare(exchange: EXCHANGE_NAME, //exchange 名称
                              type: ExchangeType.Fanout, //exchange 类型
                              durable: false,
                              autoDelete: false,
                              arguments: null);
                          //创建一个未命名的新的消息队列,
                          QueueDeclareOk queue = channel.QueueDeclare(queue: "", //队列名称,为空时有系统自动分配
                              durable: false,
                              exclusive: false,
                              autoDelete: true,//自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
                              arguments: null);
                          //或
                          //queue = channel.QueueDeclare();
                          string queueName = queue.QueueName;
                          //扇形交换机(funout exchange)将消息路由给绑定到它身上的所有队列,不关心所绑定的路由键(routing key)
                          //fanout exchange不需要指定routing key 指定了也没用
                          //通过绑定告诉exchange 需要发送消息到哪些消息队列
                          channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
                          EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                          consumer.Received += (sender, args) =>
                          {
                              string message = Encoding.UTF8.GetString(args.Body);
                              //写入日志到txt文件
                              using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))
                              {
                                  writer.WriteLine(message);
                                  writer.Close();
                              }
                              Console.WriteLine(message);
                          };
                          channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
                          Console.WriteLine(" Press [enter] to exit.");
                          Console.ReadLine();
                      }
                  }
              }
          }
      }
      运行以上实例代码发现,每个订阅者实例 都能得到相同的内容。


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390512-1-1.html 上篇帖子: 二叉树 下篇帖子: RabbitMQ模拟器使用方法
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表