设为首页 收藏本站
查看: 1608|回复: 0

[经验分享] RabbitMQ基本用法、消息分发模式、消息持久化、广播模式

[复制链接]

尚未签到

发表于 2017-12-8 22:41:48 | 显示全部楼层 |阅读模式
RabbitMQ基本用法


  • 进程queue用于同一父进程创建的子进程间的通信
  • 而RabbitMQ可以在不同父进程间通信(例如在word和QQ间通信)
示例代码


  • 生产端(发送)  
    ```python
      
    import pika

  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

相当于建立一个socket,连接本地的RabbitMQ,默认端口:5672
  channel = connection.channel()#声明一个通信管道(信道)

在管道里什么一个queue
  channel.queue_declare(queue='hello')#声明一个名称为hello的queue

通过管道发送消息
  channel.basic_publish(exchange='',
  
routing_key='hello',#queue的名字
  
body='Hellow Word!')#消息主体
  
connection.close()#关闭连接
  
- 消费端(接收)python
  
import pika
  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
  
channel = connection.channel()

不确定生产端或消费端谁先运行,因此为了避免错误,消费端也要申请一个queue

消费端先运行,如果没申请这个queue,生产端还没建立这个queue,因此报错
  channel.queue_declare(queue='hello')
  def callback(ch,method,properties,body):
  
#ch,管道(信道)channel的内存地址
  
#method,设置的一些基本信息
  
#properties,
  
#body,消息主体,二进制数据
  
print(ch,method,properties)
  
print('[x] Received %r'%body)

声明要收消息
  channel.basic_consume(
  
callback,#如果收到消息就调用回调函数处理消息
  
queue='hello',#queue的名字
  
no_ack=True#不确认,是否处理完callback,给rabbitmq返回确认信息
  
)

开始收消息
  channel.start_consuming()#开启后一直收消息,没消息则卡住
  
```

消息分发

RabbitMQ消息分发(一对多)


  • 一个生产者,多个消费者
  • 多个消费者时,是轮询机制,依次分发给消费者。(每个消费者按顺序依次消费)
no_act设置是否确认消息处理完


  • 设置no_act = True,消费者不发送确认信息,RabbitMQ从发送消息队列后,不管消费者是否处理完,删除queue
  • 设置no_act = False,RabbitMQ等待消费者的callback处理完,发送确认信息,如果此时消费者down了,则Rabbit把消息轮询发送给下一个消费者,等待确认才会删除queue
  • 去掉no_act = True(默认为False),需要在回调函数中新增代码,手动向RabbitMQ发送确认信息
  • ch.basic_ack(delivery_tag=method.delivery_tag)
消息持久化

rabbitmq目录下启动cmd,命令:rabbitmqctl.bat list_queues查看当前queue列表

当我们需要消息不会丢失(RabbitMQ server宕机时),需要进行消息持久化


  • 1、在申明队列是加上参数使其持久化,生产者和消费者都需要申明
  • channel.queue_declare(queue='hello',durable=True)队列持久化
  • 2、在生产端发送消息函数时加入参数使消息持久化
  •   消息持久化
    channel.basic_publish(  exchange='',
      routing_key='hello',#queue的名字
      body='Hellow Word!'
      porperties=pika.BasicProperties(
      delivery_mode=2#使队列里的消息持久化
      )
      
    )#消息主体

广播模式

消息公平分发


  • 如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。


  • 在消费端channel.basic_consume()函数前新增一条代码
  • channel.basic_qos(prefetch_count=1)
  • 解释:如果有2个消费者(a,b),a处理消息比较慢,b比较快;RabbitMQ是轮询发送消息,依次给a一条,给b一条,再给a.....。当在消费者端设置以上代码时,a还在处理,那么RabbitMQ不会给a发送,只会给b
广播模式(消息是实时的,发送时没有启动接收端,消息丢失)


  • 1、发送端将消息发送到RabbitMQ的消息转发器(exchange)
  • 2、转发器(Exchange)遍历所有绑定它的queue,将消息广播给queue
  • 3、接收端从queue里获取接收消息
  • 4、使用此queue的消费者断开后,此queue删除


  • 设置exchange转发器
  • Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
  • fanout: 所有bind到此exchange的queue都可以接收消息
  • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
  • topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
