设为首页 收藏本站
查看: 1038|回复: 0

[经验分享] rabbitmq消息队列——"发布订阅"

[复制链接]

尚未签到

发表于 2017-7-4 19:11:25 | 显示全部楼层 |阅读模式
  三、发布订阅
  上一节的练习中我们创建了一个工作队列。队列中的每条消息都会被发送至一个工作进程。这节,我们将做些完全不同的事情——我们将发送单个消息发送至多个消费者。这种模式就是广为人知的“发布订阅”模式。
  为了说明这种模式,我们将构建一个简单的日志系统。包括2个应用程序,一个传送日志消息另一个接收并打印这些消息。
  我们的日志系统中每一个运作的接收端程序都会收到这些消息。这种方式下,我们就可以运行一个接收端发送日志消息至硬盘,同时可以运行另一个接收端将日志打印到屏幕上。
  理论上讲,已发布的日志消息将会被广播到所有的接收者。
  交换器(Exchange
  之前的几节练习中我们发送接收消息都是在队列中进行,是时候介绍下RabbitMQ完整的消息传递模式了。
  先来迅速的回顾下我们之前章节:


  • 一个生产者就是一个用来发送消息的应用程序
  • 一个 队列好比存储消息的缓存buffer
  • 一个消费者就是一个用户应用程序用来接收消息
  RabbitMQ消息传递模型的核心思想是生产者从来不会直接发送消息至队列。事实上,生产者经常都不知道消息会被分发至哪个队列。
  相反的是,生产者仅仅发送消息至交换器。交换器是非常简单的东西:一边从生产者那边接收消息一边发送这些消息至队列。交换器必须准确的知道这些被接收的消息该如何处理。它应该被添加到某个特定队列?或者添加到多个队列?甚至直接放弃。具体的传输规则就是通过交换器类型来定义的。
DSC0000.png

  交换器类型有四种:direct、topic、headers、fanout。这节我们主要关注最后一种——fanout。让我们来创建一个fanout类型的交换器,命名为logs:



err = ch.ExchangeDeclare(
"logs",   // name
"fanout", // type
true,     // durable
false,    // auto-deleted
false,    // internal
false,    // no-wait
nil,      // arguments
)
正如你从名字中猜测的一样,它仅仅广播所有消息到所有已知的接收队列。实际上这正是我们需要的日志系统。

备注:之前的几节练习中我们并不知道交换器,但我们依然能够将消息发送至队列中,之所以可以实现是因为我们使用了默认的交换器,使用空字符串表示。

回顾下之前我们发送消息是这样子的:




err = ch.Publish(
"",     // exchange
q.Name, // routing key
false,  // mandatory
false,  // immediate
  amqp.Publishing{
ContentType: "text/plain",
Body:        []byte(body),
})
这里我们可以使用默认也可以自己命名交换器:如果路由键存在的话,消息会被路由到加上路由键参数的地址,注意fanout类型会直接忽略路由键的存在。

以下是修改后的代码:




err = ch.ExchangeDeclare(
"logs",   // name    定义一个名为logs的交换器   
"fanout", // type    交换器类型为fanout即广播类型
true,     // durable    持久化
false,    // auto-deleted        无队列绑定时是否自动删除
false,    // internal
false,    // no-wait
nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs", // exchange    指定消息发送的交换器名称
"",     // routing key    因为fanout类型会自动忽略路由键,所以这里的路由键参数任意,一般不填
false,  // mandatory
false,  // immediate
  amqp.Publishing{
ContentType: "text/plain",
Body:        []byte(body),
})
临时队列

你可能记得之前我们声明队列的时候都会指定一个队列名称(记得hello和task_queue?)。队列的命名对我们来说至关重要——我们需要将工作进程指向同一个队列。当你需要在消费者和生产者之间共享队列的话声明队列就显得很重要。

但这对我们的日志系统来说无关重要。我们需要监听的是所有的日志消息,而不是他们中的某一类。我们只关注当前流中的消息而不关注旧的那些。解决这个我们需要做两件事。

首先,每当链接RabbitMQ的时候我们需要创建一个新的、空的队列。为做到这点,我们必须创建一个名称随机的队列,甚至更好的实现方式是——让服务端给我们自动生成一个随机的队列。

其次,一旦消费者链接断开,该队列便会自动删除。

在amqp客户端中,当我们给一个队列名称设定为空字符串时,我们就创建了一个非持久化的生成队列:




q, err := ch.QueueDeclare(
"",    // name    满足第一点:服务端自动产生随机队列
false, // durable
false, // delete when usused
true,  // exclusive   满足第二点:连接断开立即删除
false, // no-wait
nil,   // arguments
)
当该方法返回的时候,声明好的队列便包含一个由RabbitMQ生成的随机队列名称。举例来说,队列名称形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg这种的。

当消费者的链接宣布关闭后,队列便像exclusive参数设置的那样,自动删除。

绑定

DSC0001.png

我们已经创建了一个fanout类型的交换器和一个队列,现在我们需要告诉交换器将消息发送至我们的队列。这种交换器和队列中的关联关系就叫做绑定。




err = ch.QueueBind(
q.Name, // queue name    绑定的队列名称
"",     // routing key    绑定的路由键
"logs", // exchange    绑定的交换器名称
false,
nil
)
从现在起,logs交换器便能发送消息至我们的队列。

糅合在一起

DSC0002.png

生产者的程序,也就是发送消息端,跟之前几节的发送代码差不多。最重要的是我们现在要发送消息到logs交换器而非默认的交换器。发送的时候我们可以设置一个路由键,但是对于fanout类型的交换器来说它将被忽略。下面就是发送日志方的代码:




// rabbitmq_3_emit_log.go project main.go
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", err, msg)
panic(fmt.Sprintf("%s:%s", err, msg))
}
}
func bodyForm(args []string) string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "Hello World! This is a test!"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "failed to dial rabbitmq server")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "failed to declare the channel")
defer ch.Close()
//声明一个交换器,交换器名称logs,类型fanout
err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
failOnError(err, "failed to declare the exchange")
body := bodyForm(os.Args)
//发送消息到交换器
err = ch.Publish("logs", "", false, false, amqp.Publishing{
Body:        []byte(body),
ContentType: "text/plain",
})
failOnError(err, "failed to publish the message")
}
  备注:这里发送方并不需要声明队列之类的,不像之前的代码需要声明,这里的发送方唯一关联的是交换器,所以只需声明交换器并发送消息至交换器即可。
  正如你想的那样,链接建立后我们声明交换器,这一步是必须的因为发送消息到一个不存在的交换器是完全禁止的。
  如果该交换器上面没有队列绑定的话那么发送至该交换器的消息将全部丢失,但这对我们来时ok;如果没有消费者我们会安全地丢弃这些消息。
  下面是日志接收方的代码:



