RabbitMQ、Rdis
RabbitMQRabbitMQ:官方网址:http://www.rabbitmq.com/
Install Erlang
# rpm -ivh erlang-18.3-1.el6.x86_64.rpm
Install RabbitMQ Server
rpm -ivh rabbitmq-server-3.6.2-1.noarch.rpm
1、Hello World!
生产者
import pika
credentials = pika.PlainCredentials('guest', 'guest')# 使用用户名密码进行验证
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()# 声明一个通道
channel.queue_declare(queue='test.queue')# 声明一个queue
channel.basic_publish(exchange='', routing_key='test.queue', body='Hello World!')
print(" Sent 'Hello World!'")
connection.close()
消费者
import pika
credentials = pika.PlainCredentials('guest', 'guest')# 使用用户名密码进行验证
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='test.queue')
def callback(ch, method, properties, body):
print(" Received %r" % body)
channel.basic_consume(callback, queue='test.queue', no_ack=True)# callback如果收到消息就调用callback函数处理消息
#no_ack=True 指客户端处理完消息后,不跟服务器进行确认,这个参数在生成一定不能是用
channel.basic_qos(prefetch_count=1)# 设置接收端控制消息处理机制
print('
[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
acknowledgments
Message acknowledgments are turned on by default,在前面的代码内,我们为了举例,no_ack=True,这个参数在生产环境强烈建议去掉。
下面展示一段消费者代码:
import pika
credentials = pika.PlainCredentials('guest', 'guest')# 使用用户名密码进行验证
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='test.queue', durable=True)
def callback(ch, method, properties, body):
print(" Received %r" % body)
ch.basic_ack(delivery_tag=method.delivery_tag)# 代表告诉消息服务器我处理完毕,你可以把消息删除了
channel.basic_consume(callback, queue='test.queue')# callback如果收到消息就调用callback函数处理消息
channel.basic_qos(prefetch_count=1)# 设置接收端控制消息处理机制
print('
[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
消息的持久化
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
channel.queue_declare(queue='hello', durable=True)
This queue_declare change needs to be applied to both the producer and consumer code.
Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
例1:
import pika
credentials = pika.PlainCredentials('guest', 'guest')# 使用用户名密码进行验证
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()# 声明一个通道
channel.queue_declare(queue='test.queue', durable=True)# 声明一个queue
properties = pika.BasicProperties(delivery_mode=2)# 消息持久化
channel.basic_publish(exchange='', routing_key='test.queue', body='Hello World!', properties=properties)
print(" Sent 'Hello World!'")
connection.close()
消息发布\订阅
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
fanout: 所有bind到此exchange的queue都可以接收消息
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
注意:消息不会存在Exchange里面。
exchange type=fanout
生产者:
import pika
import sys
credentials = pika.PlainCredentials('guest', 'guest')# 使用用户名密码进行验证
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout', durable=True)
message = ' '.join(sys.argv) or "info: Hello Worlcccc!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" Sent %r" % message)
connection.close()
消费者:
import pika
credentials = pika.PlainCredentials('guest', 'guest')# 使用用户名密码进行验证
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout', durable=True)
result = channel.queue_declare(exclusive=True)# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(callback, queue=queue_name)
channel.start_consuming()
exchange type=direct
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
生产者
import pika
import sys
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
severity = sys.argv if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv) or 'Hello World!'
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" Sent %r:%r" % (severity, message))
connection.close()
消费者
import pika
import sys
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='192.168.33.35', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv
if not severities:
sys.stderr.write("Usage: %s \n" % sys.argv)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name)
channel.start_consuming()
exchange type=topic
生产者:
消费者:
#:代表接收所有消息
Remote procedure call (RPC)
RabbitMQ常用命令:
列出所有queue,The default value is "/"
1、rabbitmqctl list_queues
修改用户的密码
2、change_password <username> <newpassword>
Redis
Redis是缓存数据库,
import redis
r = redis.Redis(host='192.168.33.35', port=6379)
r.set('foo', 'Bar')
print(r.get('foo'))
连接池方式:
import redis
pool = redis.ConnectionPool(host='192.168.33.35', port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo', 'Bar')
print(r.get('foo'))
set(name, value, ex=None, px=None, nx=False, xx=False)
在Redis中设置值,默认,不存在则创建,存在则修改
参数:
ex,过期时间(秒)
px,过期时间(毫秒)
nx,如果设置为True,则只有name不存在时,当前set操作才执行
xx,如果设置为True,则只有name存在时,岗前set操作才执行
Redis常用命令介绍
页:
[1]