设为首页 收藏本站
查看: 1085|回复: 0

[经验分享] RabbitMQ 使用详细介绍

[复制链接]

尚未签到

发表于 2017-12-9 13:58:39 | 显示全部楼层 |阅读模式
  1. 实现最简单的队列通信
DSC0000.png

  2. producer端



# !/usr/bin/env python
import pika
#通过这个实例,先去建立一个socket,默认端口15672
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel() #声明一个管道,在管道里发消息
# 在管道里面声明queue队列。
channel.queue_declare(queue='hello')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
#在管道里发消息
channel.basic_publish(exchange='',
routing_key='hello', #队列名字
body='Hello World!') #消息
print(" [x] Sent 'Hello World!'")
connection.close() #直接把链接关闭,不需要关闭管道。

  3.consumer端



# _*_coding:utf-8_*_
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost')) #建立链接
channel = connection.channel() #建立管道
# You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
# was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
# 为什么又声明了一个‘hello’队列?
# 如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以可以再次声明,这样就不会报错。
channel.queue_declare(queue='hello') #说明从哪个队列里面收消息
# ch 管道的内存对象地址
def callback(ch, method, properties, body):
print('---->',ch,method,properties)
print(" [x] Received %r" % body)
#开始消费消息
channel.basic_consume(callback,   #如果收到消息,就调用callback函数来处理消息
queue='hello', #表示从哪个队列里收消息
no_ack=True)
print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() #之前的只是声明了语法,这条语句才是真正开始收数据

      4.运行
    DSC0001.png

      5. 上面实现的实例是一个生产者,一个消费者模型。下面尝试建立一个生产者,多个消费者的模型。
      启动3个客户端, consumer1,consumer2, consumer3。再启动producer,producer发的第1条消息被consumer1收到了,producer发的第2条消息被consumer2收到了,producer发的第3条消息被consumer3收到了,可以看出一对多情况下,默认采用的是轮询机制。
    DSC0002.png

      6. 如果消费者收到消息以后,处理的过程中down机了。当生产者收到消费者端发来的消息处理完的确认函以后,生产者才会把消息从队列中删除。
      如果处理的过程中突然down机了,客户端就没法发确认函了。生产者就认为没有处理完。
      consumer.py 中 no_ack=True # no acknowledge  不管有没有处理完,都不会给生成者端发消息.
      一般都会注释掉这句,#no_ack=True.意思就是处理完了,请给我一个确认函。然后服务器就把把这个任务从队列里面删除掉。


    • 假如消费者处理消息需要15秒,如果当机了,那这个消息处理明显还没处理完,怎么处理? (可以模拟消费端断了,分别注释和不注释 no_ack=True 看一下) 你没给我回复确认,就代表消息没处理完。
    •   上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢? 因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。

    • 如果服务器端发了一个任务给消费者1,消费者1在处理的过程中,突然down机了(socket断了)。服务器端没有收到确认函,则这个任务还存在于队列中,就会发给消费者2进行处理,直至处理完毕为止。通过这种机制,就会保证消息会被全部处理完。
      7. 在安装目录下的 sbin文件夹下,有一个rabbitmqctl的文件,它是管理rabbitMQ的一个工具。
    DSC0003.png

      rabbitmqctl.bat list_queues 用这条命令可以查看当前有几个队列,以及每个队列里面存在的消息个数。
    DSC0004.png

      8. 客户端处理完消息以后,必须主动跟服务器端确认。否则如果客户端处理完消息以后,又拿着这条消息去干别的事情的话,服务器端一直收不到确认函,就不合理了。
    DSC0005.png

      9.如果队列里面还有一条消息,此时服务器端down机了,会发生什么呢?
      尝试down掉服务器端机器,rum中输入services.msc,然后找到这个程序,右击点停止。
    DSC0006.png

    DSC0007.png

      然后重启,测试效果:发现队列都丢了。因为队列是保存在内存中的。
      10. 为了保证队列不再丢失,在每次声明队列的时候,参数中加上durable=True 这句,服务器端和客户端都需要写上这句。把队列持久化了,但是里面的消息丢失了。如果想要消息也保留住,需要在生产者端加下面一句。
    DSC0008.png

      11. 测试上面的效果
    DSC0009.png

      插播一句,有可能会遇到以下问题
    DSC00010.png

      原因:这是因为已经定义的队列,再次定义是无效的,这就是幂次原理。RabbitMQ不允许重新定义一个已有的队列信息,也就是说不允许修改已经存在的队列的参数。如果你非要这样做,只会返回异常。所以做测试的时候,可以使用不同的队列进行比较。不能在同一个队列中做修改。
      producer.py中的程序



    # !/usr/bin/env python
    import pika
    #通过这个实例,先去建立一个socket,默认端口15672
    connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
    )
    channel = connection.channel() #声明一个管道,在管道里发消息
    # 在管道里面声明queue队列。
    channel.queue_declare(queue='hello1',durable=True)
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    #在管道里发消息
    channel.basic_publish(exchange='',
    routing_key='hello', #队列名字
    body='Hello World!', #消息
    properties=pika.BasicProperties(
    delivery_mode=2,#把消息持久化。
    )
    )
    print(" [x] Sent 'Hello World!'")
    connection.close() #直接把队列关闭,不需要关闭管道。

      consumer.py中的程序



    # _*_coding:utf-8_*_
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost')) #建立链接
    channel = connection.channel() #建立管道
    # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    # We could avoid that if we were sure that the queue already exists. For example if send.py program
    # was run before. But we're not yet sure which program to run first. In such cases it's a good
    # practice to repeat declaring the queue in both programs.
    # 为什么又声明了一个‘hello’队列?
    # 如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以可以再次声明,这样就不会报错。
    channel.queue_declare(queue='hello1',durable=True) #说明从哪个队列里面收消息
    #ch:管道的内存对象地址
    #method:包含了把消息发给谁等消息
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    #开始消费消息
    channel.basic_consume(callback,   #如果收到消息,就调用callback函数来处理消息
    queue='hello', #表示从哪个队列里收消息
    #no_ack=True #no acknowledgement,不管有没有处理完,都不会给生成者端发消息。
    )
    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming() #之前的只是声明了语法,这条语句才是真正开始收数据

      12. 上面的例子是1个生产者,3个消费者。平均分发消息。下面引入权重分配任务。----能者多劳
      服务器端给客户端发消息的时候,如果当期队列中有1条那么,那么就不给你发了。
      只需要在消费者端加一句话:channel.basic_qos(prefetch_count=1)
      做模拟的时候,可以写2个消费者,其中1个sleep30秒-代表性能较差的机器。生产者一直不停发消息,当性能较差的机器一直没有处理完任务的时候,所有的任务都跑到了另外一个消费者处。
    DSC00011.png
      13, 做一个广播的实例,一个生产者发消息,所有的消费者都能收到。需要用到exchange。exchange类似于一个转发器。
      exchange必须精确的知道收到的消息要发给谁。exchange的类型决定了怎么处理, 类型有以下几种:


    • fanout: 所有绑定到此exchange的queue都可以接收消息
    • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    • topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。
      1)fanout 纯广播、all
      需要queue和exchange绑定,因为消费者不是和exchange直连的,消费者是连在queue上,queue绑定在exchange上,消费者只会在queu里读消息.消息是实时的,如果生产者发的时候,消费者错过了,那就再也收不到这条消息了。类似于收音机的模型。
       DSC00012.png
      生产者的关键代码:
    DSC00013.png

      消费者的关键代码:
    DSC00014.png

      发送端 publisher 发布、广播



    import pika
    import sys
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
    )
    channel = connection.channel()
    #注意:这里是广播,不需要声明queue
    channel.exchange_declare(exchange='logs', #声明广播管道
    type='fanout'
    )
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
    routing_key='', #queue名字为空,因为是广播,所以不需要声明queue。但是这句必须有。
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()

      接收端 subscriber 订阅



    import pika
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
    )
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',
    type='fanout')
    result = channel.queue_declare(exclusive=True)
    # exclusive-排它的,唯一的。不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    #随机生成,自动删除。queue对象的名字叫result。
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs',  #queue绑定到转发器上。
    queue=queue_name)
    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r" % body)

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()

      可能因为程序版本的问题,没法使用type关键词。否则会报错,可以把下面的注释掉,直接按照顺序写参数即可正常运行。
    DSC00015.png




    #channel.exchange_declare(exchange='logs', #声明广播管道
    #                         type='fanout'
    #                        )
    channel.exchange_declare('logs', #声明广播管道
    'fanout'
    )

      2)有选择地接收消息 (exchange type=direct),接收者可以过滤消息,只收我想要的消息.
    DSC00016.png

      当运行服务器端,后运行客户端以后,发现报错了,因为没有带参数。
    DSC00017.png

    DSC00018.png

      想带参数的话,只能用cmd了。
    DSC00019.png

      再启动一个客户端
    DSC00020.png

    DSC00021.png

      服务器端启动,默认用info模式发,看效果。该收的收到了,不该收的没收到。
    DSC00022.png

      用cmd启动服务器端,带参数的,发1条warning消息。测试效果:
    DSC00023.png

      效果如下:
    DSC00024.png

      测试结果一切良好。
      14. 细致的消息过滤广播模式。现在是收所有的info,所有的warning,所有的error。想区分哪个警告是哪个客户端发来的,就需要更细致的消息过滤了。分别代表1.“所有包含orange的”;2-“所有以rabbit结尾的”; 3-"所有以lazy开头的”.类似于一个动态的匹配。
    DSC00025.png

      server端:



    import pika
    import sys
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
    )
    channel = connection.channel()
    #channel.exchange_declare(exchange='topic_logs',
    #                         type='topic')
    channel = connection.channel()
    channel.exchange_declare('topic_logs',
    'topic')
    #可以自己填要发的消息,也可以使用默认的,但是格式是XXX.info的格式。
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()

      客户端



    import pika
    import sys
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
    )
    channel = connection.channel()
    #channel.exchange_declare(exchange='topic_logs',
    #                         type='topic')
    channel.exchange_declare('topic_logs',
    'topic')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    binding_keys = sys.argv[1:]
    if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
    for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
    queue=queue_name,
    routing_key=binding_key)
    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()

      测试效果:
    DSC00026.png

      python topic_consumer.py # 代表什么都收。
    DSC00027.png

      书写接收内容的规则如下:
    DSC00028.png

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422418-1-1.html 上篇帖子: c++ 引用类型以及函数指针 下篇帖子: python rabbitMQ持久化队列消息
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表