设为首页 收藏本站
查看: 913|回复: 0

[经验分享] rabbitmq消息队列——"路由"

[复制链接]

尚未签到

发表于 2017-7-3 16:05:10 | 显示全部楼层 |阅读模式
  在之前的教程中,我们创建了一个简单的日志系统。我们能够向许多交换器转发日志消息。
  在本教程中,我们将添加一个功能——我们让它仅仅接收我们感兴趣的日志类别。举例:我们 实现仅将严重级别的错误日志写入磁盘(为了节省磁盘空间),其余日志级别的日志直接打印到控制台。
  绑定
  之前的章节中我们已经创建过绑定,你可能还会记得:

err = ch.QueueBind(
q.Name, // queue name
"",     // routing key
"logs", // exchange
false,
nil)

  绑定是用来维系交换器和队列关系的,这可以被简单地理解为:队列仅仅对从交换器中传的消息感兴趣。
  绑定有个额外参数叫做routing_key,为了避免与Channel.Publish方法中的参数相混淆,我们称之为binding key(绑定键)。使用绑定键创建绑定如下:

err = ch.QueueBind(
q.Name,    // queue name
"black",   // routing key
"logs",    // exchange
false,
nil)

  绑定键的含义取决于交换器的类型。我们之前使用的fanout类型的交换器,就会直接忽略这个参数。
  Direct型交换器
  我们之前的教程中的日志系统是广播所有的消息到所有消费者。我们希望以此拓展来实现根据消息严重性来过滤消息。比如我们希望 写日志到硬盘的代码仅仅接收严重级别的,不要浪费磁盘存储在warning或者info级别的日志。
  之前使用的是fanout类型交换器,没有更好的拓展性或者说灵活性——它只能盲目的广播。
  现在 使用direct型交换器替代。Direct型的路由算法 比较简单——消息会被派发到某个队列,该队列的绑定键恰好和消息的路由键一致。
  为了阐述,考虑如下设置:
DSC0000.png

  该设置中,可以看到direct型的交换器X被绑定到了两个队列:Q1、Q2。Q1使用绑定键orange绑定,Q2包含两个绑定键:black和green。
  基于如上设置的话,使用路由键orange发布的消息会被路由到Q1队列,而使用black或者green路由键的消息均会被路由到Q2,所有其余消息将被丢弃。
  备注:这里的交换器X和队列的绑定是多对多的关系,也就是说一个交换器可以到绑定多个队列,一个队列也可以被多个交换器绑定,消息只会被路由一次,不能因为两个绑定键都匹配上了路由键消息就会被路由两次,这种是不存在的。
  多个绑定
DSC0001.png

  用相同的绑定键去绑定多个队列是完全合法的,我们可以再添加一个black绑定键来绑定X和Q1,这样Q1和Q2都使用black绑定到了交换器X,这其实和fanout类型的交换器直接绑定到队列Q1、Q2功能相同:使用black路由键的消息会被直接路由到Q1和Q2。
  发送日志
  我们将使用该模型来构建日志系统。使用direct型的交换器替换fanout型的,我们将日志的严重级别作为路由键,这样的话接收端程序可以选择日志接收级别进行接收,首先聚焦下日志发送端:
  首先创建一个交换器:

err = ch.ExchangeDeclare(
"logs_direct", // name
"direct",      // type
true,          // durable
false,         // auto-deleted
false,         // internal
false,         // no-wait
nil,           // arguments
)

  然后是发送消息:

err = ch.ExchangeDeclare(
"logs_direct", // name
"direct",      // type
true,          // durable
false,         // auto-deleted
false,         // internal
false,         // no-wait
nil,           // arguments
)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
err = ch.Publish(
"logs_direct",         // exchange
severityFrom(os.Args), // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body:        []byte(body),
})

  为了简单起见,我们假设日志严重级别如下:'info', 'warning', 'error'。
  订阅
  接收还和之前章节接收一样,只有一个例外:我们将为每一个感兴趣的严重级别创建一个绑定:

q, err := ch.QueueDeclare(
"",    // name
false, // durable
false, // delete when usused
true,  // exclusive
false, // no-wait
nil,   // arguments
)
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name,        // queue name
s,             // routing key
"logs_direct", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}

  糅合在一起
DSC0002.png

  发送端:

// rabbitmq_4_emit_log_direct.go project main.go
package main
import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
//链接队列服务
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//声明一个channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//声明一个direct类型交换器
err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
failOnError(err, "Failed to declare an exchange")
body := bodyFrom(os.Args)
ch.Publish("logs_direct", severityFrom(os.Args), false, false, amqp.Publishing{
ContentType: "text/plain",
Body:        []byte(body),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
}
//接收消息发送内容
func bodyFrom(args []string) string {
var s string
if (len(args) < 3) || os.Args[2] == "" {
s = "hello"
} else {
s = strings.Join(args[2:], " ")
}
return s
}
//接收日志级别,作为路由键使用
func severityFrom(args []string) string {
var s string
if len(args) < 2 || args[1] == "" {
s = "info"
} else {
s = args[1]
}
return s
}

  接收端:

// rabbitmq_4_receive_logs_direct.go project main.go
package main
import (
"fmt"
"log"
"os"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
panic(fmt.Sprintf("%s: %s", msg, err))
}
}
func main() {
//链接队列服务
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
//声明一个channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
//声明一个direct类型交换器
err = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil)
failOnError(err, "Failed to declare an exchange")
//声明一个队列
q, err := ch.QueueDeclare("", false, false, true, false, nil)
failOnError(err, "Failed to declare a queue")
//判断cmd窗口接收参数是否足够
if len(os.Args) < 2 {
log.Printf("Usage:%s [info] [warning] [error]", os.Args[0])
os.Exit(0)
}
//cmd窗口输入的多个日志级别,分别循环处理—进行绑定
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s)
ch.QueueBind(q.Name, s, "logs_direct", false, nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf("
  • Waiting for logs. To exit press CTRL+C")
    <-forever
    }

      如果您只想保存“警告”和“错误”(而不是“信息”)日志消息到文件,只需要打开一个控制台然后输入:

    go run receive_logs_direct.go warning error > logs_from_rabbit.log
      如果你想看到所有的日志消息在你的屏幕上,打开一个新的终端,输入:

    go run receive_logs_direct.go info warning error
      发出一个错误日志消息类型如下:

    go run emit_log_direct.go error "Run. Run. Or it will explode."
      可以观察到:
      消息可以进行分类接收了, 只有error级别的消息才会被存入log日志文件,而info、warning级别的都不存入。
      实际效果如下:
    DSC0003.png

    DSC0004.png

    DSC0005.png

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390620-1-1.html 上篇帖子: MySQL分区表管理 下篇帖子: RabbitMQ之window安装步骤
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表