设为首页 收藏本站
查看: 1210|回复: 0

[经验分享] python 与rabbitmq

[复制链接]

尚未签到

发表于 2017-7-2 18:06:21 | 显示全部楼层 |阅读模式
一、rabbitmq简介、安装
  简介:
  MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
  RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现的产品,遵循Mozilla Public License开源协议,RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收messag。
  安装(linux)


1、安装erlang

以root身份执行下面命令



yum install erlang xmlto



2、安装epel源

rpm -ivh http://download.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm

wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo



3、安装rabbitmq rpm包

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.1.5/rabbitmq-server-3.1.5-1.noarch.rpm     

rpm -ivh  rabbitmq-server-3.1.5-1.noarch.rpm



4、启动rabbitmq,并验证启动情况

rabbitmq-server --detached &ps aux |grep rabbitmq



5、以服务的方式启动

service rabbitmq-server start



6、检查端口5672是否打开

/sbin/iptables -I INPUT -p tcp --dport 5672 -j ACCEPT

/etc/rc.d/init.d/iptables save

/etc/init.d/iptables restart     

/etc/init.d/iptables status



7、启用维护插件

rabbitmq-plugins enable rabbitmq_management



8、重启rabbitmq

service rabbitmq-server restart



9、登录

http://192.168.110.60:15672/ 用户名密码 guest



无法登陆解决办法

vim /etc/rabbitmq/rabbitmq.config

写入信息,并保存

[{rabbit, [{loopback_users, []}]}].





其他相关:



1、服务器启动与关闭

启动:service rabbitmq-server start

关闭:service rabbitmq-server stop

重启:service rabbitmq-server restart



2、用户管理

新增 rabbitmqctl add_user admin admin

删除 rabbitmqctl delete_user admin

修改 rabbitmqctl change_password admin admin123



用户列表 rabbitmqctl  list_users

设置角色 rabbitmqctl set_user_tags admin administrator monitoring policymaker management



设置用户权限 rabbitmqctl  set_permissions  -p  VHostPath  admin  ConfP  WriteP  ReadP

查询所有权限 rabbitmqctl  list_permissions  [-p  VHostPath]

指定用户权限 rabbitmqctl  list_user_permissions  admin

清除用户权限 rabbitmqctl  clear_permissions  [-p VHostPath]  admin



tips:

设置远程用户密码


创建一个admin用户:rabbitmqctl add_user admin 1234qwer
设置该用户为administrator角色:rabbitmqctl set_user_tags admin administrator
设置权限:rabbitmqctl  set_permissions  -p  '/'  admin '.' '.' '.'
重启rabbitmq服务: service rabbitmq-server restart



二、rabbitmq python API


详细的api请查看rabbitmq官网:http://www.rabbitmq.com/devtools.html

安装:pip install pika

1.简单的消费者生产者模型

DSC0000.png

生产者:



DSC0001.gif DSC0002.gif


#!/usr/bin/env python3
#_*_ coding:utf-8 _*_
#Author:wd
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672)) #创建连接
channel = connection.channel()#建立管道

channel.queue_declare(queue='hello')#声明queue
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" Sent 'Hello World!'")
connection.close()
product  消费者:





#!/usr/bin/env python3
#_*_ coding:utf-8 _*_
#Author:wd
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.241',port=5672))#建立连接
channel = connection.channel()#建立管道

channel.queue_declare(queue='hello')#声明从那个管道接受消息

def callback(ch, method, properties, body):#回调函数,收到消息后执行的函数,body指消息主题
print(" [x] Received %r" % body)

