设为首页 收藏本站
查看: 1039|回复: 0

[经验分享] RabbitMQ 官方NET教程(五)【Topic】

[复制链接]

尚未签到

发表于 2017-12-9 17:34:29 | 显示全部楼层 |阅读模式
  在上一个教程中,我们改进了我们的日志记录系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发
  虽然使用direct类型改进了我们的系统,但它仍然存在一些局限性 - 它不能够基于多重条件进行路由选择。
  在我们的日志记录系统中,我们可能不仅要根据严重性订阅日志,还可以基于发出日志的源进行订阅。您可能会从unix工具syslog 中了解此概念,该工具根据严重性(info / warn / crit ...)和设备(auth / cron / kern ...)转发日志。
  这将给我们带来很大的灵活性 - 我们可能想要订阅来自cron的严重错误,也可以听kern的所有日志。
  为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)

主题转发(Topic Exchange)
  发往主题类型的转发器的消息不能随意的设置选择键(routing_key) - 它必须是由点分隔的单词列表。这些单词可以是任何东西,但通常它们指定与消息相关联的一些功能。几个有效的路由密钥示例:stock.usd.nyse,nyse.vmw,quick.orange.rabbit。路由密钥中可以有任意多的单词,最多可达255个字节。
  绑定键也必须是相同的形式。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。:
  

*(星)可以替代一个单词。  
#(哈希)可以替换零个或多个单词。
  

  在一个例子中最简单的解释一下:
  
DSC0000.jpg
  在这个例子中,我们将发送所有描述动物的消息。消息会附加一个选择键包含将使用由三个单词标识符(两个点隔开)。第一个单词标识符描述速度,第二个单词标识符描述动物的颜色,和第三个单词标识符描述动物的物种:<speed>.<color>.<species>。
  我们创建了三个绑定:Q1绑定键*.orange.*和Q2与*.*.rabbit和lazy.#绑定。
  这些绑定可以总结为:
  

Q1对所有的橙色动物感兴趣。  
Q2想听听有关兔子的一切,以及关于懒惰动物的一切。
  

  一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另一方面,quick.orange.fox只会转到Q1,而lazy.brown.fox只能被转发到Q2。 lazy.pink.rabbit虽然与两个绑定键匹配,但是也只会被转发到Q2一次。 quick.brown.fox不能与任何绑定键匹配,因此它将被丢弃。
  如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
  另一方面,lazy.orange.male.rabbit虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。

Topic exchange
  

主题类型的转发器非常强大,可以实现其他类型的转发器。  

  当队列与`#`(哈希)绑定键绑定时,它将接收所有消息,而不管路由键,类似`fanout`类型转发器。
  

  当特殊字符`*`(星号)和`#`(哈希)在绑定中不被使用时,主题转发器将类似direct类型转发器。
  

完整的例子
  我们将在我们的日志记录系统中使用topic转发器。 我们将从一个工作假设开始,日志的选择键将有两个单词:<facility>.<severity>。
  EmitLogTopic.cs的代码:
  

using System;  
using System.Linq;
  
using RabbitMQ.Client;
  
using System.Text;
  

  
class EmitLogTopic
  
{
  public static void Main(string[] args)
  {
  var factory = new ConnectionFactory() { HostName = "localhost" };
  using(var connection = factory.CreateConnection())
  using(var channel = connection.CreateModel())
  {
  channel.ExchangeDeclare(exchange: "topic_logs",
  type: "topic");
  

  var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
  var message = (args.Length > 1)
  ? string.Join(" ", args.Skip( 1 ).ToArray())
  : "Hello World!";
  var body = Encoding.UTF8.GetBytes(message);
  channel.BasicPublish(exchange: "topic_logs",
  routingKey: routingKey,
  basicProperties: null,
  body: body);
  Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
  }
  }
  
}
  

  ReceiveLogsTopic.cs的代码:
  

using System;  
using RabbitMQ.Client;
  
using RabbitMQ.Client.Events;
  
using System.Text;
  

  
class ReceiveLogsTopic
  
{
  public static void Main(string[] args)
  {
  var factory = new ConnectionFactory() { HostName = "localhost" };
  using(var connection = factory.CreateConnection())
  using(var channel = connection.CreateModel())
  {
  channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
  var queueName = channel.QueueDeclare().QueueName;
  

  if(args.Length < 1)
  {
  Console.Error.WriteLine("Usage: {0} [binding_key...]",
  Environment.GetCommandLineArgs()[0]);
  Console.WriteLine(" Press [enter] to exit.");
  Console.ReadLine();
  Environment.ExitCode = 1;
  return;
  }
  

  foreach(var bindingKey in args)
  {
  channel.QueueBind(queue: queueName,
  exchange: "topic_logs",
  routingKey: bindingKey);
  }
  

  Console.WriteLine("
  • Waiting for messages. To exit press CTRL+C");
      

      var consumer = new EventingBasicConsumer(channel);
      consumer.Received += (model, ea) =>
      {
      var body = ea.Body;
      var message = Encoding.UTF8.GetString(body);
      var routingKey = ea.RoutingKey;
      Console.WriteLine(" [x] Received '{0}':'{1}'",
      routingKey,
      message);
      };
      channel.BasicConsume(queue: queueName,
      noAck: true,
      consumer: consumer);
      

      Console.WriteLine(" Press [enter] to exit.");
      Console.ReadLine();
      }
      }
      
    }
      

      运行以下示例:
      
    收到所有的日志:
      

    cd ReceiveLogsTopic  
    dotnet run "#"
      

      从设备kern接收所有日志:
      

    cd ReceiveLogsTopic  
    dotnet run "kern.*"
      

      或者如果您只想收到关于critical的日志:
      

    ReceiveLogsTopic.exe "*.critical"  

      您可以创建多个绑定:
      

    cd ReceiveLogsTopic  
    dotnet run "kern.*" "*.critical"
      

      并使用选择键kern.critical类型发出日志:
      

    cd EmitLogTopic  
    dotnet run "kern.critical" "A critical kernel error"
      

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422471-1-1.html 上篇帖子: Exchanger学习 下篇帖子: Mule ESB 介绍
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表