设为首页 收藏本站
查看: 882|回复: 0

[经验分享] RabbitMQ的使用(5)

[复制链接]
YunVN网友  发表于 2017-7-2 11:25:12 |阅读模式
  理清路由机制是了解RabbitMQ来龙去脉的关键。在前面的例子中我们常常遇见这三个概念:exchange,routingKey 和 queue。真正地消息传输流程是消息先到exchange,然后exchange根据对应的routingKey放入queue,如果routingKey不匹配则丢弃。网上网友的一张图很好的展示了这个流程:
  
   DSC0000.png
  
  0.9 版本的AMQP协议的exchange有如下4中类型:fanout,direct,topic 和 headers。RabbitMQ服务会在启动以后预先建立4个exchange,分别对应于4中类型:
  
   DSC0001.png
  

默认的exchange
  如果用空字符串去申明一个exchange,那么系统就会使用"amq.direct"这个exchange。我们在创建一个queue的时候,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去。就像我们在发送和接收例子中,发送者的发送代码:

channel.BasicPublish("", "TaskQueue", properties, bytes);
  因为在第一个参数选择了默认的exchange,而我们申明的队列叫TaskQueue,所以默认的,它在新建一个也叫TaskQueue的routingKey,并绑定在默认的exchange上,导致了我们可以在第二个参数routingKey中写TaskQueue,这样它就会找到定义的同名的queue,并把消息放进去。
  

Direct Exchange
  direct exchange 发送消息是要看routingKey的。举个例子,定义了一个direct exchange 名字是X1,然后一个queue名字为Q1 用routingKey=K1 绑定到exchange X1上,当一个routeKey为 K2 的消息到达X1上,那么只有K1=K2的时候,这个消息才能到达Q1上。代码例子如下:
  

channel.ExchangeDeclare("X1", "direct");
channel.QueueDeclare("Q1", true, false, false, null);
channel.QueueBind("Q1", "X1", "K1");
channel.BasicPublish("X1", "K1", null, bytes);
  
  这样Q1才能收到消息。
  如果有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。
  如果有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每个程序都会收到这个消息的副本。行为相当于fanout类型的exchange。
  官方图片展示direct exchange:
   DSC0002.png
  

Fanout Exchange
  fanout类型的exchange就比较好理解。就是简单的广播,而且是忽略routingKey的。所以只要是有queue绑定到fanout exchange上,通过这个exchange发送的消息都会被发送到那些绑定的queue中,不管你有没有输入routingKey。
  您可以需要广播消息或分布式系统的消息同步等场景中广泛使用到它。
  官方图片展示:
   DSC0003.png
  
  

Topic Exchange
  Topic类型的exchange给与我们更大的灵活性。通过定义routingKey可以有选择的订阅某些消息,此时routingKey就会是一个表达式。exchange会通过匹配绑定的routingKey来决定是否要把消息放入对应的队列中。有两种表达式符号可以让我们选择:#和*。
  *(星号):代表任意的一个词。 例:*.a会匹配a.a,b.a,c.a等
  #(井号):代码任意的0个或多个词。 例:#.a会匹配a.a,aa.a,aaa.a等
  topic exchange 有时候的行为会像其他类型的exchange,比如说:
  当routingKey只是有#号的时候,它的行为和fanout的行为是一样的。
  当routingKey什么的没有,空字符串的时候,它的行为是和direct是一样的。
  
  要注意的是,符号代表的是词不是字符。RabbitMQ中在表达式中词的定义是以.(点号)分隔的。
  
  对于一个queue绑定到exchange,是可以多次绑定的:

channel.QueueBind(queue_name, "X1", "lazy.#");
channel.QueueBind(queue_name, "X1", "*.*.rabbit");

  如上面的代码,表示这个queue即可以收lazy.开头的,又可以收.rabbit结尾的。
  大家可以通过一些测试来理解这里面的规则,但是在测试的时候需要注意,每次修改表达式后,需要重置一下RabbitMQ。因为是可以多次绑定的,所以之前你改的所有的表达式都会记录下来,都会被尝试匹配。
  重置的命令(RabbitMQ以控制台的形式运行):
   DSC0004.png
  

Headers Exchange
  Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。之前的几种exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。代码示例如下:
  发送端:



channel.ExchangeDeclare("X1", "headers");

IBasicProperties properties = channel.CreateBasicProperties();
properties.Headers = new Hashtable();
properties.Headers.Add("Key1", 123);
properties.Headers.Add("Key2", 345);

XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
MemoryStream ms = new MemoryStream();
xs.Serialize(ms, message);
byte[] bytes = ms.ToArray();

channel.BasicPublish("X1", "", properties, bytes);
  接收端:



channel.ExchangeDeclare("X1", "headers");
//随机创建一个队列
string queue_name = channel.QueueDeclare("headerssubscriber2", true, false, false, null);
//绑定
IDictionary ht = new Hashtable();
ht.Add("x-match", "any");
ht.Add("Key1", 12345);
ht.Add("Key2", 34567);
channel.QueueBind(queue_name, "X1", "", ht);
//定义这个队列的消费者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue_name, true, consumer);

while (true)
{
    BasicDeliverEventArgs ea =
        (BasicDeliverEventArgs)consumer.Queue.Dequeue();

    byte[] bytes = ea.Body;

    XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
    using (MemoryStream ms = new MemoryStream(bytes))
    {
        RequestMessage message = (RequestMessage)xs.Deserialize(ms);
        Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
    }
}
  
  点击这里下载测试代码

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390289-1-1.html 上篇帖子: 关于 RabbitMQ 的 Dead-Letters-Queue “死信队列” 下篇帖子: rabbitmq 3.6 延时消息
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表