设为首页 收藏本站
查看: 2063|回复: 0

[经验分享] python的学习第十一天 rabbit redis

[复制链接]

尚未签到

发表于 2017-7-2 13:09:15 | 显示全部楼层 |阅读模式
rabbit
  一个简单的实例
DSC0000.png


send端,生产端



#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = "tao"
import pika,time
# connection = pika.BaseConnection(pika.ConnectionParameters('192.168.142.129'))  #本地连接
credentials = pika.PlainCredentials('admin', 'admin')   #用户名和密码
# 这里可以连接远程IP,请记得打开远程端口
parameters = pika.ConnectionParameters('192.168.142.129', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()   #声明一个管道
# 声明queue
channel.queue_declare(queue='hello')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello', #queue的名字
body='你好 美女!')  #消息内容
print(" [x] Sent '你好世界!'")
connection.close()   #直接关闭队列
receive端 消费端



#!/usr/bin/env python
# -*- coding:utf-8 -*-
__author__ = "tao"
import pika
# connection = pika.BaseConnection(pika.ConnectionParameters('192.168.142.129'))  #本地连接
credentials = pika.PlainCredentials('admin', 'admin')   #用户名和密码
# 这里可以连接远程IP,请记得打开远程端口
parameters = pika.ConnectionParameters('192.168.142.129', 5672, '/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()   #声明一个管道
#原因再次声明的:
#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello') #
def callback(ch, method, properties, body):
#处理消息的方法
print(ch,method,properties)
print(" [x] Received %r" % body)

channel.basic_consume(callback,   #消费消息
queue='hello',
no_ack=True)
print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    工作模式
      默认轮询,有确认机制,no_ack=Fals 打开True信息就会丢失,在这种模式下,RabbitMQ会默认把p发的消息公平的依次分发给各个消费者(c),跟负载均衡差不多
    DSC0001.png

      测试可以在callback方法里面定义sleep时间进行宕机丢失的测试



    def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(10)              # 模拟处理时间,这样就可以宕机消息丢失的测试了
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)   #手动确认后才不丢失,确认
      如果rabbit服务挂掉就管道都丢了要持久化



    channel.queue_declare(queue='hello', durable=True) #声明queue的时候就声明持久化,队列持久了,消息还没持久化


    channel.basic_publish(exchange='',
    routing_key="task_queue",
    body=message,
    properties=pika.BasicProperties(
    delivery_mode = 2, #  这样就把消息也持久化了
    ))
      在rabbit服务器上直接运行 rabbitmqctl list_queues 可以查看队列和消息的信息
      消息的公平分发
      如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
    DSC0002.png

      生产者:



    #!/usr/bin/env python
    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
    delivery_mode = 2, # make message persistent
                          ))
    print(" [x] Sent %r" % message)
    connection.close()
      消费者:



    #!/usr/bin/env python
    import pika
    import time

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)
    print('
  • Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
    queue='task_queue')

    channel.start_consuming()

    Publish\Subscribe(消息发布\订阅)
      Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
      fanout: 所有bind到此exchange的queue都可以接收消息
    direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

       表达式符号说明:#代表一个或多个字符,*代表任何字符
          例:#.a会匹配a.a,aa.a,aaa.a等
              *.a会匹配a.a,b.a,c.a等
         注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
      headers: 通过headers 来决定把消息发给哪些queue
    DSC0003.png

      http://www.rabbitmq.com/tutorials/tutorial-three-python.html  官网的介绍
      广播是实时的和收音机的广播一样



    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    __author__ = "tao"
    import pika
    credentials = pika.PlainCredentials('admin', 'admin')
    # 这里可以连接远程IP,请记得打开远程端口
    parameters = pika.ConnectionParameters('192.168.142.129', 5672, '/', credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',type='fanout')
    result = channel.queue_declare(exclusive=True)  #自动生成随机queue
    queue_name = result.method.queue
    print('random queuename',queue_name)
    channel.queue_bind(exchange='logs',queue=queue_name)
    print("发送完成")
    connection.close()


    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs',
    type='fanout')

    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
    routing_key='',    #必须有个空
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()
      routing模式:
    DSC0004.png

      RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
      生产者:



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',
    type='direct')

    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
      消费者:



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',
    type='direct')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    severities = sys.argv[1:]
    if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

    for severity in severities:
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key=severity)

    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    channel.start_consuming()
    Topic exchange
    DSC0005.png

      更细致的消息过滤,关键字等
      生产者:



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',
    type='topic')

    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
      消费者:



    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',
    type='topic')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]
    if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

    for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
    queue=queue_name,
    routing_key=binding_key)

    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    channel.start_consuming()
    Remote procedure call (RPC)
    DSC0006.png

      server端:



    #_*_coding:utf-8_*_
    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

    channel = connection.channel()

    channel.queue_declare(queue='rpc_queue')

    def fib(n):
    if n == 0:
    return 0
    elif n == 1:
    return 1
    else:
    return fib(n-1) + fib(n-2)

    def on_request(ch, method, props, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
    routing_key=props.reply_to,
    properties=pika.BasicProperties(correlation_id = \
    props.correlation_id),
    body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')

    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
      客户端:



    import pika
    import uuid

    class FibonacciRpcClient(object):
    def __init__(self):
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
    queue=self.callback_queue)
    def on_response(self, ch, method, props, body):
    if self.corr_id == props.correlation_id:
    self.response = body
    def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4())
    self.channel.basic_publish(exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
    reply_to = self.callback_queue,
    correlation_id = self.corr_id,
    ),
    body=str(n))
    while self.response is None:
    self.connection.process_data_events()
    return int(self.response)

    fibonacci_rpc = FibonacciRpcClient()

    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)
    Memcached & Redis使用
      http://www.cnblogs.com/wupeiqi/articles/5132791.html

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390350-1-1.html 上篇帖子: Python之路第十二天,高级(4) 下篇帖子: python之rabbitMQ篇
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表