linuxx 发表于 2017-7-2 16:49:06

RabbitMQ 官方NET教程(四)【路由选择】

  在上一个教程中,我们构建了一个简单的日志记录系统。 我们能够广播日志消息给所有你的接收者。
  在本教程中,我们将为其添加一个功能 - 我们将让日志接收者可以仅订阅一部分消息。 例如,我们将能够仅将关键的错误消息写入到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

绑定(Bindings)
  在以前的例子中,我们已经使用过绑定。类似下面的代码:
  

channel.QueueBind(queue: queueName,  exchange: "logs",
  routingKey: "");
  

  绑定表示转发器与队列之间的关系。我们也可以简单的认为:队列对该转发器上的消息感兴趣。
  绑定可以附带一个额外的参数routingKey。 为了避免与BasicPublish参数混淆,我们将其称为binding key。 这就是我们如何用一个键创建一个绑定:
  

channel.QueueBind(queue: queueName,  exchange: "direct_logs",
  routingKey: "black");
  

  绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。

直接转发(Direct exchange)
  我们从上一个教程的日志记录系统向所有消费者广播所有消息。 我们希望将其扩展为允许基于其严重性进行过滤日志消息。 例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。
  我们正在使用一个fanout的交换机,它不给我们很大的灵活性 - 它只能无意识地转发。
  我们会使用direct转发器。 direct类型的转发器背后的路由算法很简单 - 消息传递到binding key与消息的routing key完全匹配的队列。
  为了说明,请考虑以下设置:
  

  
在这个设置中,我们可以看到direct 类型的转发器X与两个队列绑定。 第一个队列与绑定键orange绑定,第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。
  在这样的设置中,发布附带一个选择键(routing key) orange的消息至交换机,将被导向到队列Q1。 消息附带一个选择键 (routing key)black或者green将会被导向到Q2。 所有其他消息将被丢弃。

多重绑定(multiple bindings)

  
使用相同的绑定键绑定多个队列是完全合法的。 在我们的示例中,我们可以在X和Q1之间添加绑定键black。 在这种情况下,direct交换将表现得像fanout,并将消息广播到所有匹配的队列。 附带选择键black的消息将传送到Q1和Q2。

发送日志(Emittinglogs)
  我们将此模型用于日志记录系统。我们将消息发送到direct类型的转发器而不是fanout类型。这样的话, 接收程序可以根据严重性来选择接收。 我们首先关注发送日志的代码:
  一如以往,我们需要先创建一个转发器:
  

channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");  

  然后我们准备发送一条消息:
  

var body = Encoding.UTF8.GetBytes(message);  
channel.BasicPublish(exchange: "direct_logs",
  routingKey: severity,
  basicProperties: null,
  body: body);
  

  为了简化代码,我们假定severity是info,warning,error中的一个。

订阅
  接收消息将像上一个教程类似,只有一点不同 - 我们将为每个我们感兴趣的严重性类型的日志创建一个新的绑定。
  

var queueName = channel.QueueDeclare().QueueName;  

  
foreach(var severity in args)
  
{
  channel.QueueBind(queue: queueName,
  exchange: "direct_logs",
  routingKey: severity);
  
}
  

完整的实例

  EmitLogDirect.cs 类的代码:
  

using System;  
using System.Linq;
  
using RabbitMQ.Client;
  
using System.Text;
  

  
class EmitLogDirect
  
{
  public static void Main(string[] args)
  {
  var factory = new ConnectionFactory() { HostName = "localhost" };
  using(var connection = factory.CreateConnection())
  using(var channel = connection.CreateModel())
  {
  channel.ExchangeDeclare(exchange: "direct_logs",
  type: "direct");
  

  var severity = (args.Length > 0) ? args : "info";
  var message = (args.Length > 1)
  ? string.Join(" ", args.Skip( 1 ).ToArray())
  : "Hello World!";
  var body = Encoding.UTF8.GetBytes(message);
  channel.BasicPublish(exchange: "direct_logs",
  routingKey: severity,
  basicProperties: null,
  body: body);
  Console.WriteLine(" Sent '{0}':'{1}'", severity, message);
  }
  

  Console.WriteLine(" Press to exit.");
  Console.ReadLine();
  }
  
}
  

  ReceiveLogsDirect.cs的代码:
  

using System;  
using RabbitMQ.Client;
  
using RabbitMQ.Client.Events;
  
using System.Text;
  

  
class ReceiveLogsDirect
  
{
  public static void Main(string[] args)
  {
  var factory = new ConnectionFactory() { HostName = "localhost" };
  using(var connection = factory.CreateConnection())
  using(var channel = connection.CreateModel())
  {
  channel.ExchangeDeclare(exchange: "direct_logs",
  type: "direct");
  var queueName = channel.QueueDeclare().QueueName;
  

  if(args.Length < 1)
  {
  Console.Error.WriteLine("Usage: {0} ",
  Environment.GetCommandLineArgs());
  Console.WriteLine(" Press to exit.");
  Console.ReadLine();
  Environment.ExitCode = 1;
  return;
  }
  

  foreach(var severity in args)
  {
  channel.QueueBind(queue: queueName,
  exchange: "direct_logs",
  routingKey: severity);
  }
  

  Console.WriteLine("
[*] Waiting for messages.");
  

  var consumer = new EventingBasicConsumer(channel);
  consumer.Received += (model, ea) =>
  {
  var body = ea.Body;
  var message = Encoding.UTF8.GetString(body);
  var routingKey = ea.RoutingKey;
  Console.WriteLine(" Received '{0}':'{1}'",
  routingKey, message);
  };
  channel.BasicConsume(queue: queueName,
  noAck: true,
  consumer: consumer);
  

  Console.WriteLine(" Press to exit.");
  Console.ReadLine();
  }
  }
  
}
  

  如果您只想将warning和error(而不是info)保存到文件中,只需打开控制台并键入:
  

cd ReceiveLogsDirect  
dotnet run warning error > logs_from_rabbit.log
  

  如果您想查看屏幕上的所有日志消息,请打开一个新终端,然后执行以下操作:
  

cd ReceiveLogsDirect  
dotnet run info warning error
  
# =>
[*] Waiting for logs. To exit press CTRL+C
  

  而且,例如,要发出error日志消息,只需键入:
  

cd EmitLogDirect  
dotnet run error "Run. Run. Or it will explode."
  
# => Sent 'error':'Run. Run. Or it will explode.'
  
页: [1]
查看完整版本: RabbitMQ 官方NET教程(四)【路由选择】