Python的平凡之路(11)
一、 rabbitmq1 进程Queue:父进程与子进程进行交互,或者同属于同一父进程下多个子进程进行交互
2 队列通信:
send1.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='duilie')
#Send RabbitMQ a message can never be sent directly to the queue,it always needs to go throngh an exchange
channel.basic_publish(exchange='',
routing_key='duilie',
body='Hello,World!')
print(" Sent 'Hello World!'")
connection.close()
receive1.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='duilie')
def callback(ch,method,properties,body):
print(" Received %r" % body)
channel.basic_consume(callback,
queue='duilie',
no_ack=True)
print(' Waiting for messages. To exit press CRTL+C')
channel.start_consuming()
3 work queues:在这种模式下,RabbitMQ会默认吧P发送的消息依次分发给各个消费者(C),和负载均衡差不多,其中如果一个消费者断掉了,消息不会丢失,会依次还传递给剩下的消费者
send2.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import time
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='task_queue')
#n RabbitMQ a message can never be sent directly to the queue,it always
#needs to go through an exchange.
for i in range(100):
channel.basic_publish(exchange='',
routing_key = 'task_queue',
body = 'Hello world! %s' %i,
properties = pika.BasicProperties(
delivery_mode=2,# make message persistent,把消息持久化
))
time.sleep(0.8)
print(" Sent 'Hello World! The %s")
connection.close()
receive2.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import time
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue=‘task_queue')
def callback(ch,method,properties,body):
print(" Received %r" % body)
time.sleep(body.count(b'.'))
print(" Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
queue='task_queue',
)
print(' Waiting for messages. To exit press CRTL+C')
channel.start_consuming()
4 消息持久化:队列持久化和消息持久化是两回事
send4.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import time
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='task_queue_other',durable=True) #加上durable=True,队列持久化
#n RabbitMQ a message can never be sent directly to the queue,it always
#needs to go through an exchange.
for i in range(100):
channel.basic_publish(exchange='',
routing_key = 'task_queue_other',
body = 'Hello world! %s' %i,
properties = pika.BasicProperties(
delivery_mode=2,# make message persistent,把消息持久化
))
time.sleep(0.8)
print(" Sent 'Hello World! The %s")
connection.close()
receive4.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import time
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='task_queue_other',durable=True)#队列持久化
def callback(ch,method,properties,body):
print(" Received %r" % body)
time.sleep(body.count(b'.'))
print(" Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback,
queue='task_queue_other',
)
print(' Waiting for messages. To exit press CRTL+C')
channel.start_consuming()
5 消息公平分发:如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
send5.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import time
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='task_queue_other',durable=True) #加上durable=True,队列持久化
#n RabbitMQ a message can never be sent directly to the queue,it always
#needs to go through an exchange.
for i in range(100):
channel.basic_publish(exchange='',
routing_key = 'task_queue_other',
body = 'Hello world! %s' %i,
properties = pika.BasicProperties(
delivery_mode=2,# make message persistent,把消息持久化
))
time.sleep(0.8)
print(" Sent 'Hello World! The %s")
connection.close()
receive5.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import time
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#创建隧道
channel = connection.channel()
#声明queue
channel.queue_declare(queue='task_queue_other',durable=True)#队列持久化
def callback(ch,method,properties,body):
print(" Received %r" % body)
time.sleep(body.count(b'.'))
print(" Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)#平均分发
channel.basic_consume(callback,
queue='task_queue_other',
)
print(' Waiting for messages. To exit press CRTL+C')
channel.start_consuming()
6 消息发布订阅:之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,说白了就是广播,然后消费者都收听。
An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
表达式符号说明:#代表一个或多个字符,*代表任何字符
例:#.a会匹配a.a,aa.a,aaa.a等
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout
headers: 通过headers 来决定把消息发给哪些queue
fanout-publisher.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import sys
import time
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
#创建隧道
channel = connection.channel()
#声明exchange名称和类型
channel.exchange_declare(exchange='logs',
type='fanout') #fanout类型-所有bind到此exchange的queue都可以接收消息
#message = ' '.join(sys.argv) or "info: Hello World!"
message = "info: Hello World!"
for i in range(20):
channel.basic_publish(exchange='logs’,#声明exchange名称
routing_key='', #需要置空
body='message %s' %i) #通过一个循环查看效果
time.sleep(1)
print(" Sent %r" % message)
connection.close()
fanout-subscriber.py
!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
#创建隧道
channel = connection.channel()
#声明exchange和类型
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='logs',#fanout类型-所有bind到此exchange的queue都可以接收消息
queue=queue_name)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
7 有选择的接收消息:RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
direct-publisher.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import sys
#创建连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
#创建隧道
channel = connection.channel()
#声明exchange和类型
channel.exchange_declare(exchange='direct_logs',type='direct')
severity = sys.argv if len(sys.argv) > 1 else 'info'
message = ''.join(sys.argv) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body = message)
print(" Sent %r:%r" % (severity,message))
connection.close()
direct-subscriber.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv #仅仅保留所有参数,去掉文件名
if not severities:
sys.stderr.write("Usage: %s \n" % sys.argv)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) #routing_key就是队列绑定的一些关键字
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
8 RPC:To illustrate how an RPC service could be used we're going to create a simple client class. It's going to expose a method named call which sends an RPC request and blocks until the answer is received:
rpc-client.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response,no_ack=True,
queue=self.callback_queue)
def on_response(self,ch,method,props,body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self,n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id= self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[.] Got %r" % response)
rpc-server.py
#!/usr/bin/env python
#Author is wspikh
# -*- coding: encoding -*-
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host = 'localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch,method,props,body):
n = int(body)
print("[.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id= \
props.correlation_id),
body = str(response))
ch.basic_ack(delivery_tag= method.delivery_tag)
channel.basic_qos(prefetch_count=1) #配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
channel.basic_consume(on_request, queue='rpc_queue')
print(" Awaiting RPC requests")
channel.start_consuming()
二、 redis
1、操作模式
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
1
2
3
4
5
6
7
8
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
r = redis.Redis(host='10.211.55.4', port=6379)
r.set('foo', 'Bar')
print r.get('foo')
2、连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。
1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print r.get('foo')
3、常用命令:
(一)、key pattern 查询相应的key
(1)redis允许模糊查询key 有3个通配符*、?、[]
(2)randomkey:返回随机key
(3)type key:返回key存储的类型
(4)exists key:判断某个key是否存在
(5)del key:删除key
(6)rename key newkey:改名
(7)renamenx key newkey:如果newkey不存在则修改成功
(8)move key 1:将key移动到1数据库
(9)ttl key:查询key的生命周期(秒)
(10)expire key 整数值:设置key的生命周期以秒为单位
(11)pexpire key 整数值:设置key的生命周期以毫秒为单位
(12)pttl key:查询key 的生命周期(毫秒)
(13)perisist key:把指定key设置为永久有效
(二)、字符串类型的操作
(1)set key value
如果ex和px同时写,则以后面的有效期为准
nx:如果key不存在则建立
xx:如果key存在则修改其值
(2)get key:取值
(3)mset key1 value1 key2 value2 一次设置多个值
(4)mget key1 key2 :一次获取多个值
(5)setrange key offset value:把字符串的offset偏移字节改成value
如果偏移量 > 字符串长度,该字符自动补0x00
(6)append key value :把value追加到key 的原值上
(7)getrange key start stop:获取字符串中范围的值
对于字符串的下标,左数从0开始,右数从-1开始
注意:当start>length,则返回空字符串
当stop>=length,则截取至字符串尾
如果start所处位置在stop右边,则返回空字符串
(8)getset key nrevalue:获取并返回旧值,在设置新值
(9)incr key:自增,返回新值,如果incr一个不是int的value则返回错误,incr一个不存在的key,则设置key为1
(10)incrby key 2:跳2自增
(11)incrbyfloat by 0.7: 自增浮点数
(12)setbit key offset value:设置offset对应二进制上的值,返回该位上的旧值
注意:如果offset过大,则会在中间填充0
offset最大到多少
2^32-1,即可推出最大的字符串为512M
(13)bitop operation destkey key1 对key1 key2做opecation并将结果保存在destkey上 opecation可以是AND OR NOT XOR
(14)strlen key:取指定key的value值的长度
(15)setex key time value:设置key对应的值value,并设置有效期为time秒
(三)、链表操作
Redis的list类型其实就是一个每个子元素都是string类型的双向链表,链表的最大长度是2^32。list既可以用做栈,也可以用做队列。
list的pop操作还有阻塞版本,主要是为了避免轮询
(1)lpush key value:把值插入到链表头部
(2)rpush key value:把值插入到链表尾部
(3)lpop key :返回并删除链表头部元素
(4)rpop key: 返回并删除链表尾部元素
(5)lrange key start stop:返回链表中中的元素
(6)lrem key count value:从链表中删除value值,删除count的绝对值个value后结束 count > 0 从表头删除 count < 0 从表尾删除 count=0 全部删除
(7)ltrim key start stop:剪切key对应的链接,切一段并把改制重新赋给key
(8)lindex key index:返回index索引上的值
(9)llen key:计算链表的元素个数
(10)linsert key after|before search value:在key 链表中寻找search,并在search值之前|之后插入value
(11)rpoplpush source dest:把source 的末尾拿出,放到dest头部,并返回单元值
应用场景: task + bak 双链表完成安全队列
业务逻辑: rpoplpush task bak
接收返回值并做业务处理
如果成功则rpop bak清除任务,如果不成功,下次从bak表取任务
(12)brpop,blpop key timeout:等待弹出key的尾/头元素
timeout为等待超时时间,如果timeout为0则一直等待下去
应用场景:长轮询ajax,在线聊天时能用到
(四)、hashes类型及操作
Redis hash 是一个string类型的field和value的映射表,它的添加、删除操作都是O(1)(平均)。hash特别适用于存储对象,将一个对象存储在hash类型中会占用更少的内存,并且可以方便的存取整个对象。
配置: hash_max_zipmap_entries 64 #配置字段最多64个
hash_max_zipmap_value 512 #配置value最大为512字节
(1)hset myhash field value:设置myhash的field为value
(2)hsetnx myhash field value:不存在的情况下设置myhash的field为value
(3)hmset myhash field1 value1 field2 value2:同时设置多个field
(4)hget myhash field:获取指定的hash field
(5)hmget myhash field1 field2:一次获取多个field
(6)hincrby myhash field 5:指定的hash field加上给定的值
(7)hexists myhash field:测试指定的field是否存在
(8)hlen myhash:返回hash的field数量
(9)hdel myhash field:删除指定的field
(10)hkeys myhash:返回hash所有的field
(11)hvals myhash:返回hash所有的value
(12)hgetall myhash:获取某个hash中全部的field及value
(五)、集合结构操作 特点:无序性、确定性、唯一性
(1)sadd key value1 value2:往集合里面添加元素
(2)smembers key:获取集合所有的元素
(3)srem key value:删除集合某个元素
(4)spop key:返回并删除集合中1个随机元素(可以坐抽奖,不会重复抽到某人)
(5)srandmember key:随机取一个元素
(6)sismember key value:判断集合是否有某个值
(7)scard key:返回集合元素的个数
(8)smove source dest value:把source的value移动到dest集合中
(9)sinter key1 key2 key3:求key1 key2 key3的交集
(10)sunion key1 key2:求key1 key2 的并集
(11)sdiff key1 key2:求key1 key2的差集
(12)sinterstore res key1 key2:求key1 key2的交集并存在res里
(六)、有序集合
概念:它是在set的基础上增加了一个顺序属性,这一属性在添加修改元素的时候可以指定,每次指定后,zset会自动按新的值调整顺序。可以理解为有两列的mysql表,一列存储value,一列存储顺序,操作中key理解为zset的名字。
和set一样sorted,sets也是string类型元素的集合,不同的是每个元素都会关联一个double型的score。sorted set的实现是skip list和hash table的混合体。
当元素被添加到集合中时,一个元素到score的映射被添加到hash table中,所以给定一个元素获取score的开销是O(1)。另一个score到元素的映射被添加的skip list,并按照score排序,所以就可以有序地获取集合中的元素。添加、删除操作开销都是O(logN)和skip list的开销一致,redis的skip list 实现是双向链表,这样就可以逆序从尾部去元素。sorted set最经常使用方式应该就是作为索引来使用,我们可以把要排序的字段作为score存储,对象的ID当元素存储。
(1)zadd key score1 value1:添加元素
(2)zrange key start stop :把集合排序后,返回名次的元素默认是升续排列withscores 是把score也打印出来
(3)zrank key member:查询member的排名(升序0名开始)
(4)zrangebyscore key min max limit offset N:集合(升序)排序后取score在内的元素,并跳过offset个,取出N个
(5)zrevrank key member:查询member排名(降序 0名开始)
(6)zremrangebyscore key min max:按照score来删除元素,删除score在之间
(7)zrem key value1 value2:删除集合中的元素
(8)zremrangebyrank key start end:按排名删除元素,删除名次在之间的
(9)zcard key:返回集合元素的个数
(10)zcount key min max:返回区间内元素数量
(11)zinterstore dest numkeys key1 ]
求key1,key2的交集,key1,key2的权值分别是weight1,weight2
聚合方法用 sum|min|max
聚合结果 保存子dest集合内
注意:weights,aggregate如何理解?
答:如果有交集,交集元素又有score,score怎么处理?aggregate num->score相加,min最小score,max最大score,另外可以通过weights设置不同的key的权重,交集时score*weight
(七)、服务器相关命令
(1)ping:测定连接是否存活
(2)echo:在命令行打印一些内容
(3)select:选择数据库
(4)quit:退出连接
(5)dbsize:返回当前数据库中key的数目
(6)info:获取服务器的信息和统计
(7)monitor:实时转储收到的请求
(8)config get 配置项:获取服务器配置的信息
config set 配置项值:设置配置项信息
(9)flushdb:删除当前选择数据库中所有的key
(10)flushall:删除所有数据库中的所有的key
(11)time:显示服务器时间,时间戳(秒),微秒数
(12)bgrewriteaof:后台保存rdb快照
(13)bgsave:后台保存rdb快照
(14)save:保存rdb快照
(15)lastsave:上次保存时间
(16)shutdown
注意:如果不小心运行了flushall,立即shutdown nosave,关闭服务器,然后手工编辑aof文件,去掉文件中的flushall相关行,然后开启服务器,就可以倒回原来是数据。如果flushall之后,系统恰好bgwriteaof了,那么aof就清空了,数据丢失。
(17)showlog:显示慢查询
问:多慢才叫慢?
答:由slowlog-log-slower-than 10000,来指定(单位为微秒)
问:服务器存储多少条慢查询记录
答:由slowlog-max-len 128,来做限制
4、管道
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
pool = redis.ConnectionPool(host='10.211.55.4', port=6379)
r = redis.Redis(connection_pool=pool)
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True)
r.set('name', 'alex')
r.set('role', 'sb')
pipe.execute()
5、发布订阅
三、 mysql
页:
[1]