cundeng 发表于 2017-7-2 16:26:07

RabbitMQ 从入门到放弃

  RabbitMQ 从入门到放弃
  RabbitMQ官方配置文档
  Pika官方配置文档

消息的properties



AMQP定义了14个消息属性,可以把properties看成是伴随着消息的数据结构,和routing_key一样,routing_key会送到exchange,而properties会和消息一起送给接收端。
delivery_mode:用于做消息持久化(delivery_mode=2);
content_type:消息内容格式;
reply_to:一般用于RPC过程,消息处理返回结果队列;
correlation_id:用于关联RPC的请求与应答;
correlation_id的使用很简单,接收端反馈消息时,要(在发聩消息的properties里)带上correlation_id,发送端接收到反馈便可以用这个对应结果(参考3的第7篇笔记,center-computer的例子是对应到发送的线程)。
correlation_id最初还是来自于发送端,发送消息时附带在它的properties中,同时附带的还有接收反馈消息的队列名称(reply_to)。
如果还有更复杂的情况,复杂到使用properties也解决不了,那么就使用更复杂的消息吧,例如json格式的消息(properties.content_type=application/json),类似correlation_id和reply_to的信息,也可以放在消息内容里


process_data_events()



由于发送端在设置接收消息反馈时,还要继续其他的过程,不能执行channel.start_consuming等在这里。设计上,我们让发送端接收到反馈时就退出,所以即使完成其他所有过程,也不能无限制等待。
发送端接收到反馈时,消息保存在self.response中,所以我们等待它被赋值就可以了。最简单的等待方式就是循环sleep,不过这里有其他机制。
self.connection.process_data_events()是一个等待消息的阻塞过程,连接的任何消息都可以使它脱离阻塞状态(有点像Ajax的事件等待机制)
while self.response is None :
self.connection.process_data_events()
# response is not None here


临时queue
  正如前面学到的,对于一个queue,会有自己的名字(hello什么的),首先:

result = channel.queue_declare()

  然后通过result.method.queue,系统会随机给queue命名:

queue_name = result.method.queue

  如果我们想Producer与Consumer断开连接时,队列queue删除,那么需要改成下面的代码:

result = channel.queue_declare(exclusive=True)

# 删除临时队列,不再获取数据
channel.queue_delete(queue_name)
Message acknowledgment 消息确认
  每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。
  如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。


  为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。
  在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。


  如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。

acknowledgment 消息不丢失(消费者)
  no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。


[*]回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag)
[*]basic_comsume中的no_ack=False



在接收端的callback最后:
channel.basic_ack(delivery_tag=method.delivery_tag)
ack即acknowledge(承认,告知已收到)
A message MUST not be acknowledged morethan once. The receiving peer MUST validate that a non-zero delivery-tag refersto a delivered message, and raise a channel exception if this is not the case.
除了callback函数,还要在之前设置接收消息时指定no_ack(默认False)
只有在Consumer断开连接时,RabbitMQ才会重新发送未经确认的消息。超时的情况并未考虑:无论Consumer需要处理多长时间,RabbitMQ都不会重发消息。



Message durability消息持久化
  在上一节中我们知道了即使Consumer异常退出,Message也不会丢失。但是如果RabbitMQ Server退出呢?软件都有bug,即使RabbitMQ Server是完美毫无bug的(当然这是不可能的,是软件就有bug,没有bug的那不叫软件),它还是有可能退出的:被其它软件影响,或者系统重启了,系统panic了。。。
  为了保证在RabbitMQ退出或者crash了数据仍没有丢失,需要将queue和Message都要持久化。

durable 消息不丢失(生产者)
  消息生产者端发送消息时挂掉了,消费者接消息时挂掉了,以下方法会让RabbitMQ重新将该消息添加到队列中:


[*]Message持久化,发布消息端的basic_publish添加参数properties=pika.BasicProperties(delivery_mode=2),生产者端需要做的
[*]queue持久化,需要在声明时指定durable=True,生产者端需要做的
[*]回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag),消费端需要做的
[*]basic_comsume中的no_ack=False,消费端需要做的
  再次强调,Producer和Consumer都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的。



====================生产者=======================
import pika
credentials = pika.PlainCredentials("openstack", "openstack")
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.20.182", credentials=credentials))
channel = conn.channel()
channel.queue_declare(queue="hello2", durable=True)
channel.basic_publish(exchange="",
routing_key="hello2",
body="hello world....",
properties=pika.BasicProperties(delivery_mode=2)
)
print(" Sent 'Hello World!'")
conn.close()

====================消费者=======================
import pika
credentials = pika.PlainCredentials("openstack", "openstack")
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.20.182", credentials=credentials))
channel = conn.channel()
channel.queue_declare(queue="hello2", durable=True)
def callback(ch, method, properties, body):
print(" Received %s" %body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue="hello2", no_ack=False)
channel.start_consuming()


3、消息获取顺序
  默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。
  channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列



===================消费者=================
import pika
credentials = pika.PlainCredentials("openstack", "openstack")
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.20.182", credentials=credentials))
channel = conn.channel()
channel.queue_declare(queue="hello2", durable=True)
def callback(ch, method, properties, body):
print(" Received %s" %body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue="hello2", no_ack=False)
channel.start_consuming()


Exchange
  RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。
  Producer发送的Message实际上是发到了Exchange中。它的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到那个queue中,还是放到多个queue中?这个rule是通过Exchange 的类型定义的。
  RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 。RabbitMQ提供了四种Exchange:fanout,direct,topic,headerheader模式在实际使用中较少,只对前三种模式进行比较。
  消费者

发布和订阅 exchange type = fanout
  发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

  任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
  1.可以理解为路由表的模式
  2.这种模式不需要RouteKey
  3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
  4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
  消费者
  关键字 exchange type = direct

  
  任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
  1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
  2.这种模式下不需要将Exchange进行任何绑定(binding)操作
  3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
  4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃
  消费者
  模糊订阅 exchange type = topic

  
  任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
  1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
  2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
  3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
  4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
  5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
页: [1]
查看完整版本: RabbitMQ 从入门到放弃