给所有bind此exchange的发送消息


  • 发送端  
    ```python
      
    import pika
      
    import sys

  connection = pika.BlockingConnection(pika.ConnectionParameters(
  
host='localhost'))
  
channel = connection.channel()
  channel.exchange_declare(exchange='logs',
  
type='fanout')#广播模式,不用申明queue指定queue名

设置exchange为fanout模式
  message = ' '.join(sys.argv[1:]) or "info: Hello World!"
  
channel.basic_publish(exchange='logs',
  
routing_key='',
  
body=message)
  
print(" [x] Sent %r" % message)
  
connection.close()
  
- 接收端python
  
import pika
  connection = pika.BlockingConnection(pika.ConnectionParameters(
  
host='localhost'))
  
channel = connection.channel()
  channel.exchange_declare(exchange='logs',
  
type='fanout')
  result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
  
queue_name = result.method.queue#拿到这个随机分配的queue名
  
channel.queue_bind(exchange='logs',#绑定发送端的exchange
  
queue=queue_name)
  print('
  • Waiting for logs. To exit press CTRL+C')
      def callback(ch, method, properties, body):
      
    print(" [x] %r" % body)
      channel.basic_consume(callback,
      
    queue=queue_name,
      
    no_ack=True)
      channel.start_consuming()
      
    ```

    有选择的广播(接受者过滤接收消息exchange type=direct)


    • 生产者  
      ```python
        
      import pika
        
      import sys

      connection = pika.BlockingConnection(pika.ConnectionParameters(
      
    host='localhost'))
      
    channel = connection.channel()
      channel.exchange_declare(exchange='direct_logs',
      
    type='direct')
      severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
      
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
      
    channel.basic_publish(exchange='direct_logs',
      
    routing_key=severity,
      
    body=message)
      
    print(" [x] Sent %r:%r" % (severity, message))
      
    connection.close()
      
    ```


    • 消费者
    import pika  
    import sys
      

      
    connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='localhost'))
      
    channel = connection.channel()
      

      
    channel.exchange_declare(exchange='direct_logs',
      type='direct')
      

      
    result = channel.queue_declare(exclusive=True)
      
    queue_name = result.method.queue
      

      
    severities = sys.argv[1:]
      
    if not severities:
      sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
      sys.exit(1)
      

      
    for severity in severities:
      channel.queue_bind(exchange='direct_logs',
      queue=queue_name,
      routing_key=severity)
      

      
    print('
  • Waiting for logs. To exit press CTRL+C')
      

      
    def callback(ch, method, properties, body):
      print(" [x] %r:%r" % (method.routing_key, body))
      

      
    channel.basic_consume(callback,
      queue=queue_name,
      no_ack=True)
      

      
    channel.start_consuming()

    细致的消息过滤()


    • 生产者
    import pika  
    import sys
      

      
    connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='localhost'))
      
    channel = connection.channel()
      

      
    channel.exchange_declare(exchange='topic_logs',
      type='topic')
      

      
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
      
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
      
    channel.basic_publish(exchange='topic_logs',
      routing_key=routing_key,
      body=message)
      
    print(" [x] Sent %r:%r" % (routing_key, message))
      
    connection.close()


    • 消费者
    • 参数为#,不过滤收所有
    • mysql.*,收所有mysql开头的消息
      

    import pika  
    import sys
      

      
    connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='localhost'))
      
    channel = connection.channel()
      

      
    channel.exchange_declare(exchange='topic_logs',
      type='topic')
      

      
    result = channel.queue_declare(exclusive=True)
      
    queue_name = result.method.queue
      

      
    binding_keys = sys.argv[1:]
      
    if not binding_keys:
      sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
      sys.exit(1)
      

      
    for binding_key in binding_keys:
      channel.queue_bind(exchange='topic_logs',
      queue=queue_name,
      routing_key=binding_key)
      

      
    print('
  • Waiting for logs. To exit press CTRL+C')
      

      
    def callback(ch, method, properties, body):
      print(" [x] %r:%r" % (method.routing_key, body))
      

      
    channel.basic_consume(callback,
      queue=queue_name,
      no_ack=True)
      

      
    channel.start_consuming()
      

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422281-1-1.html 上篇帖子: BotVS开发基础—2.3 下市价单 交易 下篇帖子: Rabbitmq(一)
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表