python之day12(线程池,redis,rabbitMQ)
参考博客:mysql
http://www.cnblogs.com/wupeiqi/articles/5699254.html
缓存
http://www.cnblogs.com/wupeiqi/articles/5132791.html
线程池
http://www.cnblogs.com/wupeiqi/articles/4839959.html
一,线程池:
上下文管理
import contextlib
@contextlib.contextmanager
def worker_state(state_list, worker_thread):
state_list.append(worker_thread)
try:
print(state_list)
yield #相当于return 返回值功能,但是不终止整个函数,只是跳出后重新执行函数
finally:
state_list.remove(worker_thread)
free_list = []
current_thread = "alex"
with worker_state(free_list, current_thread):
print(123)
yield简单用法:
def fab(max):
a,b = 0,1
while a < max:
yield a #a的值返回给i
a, b = b, a+b #然后a被重新赋值
for i in fab(15):
print(i,',',)
终止线程池操作
---利用contextlib模块以及with完成socket的自动关闭
import contextlib
import socket
@contextlib.contextmanager
def context_socket(host, port):
s = socket.socket()
s.bind( (host, port) ) #元组格式
s.listen(5)
try:
yield #返回值为None ==sock
finally:
s.close()
with context_socket("127.0.0.1", 8888) as sock:
print(sock)
二 redis 发布订阅
连接池
import redis
pool = redis.ConnectionPool(host='192.168.61.131', port=6379) #连接服务器端
r = redis.Redis(connection_pool=pool) #使用线程池
r.set('foo', 'bar')
print(r.get('foo'))
自定列表操作
事务:
原子性操作
**发布订阅
1 import redis
2
3 class RedisHelper:
4 def __init__(self):
5 self.__conn = redis.Redis(host="192.168.61.131") #redis 服务器
6
7 def public(self, msg, chan):
8 self.__conn.publish(chan, msg) #发布者 发布频道和信息
9
10 def subscribe(self, chan):
11 pub = self.__conn.pubsub() #订阅者接收
12 pub.subscribe(chan) #接收的频道
13 pub.parse_response() #接收消息
14 return pub
15
16
17 #等待发布者以及订阅者调用的模块
demo
1 import demo
2
3 obj = demo.RedisHelper()
4 obj.public('alex db', 'fm111.7')
发布者
1 import demo
2
3 obj = demo.RedisHelper()
4 data = obj.subscribe('fm111.7')
5 print(data.parse_response())
6 #
订阅者 先执行订阅者完成订阅,然后发布者发布消息,订阅者会接收到发布者发布的消息。
三 rabbitMQ
1,基础
安装方法(参考):http://yidao620c.iteye.com/blog/1947335
基于Queue实现生产者消费者模型:
import queue
import threading
import time
q = queue.Queue(20)
def productor(arg):
'''
生产者
:param arg:
:return:
'''
while True:
q.put(str(arg) + "-包子")
time.sleep(1)
def consumer(arg):
while True:
print(arg , q.get())
time.sleep(1)
for i in range(3):
t = threading.Thread(target = productor, args = (i, ))
t.start()
for j in range(20):
t = threading.Thread(target = consumer, args = (j, ))
t.start()
对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
1 import pika
2
3 #消费者
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='192.168.61.131'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='hello34')
10 def callback(ch, method, properties, body):
11 print(" Received %r" % body)
12
13 channel.basic_consume(callback,
14 queue='hello34',
15 no_ack=True)
16
17 print('
[*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()
消费者
1 import pika
2
3 #生产者
4
5 connection= pika.BlockingConnection(pika.ConnectionParameters(
6 host='192.168.61.131'
7 ))
8
9 channel = connection.channel()
10
11 channel.queue_declare(queue='hello34')
12
13 channel.basic_publish(exchange='',
14 routing_key='hello34',
15 body='hello world')
16 print(" Sent 'Hello World!'")
17 connection.close()
生产者 1、acknowledgment 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(
4 host='192.168.61.131'))
5 channel = connection.channel()
6
7 channel.queue_declare(queue='hello')
8
9 def callback(ch, method, properties, body):
10 print(" Received %r" % body)
11 import time
12 time.sleep(10)
13 print('ok')
14 ch.basic_ack(delivery_tag = method.delivery_tag) #回复队列确认任务已完成
15
16 channel.basic_consume(callback,
17 queue='hello',
18 no_ack=False)
19
20 print('
[*] Waiting for messages. To exit press CTRL+C')
21 channel.start_consuming()
消费者 2、durable 消息不丢失(持久化)
channel.queue_declare(queue='hello1', durable=True) 在消费者和生产者都需要设置
通过delivery_mode = 2, 实现消息持久化
1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
4 channel = connection.channel()
5
6 # make message persistent
7 channel.queue_declare(queue='hello1', durable=True)
8
9
10 def callback(ch, method, properties, body):
11 print(" Received %r" % body)
12 import time
13 time.sleep(10)
14 print('ok')
15 ch.basic_ack(delivery_tag = method.delivery_tag)
16
17 channel.basic_consume(callback,
18 queue='hello1',
19 no_ack=False)
20
21 print('
[*] Waiting for messages. To exit press CTRL+C')
22 channel.start_consuming()
23
24 #消费者
消费者
1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
4 channel = connection.channel()
5
6 # make message persistent
7 channel.queue_declare(queue='hello1', durable=True)
8
9 channel.basic_publish(exchange='',
10 routing_key='hello1',
11 body='Hello World!',
12 properties=pika.BasicProperties(
13 delivery_mode=2, # make message persistent
14 ))
15 print(" Sent 'Hello World!'")
16 connection.close()
17
18 #生产者
生产者 3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.61.131'))
4 channel = connection.channel()
5
6 # make message persistent
7 channel.queue_declare(queue='hello1', durable=True)
8
9
10 def callback(ch, method, properties, body):
11 print(" Received %r" % body)
12 import time
13 time.sleep(10)
14 print('ok')
15 ch.basic_ack(delivery_tag = method.delivery_tag)
16
17 channel.basic_qos(prefetch_count=1) #实现随来随取
18
19 channel.basic_consume(callback,
20 queue='hello1',
21 no_ack=False)
22
23 print('
[*] Waiting for messages. To exit press CTRL+C')
24 channel.start_consuming()
25
26 #消费者
消费者 重点说明
a、使用工作队列的一个好处就是能够并行的处理队列。如果任务堆积,只需要添加更多的工作者work即可
b、对于多个work,RabbitMQ会按照顺序把消息发送给每个消费者,这种方式为轮询(round-robin)
c、消息响应:如果一个work挂掉,上面代码实现将这个消息发送给其他work,而不是丢弃。
因此需要消息响应机制,每个work处理完成任务的时候,会发送一个ack,告诉RabbitMQ-server已经收到并处理某条消息,然后RabbitMQ-server释放并删除这条消息。
d、消息ack没有超时的概念,这样在处理一个非常耗时的消息任务时候就不会出现问题
e、消息ack默认是开启的,通过no_ack=True标识关闭,在回调函数中basic_ack中
f、如果忘记调用basic_ack的话,这样消息在程序退出后重新发送,会导致RabbitMQ-server中消息堆积,占用越来越多的内存。通过如下命令进行确认:
# ./rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
hello13 0
hello34 0 0
...done.
存在三个堆积的任务
g、关于队列大小:如果所有的工作者都在处理任务,队列就会被填满。需要留意这个问题,要么添加更多的工作者,要么使用其他策略,例如设置队列大小等。
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
1 #!/usr/bin/envpython
2 # -*- coding: UTF-8 -*-
3 # Author: Aaron Shen
4
5 import pika
6 import sys
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters(
9 host='192.168.61.131'))
10 channel = connection.channel()
11
12 channel.exchange_declare(exchange='logs',
13 type='fanout')
14
15 message = "info: Hello World!"
16 channel.basic_publish(exchange='logs',
17 routing_key='',
18 body=message)
19 print(" Sent %r" % message)
20 connection.close()
21
22 # 发布者
发布者
1 import pika
2
3 connection = pika.BlockingConnection(pika.ConnectionParameters(
4 host='192.168.61.131'))
5 channel = connection.channel()
6
7 channel.exchange_declare(exchange='logs',
8 type='fanout')
9
10 result = channel.queue_declare(exclusive=True) #队列断开后自动删除临时队列
11 queue_name = result.method.queue # 队列名采用服务端分配的临时队列
12
13 channel.queue_bind(exchange='logs',
14 queue=queue_name)
15
16 print('
[*] Waiting for logs. To exit press CTRL+C')
17
18 def callback(ch, method, properties, body):
19 print(" %r" % body)
20
21 channel.basic_consume(callback,
22 queue=queue_name,
23 no_ack=True)
24
25 channel.start_consuming()
订阅者 5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1 #!/usr/bin/envpython
2 # -*- coding: UTF-8 -*-
3 # Author: Aaron Shen
4
5 #!/usr/bin/env python
6 import pika
7 import sys
8
9 connection = pika.BlockingConnection(pika.ConnectionParameters(
10 host='192.168.61.131'))
11 channel = connection.channel()
12
13 channel.exchange_declare(exchange='direct_logs_test',
14 type='direct')
15
16 severity = 'error'
17 message = 'qwe'
18 channel.basic_publish(exchange='direct_logs_test',
19 routing_key=severity,
20 body=message)
21 print(" Sent %r:%r" % (severity, message))
22 connection.close()
23
24 #生产者
生产者
1 #!/usr/bin/envpython
2 # -*- coding: UTF-8 -*-
3 # Author: Aaron Shen
4
5 #!/usr/bin/env python
6 import pika
7 import sys
8
9 connection = pika.BlockingConnection(pika.ConnectionParameters(
10 host='192.168.61.131'))
11 channel = connection.channel()
12
13 channel.exchange_declare(exchange='direct_logs_test',
14 type='direct')
15
16 result = channel.queue_declare(exclusive=True)
17 queue_name = result.method.queue
18
19 severities = ['info'] #只消费info
20 if not severities:
21 sys.stderr.write("Usage: %s \n" % severities)
22 sys.exit(1)
23
24 for severity in severities:
25 channel.queue_bind(exchange='direct_logs_test',
26 queue=queue_name,
27 routing_key=severity)
28
29 print('
[*] Waiting for logs. To exit press CTRL+C')
30
31 def callback(ch, method, properties, body):
32 print(" %r:%r" % (method.routing_key, body))
33
34 channel.basic_consume(callback,
35 queue=queue_name,
36 no_ack=True)
37
38 channel.start_consuming()
39
40 #消费者
消费者1
1 #!/usr/bin/envpython
2 # -*- coding: UTF-8 -*-
3 # Author: Aaron Shen
4
5 #!/usr/bin/env python
6 import pika
7 import sys
8
9 connection = pika.BlockingConnection(pika.ConnectionParameters(
10 host='192.168.61.131'))
11 channel = connection.channel()
12
13 channel.exchange_declare(exchange='direct_logs_test',
14 type='direct')
15
16 result = channel.queue_declare(exclusive=True)
17 queue_name = result.method.queue
18
19 severities = ['error', 'info'] #可以消费error 和info的
20 if not severities:
21 sys.stderr.write("Usage: %s \n" % severities)
22 sys.exit(1)
23
24 for severity in severities:
25 channel.queue_bind(exchange='direct_logs_test',
26 queue=queue_name,
27 routing_key=severity)
28
29 print('
[*] Waiting for logs. To exit press CTRL+C')
30
31 def callback(ch, method, properties, body):
32 print(" %r:%r" % (method.routing_key, body))
33
34 channel.basic_consume(callback,
35 queue=queue_name,
36 no_ack=True)
37
38 channel.start_consuming()
39
40 #消费者
消费者2 6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
[*]# 表示可以匹配 0 个 或 多个 单词
[*]*表示只能匹配 一个 单词
发送者路由值 队列中
old.boy.python old.*-- 不匹配
old.boy.python old.#-- 匹配
1 #!/usr/bin/envpython
2 # -*- coding: UTF-8 -*-
3 # Author: Aaron Shen
4
5 import pika
6 import sys
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters(
9 host='192.168.61.131'))
10 channel = connection.channel()
11
12 channel.exchange_declare(exchange='topic_logs',
13 type='topic')
14
15 routing_key = 'anonymous.info'
16 message = ' '.join(sys.argv) or 'Hello World!'
17 channel.basic_publish(exchange='topic_logs',
18 routing_key=routing_key,
19 body=message)
20 print(" Sent %r:%r" % (routing_key, message))
21 connection.close()
22
23 # 生产者
生产者
1 #!/usr/bin/envpython
2 # -*- coding: UTF-8 -*-
3 # Author: Aaron Shen
4
5 import pika
6 import sys
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters(
9 host='192.168.61.131'))
10 channel = connection.channel()
11
12 channel.exchange_declare(exchange='topic_logs',
13 type='topic')
14
15 result = channel.queue_declare(exclusive=True)
16 queue_name = result.method.queue
17
18 binding_keys = ['*.info', ] #模糊查询
19 if not binding_keys:
20 sys.stderr.write("Usage: %s ...\n" % sys.argv)
21 sys.exit(1)
22
23 for binding_key in binding_keys:
24 channel.queue_bind(exchange='topic_logs',
25 queue=queue_name,
26 routing_key=binding_key)
27
28 print('
[*] Waiting for logs. To exit press CTRL+C')
29
30 def callback(ch, method, properties, body):
31 print(" %r:%r" % (method.routing_key, body))
32
33 channel.basic_consume(callback,
34 queue=queue_name,
35 no_ack=True)
36
37 channel.start_consuming()
38
39 # 消费者
消费者
页:
[1]