设为首页 收藏本站
查看: 868|回复: 0

[经验分享] Part1.1 、RabbitMQ 操作使用

[复制链接]

尚未签到

发表于 2017-7-4 19:09:18 | 显示全部楼层 |阅读模式
本节目录:一、最基本的生产者消费者
二、acknowledgment 消息不丢失的方法。


三、durable 消息不丢失 (消息持久化)
四、消息获取顺序






RabbitMQ安装。
(1.1)、centos7 安装 RabbitMQ   
              详见另一篇运维笔记《RabbitQM安装》步骤【这个实验成功】。

简略篇;             安装配置阿里云镜像
安装erlang
$ yum -y install erlang
安装RabbitMQ
$ yum -y install rabbitmq-server
service rabbitmq-server start/stop




(1.2)、Python操作 RabbitMQ模块 pika  API
              >>  pip install pika 或 easy_install pika





  • 先来一个基于Queue实现生产者消费者模型试试水

   
     
import queue
import threading
message = queue.Queue(10)
def producer(i):
    '''厨师,生产包子放入队列'''
    while True:
        message.put(i)
def consumer(i):
    '''消费者,从队列中取包子吃'''
    while True:
        mes = message.get()

for i in range(12): #厨师的线程包子
    t = threading.Thread(target=producer, args=(i,))
    t.start()
for i in range(10): #消费者的线程 \ 吃包子.
    t = threading.Thread(target=consumer, args=(i,))
    t.start()





开始rabbitMQ


>>对于RabbitMQ来说,生产和消费不再针对 内存 里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。


一、最基本的生产者消费者


1.生产者代码

#!/usr/bin/env python3
#coding:utf8

import pika
# ######################### 生产者 #########################
#链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()        #创建频道
channel.queue_declare(queue='hello')  #创建一个队列名叫hello

channel.basic_publish(exchange='',       exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
                  routing_key='hello',     队列 routing_key是队列名。
                  body='Hello World!')     body是要插入的内容
print("队列开始")

connection.close() #缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接





2.消费者代码-客户端

#!/usr/bin/env python
import pika
# ########################## 消费者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #链接rabbit

channel = connection.channel() #创建频道


提示:如果生产者没有运行创建队列,那么消费者也许就找不到队列了。为了避免这个问题,
所以,消费者也可自行-
创建这个队列,,避免报错。
channel.queue_declare(queue='hello')

#接收消息需要使用callback这个函数来接收,它会被pika库来调用
def callback(ch, method, properties, body):
    print(" [x] 接受信息 %r" % body)

channel.basic_consume(callback,      #scallback
是回调函数 如果拿到数据 那么将执行callback函数
                      queue='hello',     #选择 操作的队列
                      no_ack=True)       #是否为了保证自己的消费端的数据安全,回复一个ack,MQ没有接收的话,会再次发送。此参数是不回复。
