jxdiscuz 发表于 2017-7-2 12:21:07

python学习-day11

  一、Rabbitmq
  RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。RabbitMQ使用的是AMQP协议,它是一种二进制协议。默认启动端口 5672。在 RabbitMQ 中,如下图结构:



[*]左侧 P 代表 生产者,也就是往 RabbitMQ 发消息的程序。生产者需要完成的任务:
[*]


创建RabbitMQ连接
获取信道
声明交换器
创建消息
发布消息
关闭信道
关闭RabbitMQ连接
[*]中间即是 RabbitMQ,其中包括了 交换机 和 队列。
[*]右侧 C 代表 消费者,也就是往 RabbitMQ 拿消息的程序。消费者需要完成的任务:


创建RabbitMQ连接
获取信道
声明交换器
声明队列
队列和交换器绑定
消费信息
关闭信道
关闭RabbitMQ连接
[*]Exchange: 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct, fanout, topic三种。
[*]Binding: 连接Exchange和Queue,包含路由规则。
[*]Queue: 消息队列,存储还未被消费的消息。
[*]Message: Header+Body
[*]Channel: 通道,执行AMQP的命令;一个连接可创建多个通道以节省资源
  1. dircted exchange
  路由键exchange,该交换机收到消息后会把消息发送到指定routing-key的queue中。rabbitmq内部默认有一个特殊的dircted exchange,该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。
  生产者:





import pika
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()

# 声明queue
channel.queue_declare(queue='hello')

# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
                     routing_key='hello',
                     body='Hello World!')
print(" Sent 'Hello World!'")
connection.close()
View Code  消费者:





import pika
credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()
def callback(ch, method, properties, body):
   print(" Received %r" % body)

channel.basic_consume(callback,
                     queue='hello',
                     no_ack=True)

print('
[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
View Code  队列绑定关键字,发送者将数据关键字发送到消息Exchange,Exchange根据关键字判定应该将数据发送至指定队列。
  生产者:





import pika,sys

credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                        type='direct')

severity = sys.argv if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                     routing_key=severity,
                     body=message)
print(" Sent %r:%r" % (severity, message))
connection.close()
View Code  消费者:





import pika,sys

credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                        type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv
if not severities:
   sys.stderr.write("Usage: %s \n" % sys.argv)
   sys.exit(1)

for severity in severities:
   channel.queue_bind(exchange='direct_logs',
                        queue=queue_name,
                        routing_key=severity)

print('
[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                     queue=queue_name,
                     no_ack=True)

channel.start_consuming()
View Code  运行结果:
  2. fanout exchange
  发布/订阅exchange ,发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,二发布者发布消息时,会将消息放置在所有相关队列中。任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
  生产者:





import pika,sys

credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()
# 声明queue
channel.exchange_declare(exchange='logs',
                        type='fanout')

message = ' '.join(sys.argv) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                     routing_key='',
                     body=message)
print(" Sent %r" % message)
connection.close()
View Code  消费者:





import pika

credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()

result = channel.queue_declare(exclusive=True)# 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                  queue=queue_name)

print('
[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" %r" % body)

channel.basic_consume(callback,
                     queue=queue_name,
                     no_ack=True)

channel.start_consuming()
View Code  3. topic exchange
  在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入"路由值"和"关键字"进行匹配,匹配成功,则将数据发送到指定队列。


[*]  # :表示可以匹配0个或多个单词;

[*]  * :表示只能匹配一个单词。

  生产者:





import pika,sys

credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                        type='topic')

routing_key = sys.argv if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                     routing_key=routing_key,
                     body=message)
print(" Sent %r:%r" % (routing_key, message))
connection.close()
View Code  消费者:





import pika,sys

credentials = pika.PlainCredentials('admin', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
   '192.168.170.134',5672,'/',credentials))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                        type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv
if not binding_keys:
   sys.stderr.write("Usage: %s ...\n" % sys.argv)
   sys.exit(1)

for binding_key in binding_keys:
   channel.queue_bind(exchange='topic_logs',
                        queue=queue_name,
                        routing_key=binding_key)

print('
[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   print(" %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                     queue=queue_name,
                     no_ack=True)

channel.start_consuming()
View Code  二、基于rabbitmq的RPC
  基于rabbitmq的rpc实现流程:
  (1)首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;
  (2)服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致
  (3)客户端从回调Queue中得到先前correlation_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。
  对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类。
  服务端:





import pika

cre_publiser = pika.PlainCredentials('admin', '123456')
conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',cre_publiser)
connection = pika.BlockingConnection(conn_para)

# 建立会话
channel = connection.channel()

# 声明RPC请求队列
channel.queue_declare(queue='rpc_queue')

# 数据处理方法
def fib(n):
   if n == 0:
         return 0
   elif n == 1:
         return 1
   else:
         return fib(n-1) + fib(n-2)

# 对RPC请求队列中的请求进行处理
def on_request(ch, method, props, body):
   n = int(body)

   print(" [.] fib(%s)" % n)

   # 调用数据处理方法
   response = fib(n)

   # 将处理结果(响应)发送到回调队列
   ch.basic_publish(exchange='',
                      routing_key=props.reply_to,
                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
                      body=str(response))
   ch.basic_ack(delivery_tag = method.delivery_tag)

# 负载均衡,同一时刻发送给该服务器的请求不超过一个
channel.basic_qos(prefetch_count=1)

channel.basic_consume(on_request,
                     queue='rpc_queue')

print(" Awaiting RPC requests")
channel.start_consuming()
View Code  客户端:





import pika
import uuid
class FibonacciRpcClient(object):
   def __init__(self):
         self.cre_publiser = pika.PlainCredentials('admin', '123456')
         self.conn_para = pika.ConnectionParameters('192.168.170.134',5672,'/',self.cre_publiser)
         self.connection = pika.BlockingConnection(self.conn_para)

         self.channel = self.connection.channel()

         result = self.channel.queue_declare(exclusive=True)
         self.callback_queue = result.method.queue

         self.channel.basic_consume(self.on_response,
                                    no_ack=True,
                                    queue=self.callback_queue)

   def on_response(self, ch, method, props, body):
         if self.corr_id == props.correlation_id:
             self.response = body

   def call(self, n):
         self.response = None
         self.corr_id = str(uuid.uuid4())
         self.channel.basic_publish(exchange='',
                                    routing_key='rpc_queue',
                                    properties=pika.BasicProperties(
                                    reply_to=self.callback_queue,
                                    correlation_id=self.corr_id,
                                    ),
                                    body=str(n))
         while self.response is None:
             self.connection.process_data_events()
         return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" Requesting fib(6)")
response = fibonacci_rpc.call(6)
print(" [.] Got %r" % response)
View Code
页: [1]
查看完整版本: python学习-day11