设为首页 收藏本站
查看: 1299|回复: 0

[经验分享] python RabbitMQ广播

[复制链接]
累计签到:2 天
连续签到:1 天
发表于 2017-12-8 22:24:57 | 显示全部楼层 |阅读模式


消息公平分发
  如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
DSC0000.png

  消费者端添加



channel.basic_qos(prefetch_count=1)

  带消息持久化+公平分发的完整代码
  生产者



import pika
#相当于声明一个socket
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = conn.channel()
#声明queue
channel.queue_declare(queue='hello',durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2,
)
) # routing_key 消息的key  body 消息的内容
print('Sent "hello world"')
conn.close()

  消费者



import pika
#相当于声明一个socket
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

#声明一个管道
channel = conn.channel()
#声明queue  这里可以不用声明,但是如果消费者先运行,又不希望出错,就要消费者先运行
channel.queue_declare(queue='hello',durable=True)
def callback(ch,method,properties,body):
print('[x] Received %r' % body )
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello' ) #消费消息 如果收到消息就调用CALLBACK函数处理
print('
  • Waiting for message.To exit press CTRL+C')
    channel.start_consuming() #开始收消息

      消费者2



    import pika,time
    #相当于声明一个socket
    conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    #声明一个管道
    channel = conn.channel()
    #声明queue  这里可以不用声明,但是如果消费者先运行,又不希望出错,就要消费者先运行
    channel.queue_declare(queue='hello1',durable=True)
    def callback(ch,method,properties,body):
    time.sleep(30)
    print('[x] Received %r' % body )
    ch.basic_ack(delivery_tag=method.delivery_tag)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='hello1' ) #消费消息 如果收到消息就调用CALLBACK函数处理
    print('
  • Waiting for message.To exit press CTRL+C')
    channel.start_consuming() #开始收消息

      消费者2 CALLBACK中sleep30s 在这个过程中 将不会接收到消息,那么消息会发送到消费者1
    DSC0001.png

    DSC0002.png


    Publish\Subscribe(消息发布\订阅) 
      之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
      exchange是一个简单的东西,一端接收消息,另一端将消息推送到队列。exchange类型将决定这条消息是放到一个队列,还是很多队列,还是被删除。exchange 就像转发器。
      exchange的类型:
      fanout: 所有bind到此exchange的queue都可以就收消息。 纯广播 ,只要绑定exchange就可以接收到。
      direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息。
      topic 所有符合routingkey(此时可以是一个表达式)的routingkey所bind的queue可以接收消息。
      headers 通过headers 来决定把消息发给哪些queue。
    DSC0003.png


    消息publisher 生产者



    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    #发布方不需要声明queue 只需要有个exchange就可以了 , exchange_type 类型是 fanout
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    #message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',  # 发布广播的时候 exchange定义要一致
    routing_key='',   #必须要写成空
    body=message)     #发送的消息主体
    print(" [x] Sent %r" % message)
    connection.close()

    消费者



    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs', exchange_type='fanout') #exchange 转发器,exchange_type=fanout 绑定到这个转发器上的消费者都能接收到消息
    result = channel.queue_declare(exclusive=True)  # 随机queue 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_name = result.method.queue #获取queueu的名字
    channel.queue_bind(exchange='logs',  #绑定到这个转发器上,只能从这个转发器上接收
    queue=queue_name)  #指定queue的名字 转发器把消息发送到这个queue上,消费者从这个queue上接收消息
    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r" % body)

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()

      发送接收过程:
      消费者要先在线,生产者发送消息,消费者才能接收到。
      生产者
    DSC0004.png

      消费者1在线
    DSC0005.png

      消费者2在线
    DSC0006.png

      消费者3中断重新连接
    DSC0007.png

      广播并不会存下来,不在线就接收不到了。

    有选择的接收消息(exchange type=direct) 
      RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
    DSC0008.png


    publisher



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',
    type='direct')

    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()

    subscriber



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',
    type='direct')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    severities = sys.argv[1:]
    if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

    for severity in severities:
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key=severity)

    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    channel.start_consuming()

      服务器端
    DSC0009.png

      消费者端
    DSC00010.png

      绑定哪个就能接收的哪个的消息。

    更细致的消息过滤(exchange type=topic)
      上面的过滤条件是写死的,更细致的过滤条件就是在上面的基础上,对过滤参数的匹配。类似与正则匹配。
    DSC00011.png

      publisher



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',
    type='topic')

    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()

      subscriber



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',
    type='topic')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]
    if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

    for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
    queue=queue_name,
    routing_key=binding_key)

    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    channel.start_consuming()

    DSC00012.png

    DSC00013.png

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422274-1-1.html 上篇帖子: RabbitMQ学习之spring-amqp的重要类的认识 下篇帖子: Cannot save rules; there is insufficient space
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表