设为首页 收藏本站
查看: 963|回复: 0

[经验分享] python之day12(线程池,redis,rabbitMQ)

[复制链接]

尚未签到

发表于 2017-7-2 17:55:04 | 显示全部楼层 |阅读模式
  参考博客:
  mysql
http://www.cnblogs.com/wupeiqi/articles/5699254.html   
缓存
http://www.cnblogs.com/wupeiqi/articles/5132791.html
线程池
http://www.cnblogs.com/wupeiqi/articles/4839959.html
  一,线程池:
  上下文管理



import contextlib
@contextlib.contextmanager
def worker_state(state_list, worker_thread):
state_list.append(worker_thread)
try:
print(state_list)
yield                       #相当于return 返回值功能,但是不终止整个函数,只是跳出后重新执行函数
finally:
state_list.remove(worker_thread)
free_list = []
current_thread = "alex"
with worker_state(free_list, current_thread):
print(123)        

  yield简单用法:



def fab(max):
a,b = 0,1
while a < max:
yield a                    #a的值返回给i
a, b = b, a+b           #然后a被重新赋值
for i in fab(15):
print(i,',',)   

  终止线程池操作
  ---利用contextlib模块以及with完成socket的自动关闭



import contextlib
import socket
@contextlib.contextmanager
def context_socket(host, port):
s = socket.socket()
s.bind( (host, port) )   #元组格式
s.listen(5)
try:
yield                   #返回值为None ==sock
finally:
s.close()
with context_socket("127.0.0.1", 8888) as sock:
print(sock)

  二 redis 发布订阅
    连接池



import redis
pool = redis.ConnectionPool(host='192.168.61.131', port=6379)       #连接服务器端
r = redis.Redis(connection_pool=pool)       #使用线程池
r.set('foo', 'bar')
print(r.get('foo'))

  
    自定列表操作
    事务:
        原子性操作
    **发布订阅


DSC0000.gif DSC0001.gif


1 import redis
2
3 class RedisHelper:
4     def __init__(self):
5         self.__conn = redis.Redis(host="192.168.61.131")     #redis 服务器
6
7     def public(self, msg, chan):
8         self.__conn.publish(chan, msg)          #发布者 发布频道和信息
9
10     def subscribe(self, chan):
11         pub = self.__conn.pubsub()               #订阅者接收
12         pub.subscribe(chan)                         #接收的频道
13         pub.parse_response()                       #接收消息
14         return pub        
15
16
17 #等待发布者以及订阅者调用的模块
demo




1 import demo
2
3 obj = demo.RedisHelper()
4 obj.public('alex db', 'fm111.7')
发布者




1 import demo
2
3 obj = demo.RedisHelper()
4 data = obj.subscribe('fm111.7')
5 print(data.parse_response())      
6 #[b'message', b'fm111.7', b'alex db']
订阅者  先执行订阅者完成订阅,然后发布者发布消息,订阅者会接收到发布者发布的消息。
  三 rabbitMQ
    1,基础
  安装方法(参考):http://yidao620c.iteye.com/blog/1947335
  基于Queue实现生产者消费者模型:



import queue
import threading
import time
q = queue.Queue(20)
def productor(arg):
'''
生产者
:param arg:
:return:
'''
while True:
q.put(str(arg) + "-包子")
time.sleep(1)
def consumer(arg):
while True:
print(arg , q.get())
time.sleep(1)
for i in range(3):
t = threading.Thread(target = productor, args = (i, ))
t.start()
for j in range(20):
t = threading.Thread(target = consumer, args = (j, ))
t.start()

  对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。





