设为首页 收藏本站
查看: 1084|回复: 0

[经验分享] RabbitMQ入门教程——路由(Routing)

[复制链接]

尚未签到

发表于 2017-7-2 18:19:14 | 显示全部楼层 |阅读模式
  绑定( Bindings)
  之前的文章中我们已经创建过bindings,代码如下:
        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: ROUTING_KEY, arguments: null);
  绑定(bindings)是指交换机(exchange)与队列(queue)之间的关系。可以简单的理解为:队列(queue)对所绑定的交换机(exchange)上的消息感兴趣,交换机(exchange)要把它接收到的消息推送到队列(queue)中。
  绑定的时候需要带上一个额外的参数routingKey,为避免与BasicPublish中的路由键(routing key)参数混淆,我们称之为绑定键(binding key),以下是如何创建一个绑定。
      channel.QueueBind(queue: queue, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);
  注意:

  • 参数routingKey为空时,也是一个绑定键
  • 绑定键的意义依赖于exchange type。如:如果exchange type 为 fanout 时,绑定键没有任何意义。
  直连交换机(direct exchange)
  在之前的发布订阅中我们已经讲到直连交换机,我们了解到直连交换机的工作方式为——交换机(exchange)会对绑定键(binding key)与 路由键(routing key)进行精确匹配,然后将消息发送到能够匹配成功的队列中。
  下图能够很好的描述整个场景:
DSC0000.png

  在这个场景中,可以看出直连交换机X和队列(Q1与Q2)进行了绑定。Q1队列使用orange为绑定键(binding key),Q2有两个绑定,分别以black和green作为绑定键(binding key)。
  这样以来,当路由键为orange的消息发送到交换机,就会被路由到队列Q1,路由键为black和green的下拍戏就会被路由到Q2,其它的消息将会被丢弃。
  多重绑定(multiple bindings)
  多重绑定即使用一个绑定键(binding key)绑定到多个队列,这是完全合法的,而且每个队列都能得到完全相同的信息。
  示例
  接下来我们就使用direct exchange完善之前的日志功能
  1.日志级别为error的日志保存的到txt文件中
  2.日志级别为log的日志输出到控制台面板
  3.输出所有的日志到控制台面板
  生产者 RoutingProducer.cs
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Text;
  using System.Threading.Tasks;
  using RabbitMQ.Client;
  using System.Threading;
  namespace RabbitMQProducer
  {
      public class RoutingProducer
      {
          const string EXCHANGE_NAME = "ROUTING_EXCHANGE";
          static readonly List<string> LEVELS = new List<string>() { "error", "log" };
          public static void Send()
          {
              ConnectionFactory factory = new ConnectionFactory() { HostName = "localhost" };
              using (IConnection connection = factory.CreateConnection())
              {
                  using (IModel channel = connection.CreateModel())
                  {
                      //创建交换机类型为 direct 的交换机
                      channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
                      for (int i = 0; i < 20; i++)
                      {
                          Thread.Sleep(100);
                          string level = GetLevels();
                          string message = $"日志信息:{i}——日志等级:{level}";
                          //发送消息至之前创建的交换机,并设置路由键为日志级别
                          channel.BasicPublish(exchange: EXCHANGE_NAME, routingKey: level, basicProperties: null, body: Encoding.UTF8.GetBytes(message));
                          Console.WriteLine(message);
                      }
                      Console.WriteLine(" Press [enter] to exit.");
                      Console.ReadLine();
                  }
              }
          }
          private static string GetLevels()
          {
              return LEVELS[new Random().Next(0, 2)];
          }
      }
  }
  消费者 RoutingConsumer.cs
  using RabbitMQ.Client;
  using System;
  using System.Collections.Generic;
  using System.Linq;
  using System.Text;
  using System.Threading.Tasks;
  using RabbitMQ.Client.Events;
  using System.IO;
  namespace RabbitMQConsumer
  {
      public class RoutingConsumer
      {
          const string EXCHANGE_NAME = "ROUTING_EXCHANGE";
          /// <summary>
          /// 是否使用多重绑定将所有日志级别消息输出到控制台
          /// 默认只是输出日志级别为log的内容到控制台
          /// </summary>
          /// <param name="all"></param>
          public static void Log(bool all = false)
          {
              var factory = new ConnectionFactory()
              {
                  HostName = "127.0.0.1"
              };
              using (var connection = factory.CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
                      //每次运行consumer客户端都创建一个新的queue,并且绑定到对应的exchange,这样使每次发送消息到exchange时就能把消息由exchange传递到所绑定的queue
                      QueueDeclareOk queue = channel.QueueDeclare();
                      string queueName = queue.QueueName;
                      channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: "log", arguments: null);
                      if (all)
                      {
                          channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);
                      }
                      EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                      consumer.Received += (sender, e) =>
                      {
                          string message = Encoding.UTF8.GetString(e.Body);
                          Console.WriteLine($"LOG——日志信息:{message}");
                      };
                      channel.BasicConsume(queueName, noAck: true, consumer: consumer);
                      Console.WriteLine(" Press [enter] to exit.");
                      Console.ReadLine();
                  }
              }
          }
          public static void Error()
          {
              var factory = new ConnectionFactory() { HostName = "127.0.0.1" };
              using (IConnection connection = factory.CreateConnection())
              {
                  using (IModel channel = connection.CreateModel())
                  {
                      //创建交换机类型为 direct 的交换机
                      channel.ExchangeDeclare(exchange: EXCHANGE_NAME, type: ExchangeType.Direct);
                      //创建一个未命名的新的消息队列,该队列名称有系统自动分配,并且为非持久化,在该队列没有订阅时自动删除的排它队列
                      QueueDeclareOk queue = channel.QueueDeclare();
                      string queueName = queue.QueueName;
                      //绑定exchange 与 queue 并设置路由键为日志级别error
                      channel.QueueBind(queue: queue, exchange: EXCHANGE_NAME, routingKey: "error", arguments: null);
                      EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                      consumer.Received += (sender, arg) =>
                      {
                          string message = Encoding.UTF8.GetString(arg.Body);
                          //写入日志到txt文件
                          using (StreamWriter writer = new StreamWriter(@"c:\log\log.txt", true, Encoding.UTF8))
                          {
                              writer.WriteLine(message);
                              writer.Close();
                          }
                      };
                      channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);
                  }
              }
          }
      }
  }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390536-1-1.html 上篇帖子: Spring AMQP 下篇帖子: python2.0_s12_day10_rabbitMQ使用介绍
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表