print('
  • 等待信息.退出 Ctrl+c')

    channel.start_consuming()  
    #永远循环等待数据处理和callback处理的数据







    二、acknowledgment 消息不丢失的方法。

          DSC0000.png




    • 消息确认(Message acknowledgment)







    • 背景:执行一个任务能消耗几秒. 你可能想知道当一个consumer在执行一个艰巨任务或执行到一半是死掉了会发生什么。就我们当前的代码而言,一旦RabbitMQ 的分发完消息给 consumer后 就立即从内存中移除该消息。这样的话,如果一个worker刚启动你就结束掉,那么消息就丢失了。


            那么所有发送给这个 worker 的还没有处理完成的消息也将丢失,但是我们不想丢失任何任务,如果worker死掉了,我们希望这个任务能够发送给其它的worker。

       


    • 实现:为了确保一个消息不会丢失,RabbitMQ支持消息的 ack nowlegements , 一个 ack(nowlegement) 是从consumer端发送一个回执去告诉RabbitMQ 消息已经接 收了、处理了,RabbitMQ可以释放并删除掉了。

       


    • 1、 如果一个consumer消费者】 死掉了(channel关闭、connection关闭、或者TCP连接断开了)而没有发送ack

              RabbitMQ 就会知道这个消息没有被完全处理并会重新发送到消息队列中,
             

    • 2、如果同时有另外一个consumer在线,将会很快转发到另外一个consumer中。 那样的话你就能确保虽然worker死掉,但消息不会丢失。

             这个是没有超时的,当消费方(consumer)死掉后RabbitMQ会重新转发消息,即使处理这个消息需要很长很长时间也没有问题。


    • 消息的 acknowlegments 默认是打开的,在前面的例子中关闭了: no_ack = True . 现在删除这个标识 然后 发送一个 acknowledgment







      当no-ack = False,如果 消费者遇到情况 (关闭通道,连接关闭或TCP连接丢失))挂掉了,
    那么,RabbitMQ会重新将该任务添加到队列中
    1. 生产者不变,但是还是复制上来吧。
    #coding:utf-8
    #!/usr/bin/env python
    import pika
    # ######################### 生产者 #########################

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #链接rabbit服务器
    channel = connection.channel() #创建频道

    channel.queue_declare(queue='Myqueue') #创建一个队列名叫Myqueue

    channel.basic_publish(exchange='',   #向队列插入数值 routing_key是队列名 body是要插入的内容
                       routing_key='Myqueue',
                      
    body='Hello World!')
    print("开始队列")
    connection.close()







    2.消费者

      当生产者生成一条数据,被消费者接收,消费者中断后如果不超过10秒,连接的时候数据还在。
      当超过10秒之后,重新链接,数据将消失。消费者等待链接。
    #!/usr/bin/env python
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #链接rabbit
    channel = connection.channel() #创建频道

    channel.queue_declare(queue='Myqueue') #如果生产者没有运行创建队列,那么消费者创建队列

    def callback(ch, method, properties, body):
        print(" [x] 接收到 %r" % body)
        import time
        time.sleep(10)
        print ('ok')
        ch.basic_ack(delivery_tag = method.delivery_tag) #主要使用此代码

    channel.basic_consume(callback,
                          queue='Myqueue',
                          no_ack=False)

    print('
  • 等待接受信息... 退出按 ctrl+c ')
    channel.start_consuming()













    三、durable 消息不丢失 (消息持久化)



    • 消息持久化(Message durability)




    我们已经学习了即使客户端死掉了任务也不会丢失。但是如果RabbitMQ服务停止了的话,我们的任务还是会丢失。


    当RabbitMQ退出或宕掉的话将会丢失queues和消息信息,除非你进行设置告诉服务器队列不能丢失。要确保消息不会丢失需要做两件事


    我们需要将队列和消息标记为 durable.



    1、首先 MQ端配置:
         我们需要确保RabbitMQ 永远不会丢失队列,为了确保这个,我们需要定义队列为 durable:

    channel.queue_declare(queue='hello', durable=True

        尽管此命令本身定义是正确的,但我们设置后还是不会工作。因为我们已经定义了个名为 hello ,但不是durable属性的队列。




    • 1.1、RabbitMQ 不允许你重新定义一个已经存在、但属性不同的queue。RabbitMQ 将会给定义这个属性的程序返回一个错误。

    但这里有一个快速的解决方法:让我们定义个不同名称的队列,
         比如 task_queue:


    channel.queue_declare(queue='task_queue', durable=True)
           这个 queue_declare 需要在 生产者(producer) 和消费方(consumer) 代码中都进行设置。 基于这一点, 我们能够确保 task_queue 队列即使RabbitMQ重启也不会丢失。


    2、现在我们需要标记我们的消息为持久化的 - 通过设置 delivery_mode 属性为 2


    channel.basic_publish(exchange='',

                          routing_key="task_queue", #队列名
    body=message,
    properties=pika.BasicProperties(
    delivery_mode = 2,             # make message persistent
    ))


    消息持久化的注意点:

          标记消息为持久化的并不能完全保证消息不会丢失,尽管告诉 RabbitMQ 保存消息到磁盘,当RabbitMQ接收到消息还没有保存的时候仍然有一个短暂的时间窗口. RabbitMQ不会对每个消息都执行同步fsync(2) ---
       可能只是保存到缓存cache还没有写入到磁盘中,这个持久化保证不是很强,但这比我们简单的任务queue要好很多,如果你想很强的保证你可以使用 publisher confirms






    • 这个 queue_declare 需要在 生产者(producer) 和 消费方(consumer) 代码中都进行设置。

       1、生产者。

    #!/usr/bin/env python
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #链接rabbit服务器
    channel = connection.channel() #创建频道

    '''
      提示:两处都要声明下,1、在创建队列时候durable=True  2、在队列使用的参数中 delivery_mode=2
    '''
    channel.queue_declare(queue='Myqueue', durable=True)
                                        #创建队列,使用durable方法。
                                           #如果想让队列实现持久化那么加上durable=True

                                        
    channel.basic_publish(exchange='',
                          routing_key='Myqueue',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                          delivery_mode=2,
                          #标记我们的消息为持久化的 - 通过设置 delivery_mode 属性为 2
                          #
    这样必须设置,让消息实现持久化
                          ))

    #这个exchange参数就是这个exchange的名字. 空字符串标识默认的或者匿名的exchange:
    # 如果存在routing_key, 消息路由到routing_key
    指定的队列中。
    print(" [x] 开始队列'")
    connection.close()








    2.消费者。
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel() #创建频道
    channel.queue_declare(queue='Myqueue', durable=True) #创建队列,使用durable方法

    def callback(ch, method, properties, body):
        print(" [x] 接受信息 %r" % body)
        import time
        time.sleep(10)
        print ('ok')
        ch.basic_ack(delivery_tag = method.delivery_tag)

        channel.basic_consume(callback,
                        queue='Myqueue',
                        no_ack=False)

        print('
  • 等待队列. 退出 CTRL+C')
        channel.start_consuming()









    四、消息获取顺序


    默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取奇数序列的任务,消费者1去队列中获取偶数序列的任务。
    channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列。


    1、生产者。

    import pika
    import sys  

    connection = pika.BlockingConnection(pika.ConnectionParameters(  
        host='localhost'))  
    channel = connection.channel()  

    channel.queue_declare(queue='task_queue', durable=True) # 设置队列为持久化的队列
    message = ' '.join(sys.argv[1:]) or "Hello World!"  
    channel.basic_publish(exchange='',  
                         routing_key='task_queue',  
                         body=message,  
                         properties=pika.BasicProperties(  
                         delivery_mode = 2, #设置消息为持久化的
                      ))  
    print(" [x] Sent %r" % message)  
    connection.close()  




    2、消费者。
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue',durable=True) # 设置队列持久化

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print ('ok')
        ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
                                   #消息未处理完前不要发送信息的消息
                                      #
    表示谁来谁取,不再按照奇偶数排列

    channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=False)

    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()







































    来自为知笔记(Wiz)

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390761-1-1.html 上篇帖子: rabbitmq(1)-入门 下篇帖子: .NET中RabbitMQ的使用
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表