设为首页 收藏本站
查看: 645|回复: 0

[经验分享] RabbitMQ实例教程:主题交换机

[复制链接]
累计签到:1 天
连续签到:1 天
发表于 2015-10-15 08:29:49 | 显示全部楼层 |阅读模式
  前面的例子中,尽管我们使用了direct路由代替fanout路由解决了盲目广播的问题,但direct路由也有它的缺陷,他不能基于多个标准做路由转发。

  在上面的日志系统中,如果不仅想基于日志等级做订阅,也想根据日志的发生源做订阅该怎么处理呢?这时候你可能想到了unix系统工具中的syslog服务,它不仅基于日志等级(info/warn/crit...)进行路由转发,也会根据操作(auth/cron/kern...)做路由转发。

  如果是那样的话,日志系统就灵活多了,它不仅能够监听来自‘cron’的关键错误,也能监听来自'kern'的所有日志。其实主题交换机(topic exchange)就能解决这种问题。

  主题交换机(Topic exchange)

  主题交换机的路由代码不能是任意写的,必须是小树点分隔开的一组单词列表。这些单词可以随便写,但通常是与连接消息特征有关的单词。有效地路由代码应该是这样的“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。路由代码可以随便写,但是长度限制在255字节。

  注意,绑定代码也必须在同一个表单中。topic交换机与direct交换机类似-具有特定路由代码的消息会传送给所有匹配绑定代码的队列,但有两个特殊的绑定代码:

  * :它能替代一个单词 

  #:它能替代0或多个单词
wKiom1YekEyRndmFAABl8iEOXGg312.jpg

  该例子中,我们给所有的动物发送消息,符合由三个单词(第一个单词描述速度;第二个单词描述颜色;第三个单词描述物种)组成的路由代码将会发送消息:“<speed>.<colour>.<species>”。

  我们创建了三个绑定:Q1使用“*.orange.*”绑定,Q2使用“*.*.rabbit”和“lazy.#”绑定。这些绑定的意义如下:

  Q1描述了所有颜色为橙色的动物。

  Q2描述了是兔子的动物和懒惰的动物。

  这样,“quick.orange.rabbit”消息通过路由转发给Q1、Q2两个队列。"lazy.orange.elephant"消息也会转发给Q1、Q2两个队列。“quick.orange.fox”消息只会转发给Q1队列,"lazy.brown.fox"也只会转发给Q2队列。"lazy.pink.rabbit"会转发给Q2队列一次,尽管它匹配两个绑定。"quick.brown.fox"并不匹配任何一个队列就会被废弃。

  如果我们打破规则,每次只发一个或四个单词的话,如“orange”或”quick.orange.male.rabbit“,这些消息不匹配任何绑定,就会被废弃。但如果发送”lazy.orange.male.rabbit“这样的消息的话,由于它匹配最后的绑定仍会被转发到Q2队列中。

  主题交换机是一种非常强大的交换机,当它只绑定”#“时,它会接收所有的消息,与fanout交换机类似。当没有使用”*“和”#“符号时,主题交换机的作用等同与direct交换机。

  源代码

EmitLogTopic.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.favccxx.favrabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) {
        Connection connection = null;
        Channel channel = null;
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");

            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String[] routingKeys = { "fast.orange.duck", "slow.orange.fish", "grey.rabbit", "fast.black.rabbit",
                    "quick.white.rabbit", "lazy.dog", "lazy.black.pig" };
            String[] messages = { "Hello", "Guys", "Girls", "Babies" };

            for (int i = 0; i < routingKeys.length; i++) {
                for (int j = 0; j < messages.length; j++) {
                    channel.basicPublish(EXCHANGE_NAME, routingKeys, null, messages[j].getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + routingKeys + "':'" + messages[j] + "'");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ignore) {
                }
            }
        }
    }
}




ReceiveLogsTopic.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.favccxx.favrabbit;

import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        String[] bindingKeys = { "*.orange.*", "*.*.rabbit", "lazy.#" };
        for (final String bindingKey : bindingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("[" + bindingKey + "] Received message :'" + message + "' from routingKey : " + envelope.getRoutingKey());
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }

    }
}




运行消息发送器,在消息接收平台输出内容如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[*.orange.*] Received message :'Hello' from routingKey : fast.orange.duck
[*.*.rabbit] Received message :'Guys' from routingKey : fast.orange.duck
[lazy.#] Received message :'Girls' from routingKey : fast.orange.duck
[*.orange.*] Received message :'Babies' from routingKey : fast.orange.duck
[*.*.rabbit] Received message :'Hello' from routingKey : slow.orange.fish
[lazy.#] Received message :'Guys' from routingKey : slow.orange.fish
[*.orange.*] Received message :'Girls' from routingKey : slow.orange.fish
[*.*.rabbit] Received message :'Babies' from routingKey : slow.orange.fish
[lazy.#] Received message :'Hello' from routingKey : fast.black.rabbit
[*.orange.*] Received message :'Guys' from routingKey : fast.black.rabbit
[*.*.rabbit] Received message :'Girls' from routingKey : fast.black.rabbit
[lazy.#] Received message :'Babies' from routingKey : fast.black.rabbit
[*.orange.*] Received message :'Hello' from routingKey : quick.white.rabbit
[*.*.rabbit] Received message :'Guys' from routingKey : quick.white.rabbit
[lazy.#] Received message :'Girls' from routingKey : quick.white.rabbit
[*.orange.*] Received message :'Babies' from routingKey : quick.white.rabbit
[*.*.rabbit] Received message :'Hello' from routingKey : lazy.dog
[lazy.#] Received message :'Guys' from routingKey : lazy.dog
[*.orange.*] Received message :'Girls' from routingKey : lazy.dog
[*.*.rabbit] Received message :'Babies' from routingKey : lazy.dog
[lazy.#] Received message :'Hello' from routingKey : lazy.black.pig
[*.orange.*] Received message :'Guys' from routingKey : lazy.black.pig
[*.*.rabbit] Received message :'Girls' from routingKey : lazy.black.pig
[lazy.#] Received message :'Babies' from routingKey : lazy.black.pig






运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-126868-1-1.html 上篇帖子: centos7 下 减少/boot分区空间,扩大/ 的空间 下篇帖子: CentOS 6.6单系统硬盘重装 交换机 主题
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表