channel.basic_consume(callback,
queue='hello',
no_ack=True) #如果设置no_ack=Flase,会把消费的消息重写添加到队列中
print('
  • Waiting for messages.')
    channel.start_consuming()#阻塞模式
    consumer  2.work模式(轮询)


    • 在这种模式下,RabbitMQ会默认把p发的消息依次分发给连接该条队列的各个消费者(c),跟负载均衡差类似,如果在消费者段设置了no_ack=Flase(默认),也就是确认消息,如果在回调函数中不手动进行确认,那么该消息将一直存在,此时我们需要在回调函数周手动确认消息接收完毕,此时队列中的消息才会被删除。
    • 假如消费者处理消息需要15秒,当消费者断开了,那这个消息处理明显还没处理完,并设置了no_ack=Flase(默认),此时该条消息会发给下一个消费者。


    • 上面的效果消费端断了就转到另外一个消费端去了,但是生产者怎么知道消费端断了呢? 因为生产者和消费者是通过socket连接的,socket断了,就说明消费端断开了。
    DSC0003.png

      生产者:





    #!/usr/bin/env python3
    #_*_ coding:utf-8 _*_
    #Author:wd
    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    '10.0.0.241'))
    channel = connection.channel()
    # 声明queue
    channel.queue_declare(queue='task_queue')

    message = "Hello World! %s" % time.time()
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
    )
    print(" [x] Sent %r" % message)
    connection.close()
    product  消费者:





    #!/usr/bin/env python3
    #_*_ coding:utf-8 _*_
    #Author:wd
    import pika, time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    '10.0.0.241'))
    channel = connection.channel()
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    print(" [x] Done")
    print("method.delivery_tag", method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)#主动向服务器发确认消息,此时delivery_tag为消费消息的tag号

    channel.basic_consume(callback,
    queue='task_queue',
    # no_ack=True 如果在回掉函数中手动确认必须把no_ack设置为Flase或者不带该参数
                          )
    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    consumer  公平的分发消息:
      在实际的应用中,每个客户端的消费消息的能力是不一样的,如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。如下图:
    DSC0004.png

      消费者:





    #!/usr/bin/env python3
    #_*_ coding:utf-8 _*_
    #Author:wd
    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.0.0.241'))
    channel = connection.channel()
    channel.queue_declare(queue='task_queue')
    print('
  • Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)#设置消费的条数为1,当当前消费者有一条消息未消费完时,该消费者不会主动接受消息了。
    channel.basic_consume(callback,
    queue='task_queue')
    channel.start_consuming()
    按消费能力接受消息
    三、消息持久化
      当rabbitmq队列中有很多消息,此时rabbitmq server宕机了,会导致数据丢下,那么如何将消息进行持久化呢。分两步:
      1.持久化管道:
      在生产者和消费者两端声明管道时候加参数:
      channel.queue_declare(queue='hello2', durable=True)
      2.持久化消息:
      在生产者端设置properties参数:
      properties=pika.BasicProperties( delivery_mode=2, )# 消息持久化
      完整的demo:
      生产者:





    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost',5672))  # 默认端口5672,可不写
    channel = connection.channel()
    #声明queue
    channel.queue_declare(queue='hello2', durable=True)  
    channel.basic_publish(exchange='',
    routing_key='hello2',
    body='Hello World!',
    properties=pika.BasicProperties(
    delivery_mode=2,  # make message persistent
                              )
    )
    print(" [x] Sent 'Hello World!'")
    connection.close()
    product  消费者:





    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello2', durable=True)
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(10)
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 告诉生产者,消息处理完成

    channel.basic_qos(prefetch_count=1)  # 类似权重,按能力分发,如果有一个消息,就不在给你发
    channel.basic_consume(  # 消费消息
    callback,  # 如果收到消息,就调用callback
    queue='hello2',
    # no_ack=True  # 一般不写,处理完接收处理结果。宕机则发给其他消费者
                          )
    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    consumer
    四、rabbitmq发布/订阅的三种模式
      之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,定义的类型有三种:


    • fanout: 所有绑定到此exchange的queue都可以接收消息
    • direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    • topic: 所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
      TIPS:以上三种模式都是广播形式,时时接收,如果消费者不在线该条消息将不会再次接收,类似收音机。
      1.fanout
      fanout模式是纯广播模式,所有绑定了相同的exchange的消费者都能收到来自生产者的一条消息,收取消息时需要queue和exchange绑定,因为消费者不是和exchange直连的,消费者是连在queue上,queue绑定在exchange上,消费者只会在queu里收消息。如下图:
    DSC0005.png

      demo:
      发布者:



    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    # 注意:这里是广播,不需要声明queue
    channel.exchange_declare(exchange='logs',  # 声明广播管道
    type='fanout')
    # message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',
    routing_key='',  # 注意此处空,必须有
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()
      订阅者:



    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',
    type='fanout')
    # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    result = channel.queue_declare(exclusive=True)
    # 获取随机的queue名字
    queue_name = result.method.queue
    print("random queuename:", queue_name)
    channel.queue_bind(exchange='logs',  # queue绑定到转发器上
    queue=queue_name)
    print('
  • Waiting for logs. To exit press CTRL+C')
    def callback(ch, method, properties, body):
    print(" [x] %r" % body)
    channel.basic_consume(callback,
    queue=queue_name,
    )
    channel.start_consuming()
      2.direct模式
      RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,此时的关键字由参数routing_key指定。模式如下图:
    DSC0006.png

      发布者:



    #!/usr/bin/env python3
    #_*_ coding:utf-8 _*_
    #Author:wd
    import pika
    from random import randint
    credentials = pika.PlainCredentials('admin','1234qwer')#使用用户名密码连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.0.0.241',port=5672,virtual_host='/',credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',
    type='direct')#声明type类型

    index=randint(0,3)
    log_level=['info','wraning','error','nothing']
    message ='{}--->Hello World!'.format(log_level[index])
    channel.basic_publish(exchange='direct_logs',
    routing_key=log_level[index],   #发消息随机绑定一个关键字
    body=message)
    print(" [x] Sent %r:%r" % (log_level[index], message))
      订阅者:



    #!/usr/bin/env python3
    #_*_ coding:utf-8 _*_
    #Author:wd
    import pika
    credentials = pika.PlainCredentials('admin','1234qwer')#使用用户名密码连接
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.0.0.241',port=5672,virtual_host='/',credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',
    type='direct')
    result = channel.queue_declare(exclusive=True)#随机生成队列名字,断开后删除
    queue_name = result.method.queue
    # 获取运行脚本所有的参数

    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key='info')#只绑定了info关键字,接受只接受info关键字的消息
    print('
  • Waiting for logs. To exit press CTRL+C')
    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
    channel.basic_consume(callback,
    queue=queue_name,
    )
    channel.start_consuming()
      3.topic(主题)模式
      topic相比于dirct而言,提供了更为详细的消息接受规则,可使用*、#等来匹配关键字来接受消息。
      发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
    绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
    *可以匹配一个标识符。
    #可以匹配0个或多个标识符。
      例如:#.a会匹配a.a,aa.a,aaa.a等
              *.a会匹配a.a,b.a,c.a等
    注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
      topic消费模式如下图:
    DSC0007.png

      demo:
      发布者:



    #!/usr/bin/env python3
    #_*_ coding:utf-8 _*_
    #Author:wd
    import pika
    import sys
    credentials = pika.PlainCredentials('admin','1234qwer')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.0.0.241',port=5672,virtual_host="/",credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',
    type='topic')
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
      订阅者:



    #!/usr/bin/env python3
    #_*_ coding:utf-8 _*_
    #Author:wd
    import pika
    import sys
    credentials = pika.PlainCredentials('admin','1234qwer')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.0.0.241',port=5672,virtual_host="/",credentials=credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',
    type='topic')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    binding_keys = sys.argv[1:]
    if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
    for binding_key in binding_keys:#循环绑定routing_key,如果绑定*.info,就接受以.info结尾的routing_key所发的消息。
    channel.queue_bind(exchange='topic_logs',
    queue=queue_name,
    routing_key=binding_key)
    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    )
    channel.start_consuming()

    五、rabbitmq应用场景(简单RPC)
      RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。真正的RPC有更为标准的定义,这里我们可以使用rabbitmq来实现简单的RPC模型,其原理图如下:
    DSC0008.png

      上述图中,client和server对于rabbitmq来说都具有两个角色,即:即是生产者又是消费者。client端通过生产者角色发送命令,服务端此时充当消费者接受客户端的命令消息,当接受到消息以后又以生产者角色发送命令结果给客户端,此时客户端是消费者接受客户端的消息。
      过程:


    • 客户端 Client 设置消息的 routing key 为 Service 的队列 op_q,设置消息的 reply-to 属性为返回的 response 的目标队列 reponse_q,设置其 correlation_id 为以随机UUID,然后将消息发到 exchange。比如channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
    • Exchange 将消息转发到 Service 的 op_q
    • Service 收到该消息后进行处理,然后将response 发到 exchange,并设置消息的 routing_key 为原消息的 reply_to 属性,以及设置其 correlation_id 为原消息的 correlation_id
    • ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))Exchange 将消息转发到 reponse_q


    • Client 逐一接受 response_q 中的消息,检查消息的 correlation_id 是否为等于它发出的消息的correlation_id,是的话表明该消息为它需要的response。
      代码实现:
      clinet:



    import pika
    import uuid

    class FibonacciRpcClient(object):
    def __init__(self):
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
    queue=self.callback_queue)
    def on_response(self, ch, method, props, body):
    if self.corr_id == props.correlation_id:
    self.response = body
    def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
    reply_to = self.callback_queue,
    correlation_id = self.corr_id,
    ),
    body=str(n))
    while self.response is None:
    self.connection.process_data_events()
    return int(self.response)

    fibonacci_rpc = FibonacciRpcClient()

    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)
      server:



    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

    channel = connection.channel()

    channel.queue_declare(queue='rpc_queue')

    def fib(n):
    if n == 0:
    return 0
    elif n == 1:
    return 1
    else:
    return fib(n-1) + fib(n-2)

    def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
    routing_key=props.reply_to,
    properties=pika.BasicProperties(correlation_id = \
    props.correlation_id),
    body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')

    print(" [x] Awaiting RPC requests")
    channel.start_consuming()

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390531-1-1.html 上篇帖子: rabbitmq学习笔记 下篇帖子: Python的平凡之路(11)
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表