python学习-day11
一、RabbitmqRabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。在 RabbitMQ 中,如下图结构:
[*]左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。生产者需要完成的任务:
[*]
创建RabbitMQ连接
获取信道
声明交换器
创建消息
发布消息
关闭信道
关闭RabbitMQ连接
[*]中间即是 RabbitMQ,其中包括了 交换机 和 队列。
[*]右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。消费者需要完成的任务:
创建RabbitMQ连接
获取信道
声明交换器
声明队列
队列和交换器绑定
消费信息
关闭信道
关闭RabbitMQ连接
[*]Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
[*]Binding: 连接Exchange和Queue,包含路由规则。
[*]Queue: 消息队列,存储还未被消费的消息。
[*]Message: Header+Body
[*]Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源
1. dircted exchange
路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。rabbitmq内部默认有一个特殊的dircted exchange,该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。
生产者:
import pika
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
# 声明queue
channel.queue_declare(queue='hello')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" Sent 'Hello World!'")
connection.close()
View Code 消费者:
import pika
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
def callback(ch, method, properties, body):
print(" Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print('
[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code 队列绑定关键字,发送者将数据关键字发送到消息Exchange,Exchange根据关键字判定应该将数据发送至指定队列。
生产者:
import pika,sys
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = sys.argv if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" Sent %r:%r" % (severity, message))
connection.close()
View Code 消费者:
import pika,sys
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv
if not severities:
sys.stderr.write("Usage: %s \n" % sys.argv)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
View Code 运行结果:
2. fanout exchange
发布/订阅exchange ,发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
生产者:
import pika,sys
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
# 声明queue
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" Sent %r" % message)
connection.close()
View Code 消费者:
import pika
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
View Code 3. topic exchange
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功,则将数据发送到指定队列。
[*] # :表示可以匹配0个或多个单词;
[*] * :表示只能匹配一个单词。
生产者:
import pika,sys
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" Sent %r:%r" % (routing_key, message))
connection.close()
View Code 消费者:
import pika,sys
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.170.134',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv
if not binding_keys:
sys.stderr.write("Usage: %s ...\n" % sys.argv)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
View Code 二、基于rabbitmq的RPC
基于rabbitmq的rpc实现流程:
(1)首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;
(2)服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致
(3)客户端从回调Queue中得到先前correlation_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。
对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类。
服务端:
import pika
cre_publiser = pika.PlainCredentials('admin', '123456')
conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',cre_publiser)
connection = pika.BlockingConnection(conn_para)
# 建立会话
channel = connection.channel()
# 声明RPC请求队列
channel.queue_declare(queue='rpc_queue')
# 数据处理方法
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
# 对RPC请求队列中的请求进行处理
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
# 调用数据处理方法
response = fib(n)
# 将处理结果(响应)发送到回调队列
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
# 负载均衡,同一时刻发送给该服务器的请求不超过一个
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
queue='rpc_queue')
print(" Awaiting RPC requests")
channel.start_consuming()
View Code 客户端:
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.cre_publiser = pika.PlainCredentials('admin', '123456')
self.conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',self.cre_publiser)
self.connection = pika.BlockingConnection(self.conn_para)
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response,
no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" Requesting fib(6)")
response = fibonacci_rpc.call(6)
print(" [.] Got %r" % response)
View Code
页:
[1]