RabbitMQ 从入门到放弃
RabbitMQ 从入门到放弃RabbitMQ官方配置文档
Pika官方配置文档
消息的properties
AMQP定义了14个消息属性,可以把properties看成是伴随着消息的数据结构,和routing_key一样,routing_key会送到exchange,而properties会和消息一起送给接收端。
delivery_mode:用于做消息持久化(delivery_mode=2);
content_type:消息内容格式;
reply_to:一般用于RPC过程,消息处理返回结果队列;
correlation_id:用于关联RPC的请求与应答;
correlation_id的使用很简单,接收端反馈消息时,要(在发聩消息的properties里)带上correlation_id,发送端接收到反馈便可以用这个对应结果(参考3的第7篇笔记,center-computer的例子是对应到发送的线程)。
correlation_id最初还是来自于发送端,发送消息时附带在它的properties中,同时附带的还有接收反馈消息的队列名称(reply_to)。
如果还有更复杂的情况,复杂到使用properties也解决不了,那么就使用更复杂的消息吧,例如json格式的消息(properties.content_type=application/json),类似correlation_id和reply_to的信息,也可以放在消息内容里
process_data_events()
由于发送端在设置接收消息反馈时,还要继续其他的过程,不能执行channel.start_consuming等在这里。设计上,我们让发送端接收到反馈时就退出,所以即使完成其他所有过程,也不能无限制等待。
发送端接收到反馈时,消息保存在self.response中,所以我们等待它被赋值就可以了。最简单的等待方式就是循环sleep,不过这里有其他机制。
self.connection.process_data_events()是一个等待消息的阻塞过程,连接的任何消息都可以使它脱离阻塞状态(有点像Ajax的事件等待机制)
while self.response is None :
self.connection.process_data_events()
# response is not None here
临时queue
正如前面学到的,对于一个queue,会有自己的名字(hello什么的),首先:
result = channel.queue_declare()
然后通过result.method.queue,系统会随机给queue命名:
queue_name = result.method.queue
如果我们想Producer与Consumer断开连接时,队列queue删除,那么需要改成下面的代码:
result = channel.queue_declare(exclusive=True)
# 删除临时队列,不再获取数据
channel.queue_delete(queue_name)
Message acknowledgment 消息确认
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了。
如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。
在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。
acknowledgment 消息不丢失(消费者)
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
[*]回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag)
[*]basic_comsume中的no_ack=False
在接收端的callback最后:
channel.basic_ack(delivery_tag=method.delivery_tag)
ack即acknowledge(承认,告知已收到)
A message MUST not be acknowledged morethan once. The receiving peer MUST validate that a non-zero delivery-tag refersto a delivered message, and raise a channel exception if this is not the case.
除了callback函数,还要在之前设置接收消息时指定no_ack(默认False)
只有在Consumer断开连接时,RabbitMQ才会重新发送未经确认的消息。超时的情况并未考虑:无论Consumer需要处理多长时间,RabbitMQ都不会重发消息。
Message durability消息持久化
在上一节中我们知道了即使Consumer异常退出,Message也不会丢失。但是如果RabbitMQ Server退出呢?软件都有bug,即使RabbitMQ Server是完美毫无bug的(当然这是不可能的,是软件就有bug,没有bug的那不叫软件),它还是有可能退出的:被其它软件影响,或者系统重启了,系统panic了。。。
为了保证在RabbitMQ退出或者crash了数据仍没有丢失,需要将queue和Message都要持久化。
durable 消息不丢失(生产者)
消息生产者端发送消息时挂掉了,消费者接消息时挂掉了,以下方法会让RabbitMQ重新将该消息添加到队列中:
[*]Message持久化,发布消息端的basic_publish添加参数properties=pika.BasicProperties(delivery_mode=2),生产者端需要做的
[*]queue持久化,需要在声明时指定durable=True,生产者端需要做的
[*]回调函数中的ch.basic_ack(delivery_tag=method.delivery_tag),消费端需要做的
[*]basic_comsume中的no_ack=False,消费端需要做的
再次强调,Producer和Consumer都应该去创建这个queue,尽管只有一个地方的创建是真正起作用的。
====================生产者=======================
import pika
credentials = pika.PlainCredentials("openstack", "openstack")
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.20.182", credentials=credentials))
channel = conn.channel()
channel.queue_declare(queue="hello2", durable=True)
channel.basic_publish(exchange="",
routing_key="hello2",
body="hello world....",
properties=pika.BasicProperties(delivery_mode=2)
)
print(" Sent 'Hello World!'")
conn.close()
====================消费者=======================
import pika
credentials = pika.PlainCredentials("openstack", "openstack")
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.20.182", credentials=credentials))
channel = conn.channel()
channel.queue_declare(queue="hello2", durable=True)
def callback(ch, method, properties, body):
print(" Received %s" %body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue="hello2", no_ack=False)
channel.start_consuming()
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
===================消费者=================
import pika
credentials = pika.PlainCredentials("openstack", "openstack")
conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.20.182", credentials=credentials))
channel = conn.channel()
channel.queue_declare(queue="hello2", durable=True)
def callback(ch, method, properties, body):
print(" Received %s" %body)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue="hello2", no_ack=False)
channel.start_consuming()
Exchange
RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。
Producer发送的Message实际上是发到了Exchange中。它的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到那个queue中,还是放到多个queue中?这个rule是通过Exchange 的类型定义的。
RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储 。RabbitMQ提供了四种Exchange:fanout,direct,topic,headerheader模式在实际使用中较少,只对前三种模式进行比较。
消费者
发布和订阅 exchange type = fanout
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
1.可以理解为路由表的模式
2.这种模式不需要RouteKey
3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
消费者
关键字 exchange type = direct
任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。
1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。
2.这种模式下不需要将Exchange进行任何绑定(binding)操作
3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃
消费者
模糊订阅 exchange type = topic
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。
页:
[1]