// rabbitmq_3_receive_logs.go project main.go
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", err, msg)
panic(fmt.Sprintf("%s:%s", err, msg))
}
}
func bodyForm(args []string) string {
var s string
if len(args) < 2 || os.Args[1] == "" {
s = "Hello World! This is a test!"
} else {
s = strings.Join(args[1:], " ")
}
return s
}
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "failed to dial rabbitmq server")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "failed to declare the channel")
defer ch.Close()
//声明一个交换器,交换器名称logs,类型fanout
err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
failOnError(err, "failed to declare the exchange")
//声明一个队列
q, err := ch.QueueDeclare("", false, false, true, false, nil)
failOnError(err, "failed to declare the queue")
//设置绑定(第二个参数为路由键,这里为空)
err = ch.QueueBind(q.Name, "", "logs", false, nil)
failOnError(err, "failed to bind the queue")
//注册一个消费者
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
forever := make(<-chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf("
  • Waiting for logs. To exit press CTRL+C")
    <-forever
    }
      如果你想将日志保存到文件,执行如下命令:



    go run receive_logs.go > logs_from_rabbit.log
      如果你仅仅想在屏幕上查看日志,开启一个新的控制台执行如下命令:



    go run receive_logs.go
      当然了,你最后还要发出日志才行:



    go run emit_log.go
      使用rabbitmqctl list_bindings命令可以直接查看所有的绑定,如运行2个receive_logs.go程序你就会看到如下输出:



    rabbitmqctl list_bindings
    Listing bindings ...
    logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
    logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
    ...done.
      实际效果:
    DSC0003.png

    DSC0004.png

      分别开启两个控制台,均监听相同队列,同时收到消息并打印了,说明两个随机的队列均收到了logs交换器发来的消息,发送方略。

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390763-1-1.html 上篇帖子: .NET中RabbitMQ的使用 下篇帖子: OBYC中的Transaction key详细解释以及用法-Group(RMK)
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表