1 import pika
2
3 #消费者
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6         host='192.168.61.131'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='hello34')
10 def callback(ch, method, properties, body):
11     print(" [x] Received %r" % body)
12
13 channel.basic_consume(callback,
14                       queue='hello34',
15                       no_ack=True)
16
17 print('
  • Waiting for messages. To exit press CTRL+C')
    18 channel.start_consuming()
    消费者




    1 import pika
    2
    3 #生产者
    4
    5 connection= pika.BlockingConnection(pika.ConnectionParameters(
    6     host='192.168.61.131'
    7 ))
    8
    9 channel = connection.channel()
    10
    11 channel.queue_declare(queue='hello34')
    12
    13 channel.basic_publish(exchange='',
    14                       routing_key='hello34',
    15                       body='hello world')
    16 print(" [x] Sent 'Hello World!'")
    17 connection.close()
    生产者  1、acknowledgment 消息不丢失
      no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。





    1 import pika
    2
    3 connection = pika.BlockingConnection(pika.ConnectionParameters(
    4         host='192.168.61.131'))
    5 channel = connection.channel()
    6
    7 channel.queue_declare(queue='hello')
    8
    9 def callback(ch, method, properties, body):
    10     print(" [x] Received %r" % body)
    11     import time
    12     time.sleep(10)
    13     print('ok')
    14     ch.basic_ack(delivery_tag = method.delivery_tag)    #回复队列确认任务已完成
    15
    16 channel.basic_consume(callback,
    17                       queue='hello',
    18                       no_ack=False)
    19
    20 print('
  • Waiting for messages. To exit press CTRL+C')
    21 channel.start_consuming()
    消费者  2、durable   消息不丢失(持久化)

    channel.queue_declare(queue='hello1', durable=True) 在消费者和生产者都需要设置
    通过delivery_mode = 2, 实现消息持久化




    1 import pika
    2
    3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
    4 channel = connection.channel()
    5
    6 # make message persistent
    7 channel.queue_declare(queue='hello1', durable=True)
    8
    9
    10 def callback(ch, method, properties, body):
    11     print(" [x] Received %r" % body)
    12     import time
    13     time.sleep(10)
    14     print('ok')
    15     ch.basic_ack(delivery_tag = method.delivery_tag)
    16
    17 channel.basic_consume(callback,
    18                       queue='hello1',
    19                       no_ack=False)
    20
    21 print('
  • Waiting for messages. To exit press CTRL+C')
    22 channel.start_consuming()
    23
    24 #消费者
    消费者




    1 import pika
    2
    3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
    4 channel = connection.channel()
    5
    6 # make message persistent
    7 channel.queue_declare(queue='hello1', durable=True)
    8
    9 channel.basic_publish(exchange='',
    10                       routing_key='hello1',
    11                       body='Hello World!',
    12                       properties=pika.BasicProperties(
    13                           delivery_mode=2, # make message persistent
    14                       ))
    15 print(" [x] Sent 'Hello World!'")
    16 connection.close()
    17
    18 #生产者
    生产者  3、消息获取顺序
      默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
      channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列





    1 import pika
    2
    3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
    4 channel = connection.channel()
    5
    6 # make message persistent
    7 channel.queue_declare(queue='hello1', durable=True)
    8
    9
    10 def callback(ch, method, properties, body):
    11     print(" [x] Received %r" % body)
    12     import time
    13     time.sleep(10)
    14     print('ok')
    15     ch.basic_ack(delivery_tag = method.delivery_tag)
    16
    17 channel.basic_qos(prefetch_count=1)    #实现随来随取
    18
    19 channel.basic_consume(callback,
    20                       queue='hello1',
    21                       no_ack=False)
    22
    23 print('
  • Waiting for messages. To exit press CTRL+C')
    24 channel.start_consuming()
    25
    26 #消费者
    消费者  重点说明
      a、使用工作队列的一个好处就是能够并行的处理队列。如果任务堆积,只需要添加更多的工作者work即可
      b、对于多个work,RabbitMQ会按照顺序把消息发送给每个消费者,这种方式为轮询(round-robin)
      c、消息响应:如果一个work挂掉,上面代码实现将这个消息发送给其他work,而不是丢弃。
      因此需要消息响应机制,每个work处理完成任务的时候,会发送一个ack,告诉RabbitMQ-server已经收到并处理某条消息,然后RabbitMQ-server释放并删除这条消息。
      d、消息ack没有超时的概念,这样在处理一个非常耗时的消息任务时候就不会出现问题
      e、消息ack默认是开启的,通过no_ack=True标识关闭,在回调函数中basic_ack中
      f、如果忘记调用basic_ack的话,这样消息在程序退出后重新发送,会导致RabbitMQ-server中消息堆积,占用越来越多的内存。通过如下命令进行确认:



    [iyunv@localhost sbin]# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
    Listing queues ...
    hello   0       0
    hello1  3       0
    hello34 0       0
    ...done.

      存在三个堆积的任务
      g、关于队列大小:如果所有的工作者都在处理任务,队列就会被填满。需要留意这个问题,要么添加更多的工作者,要么使用其他策略,例如设置队列大小等。
      4、发布订阅   
      发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
      exchange type = fanout





    1 #!/usr/bin/env  python
    2 # -*- coding: UTF-8 -*-
    3 # Author: Aaron Shen
    4
    5 import pika
    6 import sys
    7
    8 connection = pika.BlockingConnection(pika.ConnectionParameters(
    9         host='192.168.61.131'))
    10 channel = connection.channel()
    11
    12 channel.exchange_declare(exchange='logs',
    13                          type='fanout')
    14
    15 message = "info: Hello World!"
    16 channel.basic_publish(exchange='logs',
    17                       routing_key='',
    18                       body=message)
    19 print(" [x] Sent %r" % message)
    20 connection.close()
    21
    22 # 发布者
    发布者




    1 import pika
    2
    3 connection = pika.BlockingConnection(pika.ConnectionParameters(
    4         host='192.168.61.131'))
    5 channel = connection.channel()
    6
    7 channel.exchange_declare(exchange='logs',
    8                          type='fanout')
    9
    10 result = channel.queue_declare(exclusive=True) #队列断开后自动删除临时队列
    11 queue_name = result.method.queue            # 队列名采用服务端分配的临时队列
    12
    13 channel.queue_bind(exchange='logs',
    14                    queue=queue_name)
    15
    16 print('
  • Waiting for logs. To exit press CTRL+C')
    17
    18 def callback(ch, method, properties, body):
    19     print(" [x] %r" % body)
    20
    21 channel.basic_consume(callback,
    22                       queue=queue_name,
    23                       no_ack=True)
    24
    25 channel.start_consuming()
    订阅者  5、关键字发送
      exchange type = direct
      之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。





    1 #!/usr/bin/env  python
    2 # -*- coding: UTF-8 -*-
    3 # Author: Aaron Shen
    4
    5 #!/usr/bin/env python
    6 import pika
    7 import sys
    8
    9 connection = pika.BlockingConnection(pika.ConnectionParameters(
    10         host='192.168.61.131'))
    11 channel = connection.channel()
    12
    13 channel.exchange_declare(exchange='direct_logs_test',
    14                          type='direct')
    15
    16 severity = 'error'
    17 message = 'qwe'
    18 channel.basic_publish(exchange='direct_logs_test',
    19                       routing_key=severity,
    20                       body=message)
    21 print(" [x] Sent %r:%r" % (severity, message))
    22 connection.close()
    23
    24 #生产者
    生产者




    1 #!/usr/bin/env  python
    2 # -*- coding: UTF-8 -*-
    3 # Author: Aaron Shen
    4
    5 #!/usr/bin/env python
    6 import pika
    7 import sys
    8
    9 connection = pika.BlockingConnection(pika.ConnectionParameters(
    10         host='192.168.61.131'))
    11 channel = connection.channel()
    12
    13 channel.exchange_declare(exchange='direct_logs_test',
    14                          type='direct')
    15
    16 result = channel.queue_declare(exclusive=True)
    17 queue_name = result.method.queue
    18
    19 severities = ['info']      #只消费info
    20 if not severities:
    21     sys.stderr.write("Usage: %s [info] [error]\n" % severities)
    22     sys.exit(1)
    23
    24 for severity in severities:
    25     channel.queue_bind(exchange='direct_logs_test',
    26                        queue=queue_name,
    27                        routing_key=severity)
    28
    29 print('
  • Waiting for logs. To exit press CTRL+C')
    30
    31 def callback(ch, method, properties, body):
    32     print(" [x] %r:%r" % (method.routing_key, body))
    33
    34 channel.basic_consume(callback,
    35                       queue=queue_name,
    36                       no_ack=True)
    37
    38 channel.start_consuming()
    39
    40 #消费者
    消费者1




    1 #!/usr/bin/env  python
    2 # -*- coding: UTF-8 -*-
    3 # Author: Aaron Shen
    4
    5 #!/usr/bin/env python
    6 import pika
    7 import sys
    8
    9 connection = pika.BlockingConnection(pika.ConnectionParameters(
    10         host='192.168.61.131'))
    11 channel = connection.channel()
    12
    13 channel.exchange_declare(exchange='direct_logs_test',
    14                          type='direct')
    15
    16 result = channel.queue_declare(exclusive=True)
    17 queue_name = result.method.queue
    18
    19 severities = ['error', 'info']                  #可以消费error 和info的
    20 if not severities:
    21     sys.stderr.write("Usage: %s [error]\n" % severities)
    22     sys.exit(1)
    23
    24 for severity in severities:
    25     channel.queue_bind(exchange='direct_logs_test',
    26                        queue=queue_name,
    27                        routing_key=severity)
    28
    29 print('
  • Waiting for logs. To exit press CTRL+C')
    30
    31 def callback(ch, method, properties, body):
    32     print(" [x] %r:%r" % (method.routing_key, body))
    33
    34 channel.basic_consume(callback,
    35                       queue=queue_name,
    36                       no_ack=True)
    37
    38 channel.start_consuming()
    39
    40 #消费者
    消费者2  6、模糊匹配
      exchange type = topic
      在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。


    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词



    发送者路由值              队列中
    old.boy.python          old.*  -- 不匹配
    old.boy.python          old.#  -- 匹配





    1 #!/usr/bin/env  python
    2 # -*- coding: UTF-8 -*-
    3 # Author: Aaron Shen
    4
    5 import pika
    6 import sys
    7
    8 connection = pika.BlockingConnection(pika.ConnectionParameters(
    9         host='192.168.61.131'))
    10 channel = connection.channel()
    11
    12 channel.exchange_declare(exchange='topic_logs',
    13                          type='topic')
    14
    15 routing_key = 'anonymous.info'
    16 message = ' '.join(sys.argv[2:]) or 'Hello World!'
    17 channel.basic_publish(exchange='topic_logs',
    18                       routing_key=routing_key,
    19                       body=message)
    20 print(" [x] Sent %r:%r" % (routing_key, message))
    21 connection.close()
    22
    23 # 生产者
    生产者




    1 #!/usr/bin/env  python
    2 # -*- coding: UTF-8 -*-
    3 # Author: Aaron Shen
    4
    5 import pika
    6 import sys
    7
    8 connection = pika.BlockingConnection(pika.ConnectionParameters(
    9         host='192.168.61.131'))
    10 channel = connection.channel()
    11
    12 channel.exchange_declare(exchange='topic_logs',
    13                          type='topic')
    14
    15 result = channel.queue_declare(exclusive=True)
    16 queue_name = result.method.queue
    17
    18 binding_keys = ['*.info', ]                  #模糊查询
    19 if not binding_keys:
    20     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    21     sys.exit(1)
    22
    23 for binding_key in binding_keys:
    24     channel.queue_bind(exchange='topic_logs',
    25                        queue=queue_name,
    26                        routing_key=binding_key)
    27
    28 print('
  • Waiting for logs. To exit press CTRL+C')
    29
    30 def callback(ch, method, properties, body):
    31     print(" [x] %r:%r" % (method.routing_key, body))
    32
    33 channel.basic_consume(callback,
    34                       queue=queue_name,
    35                       no_ack=True)
    36
    37 channel.start_consuming()
    38
    39 # 消费者
    消费者

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390525-1-1.html 上篇帖子: 嵌入式jetty的HTTP实现 下篇帖子: Python之操作RabbitMQ
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表