|
一:介绍:(induction)
Rabbitmq 是一个消息中间件。他的思想就是:接收和发送消息。你可以把它想成一个邮政局。当你把你的邮件发送到邮箱的,首先你需要确认的是:邮政员先生能把你的邮件发送给你想发送的地方。在这个比喻中,rabbitmq就是一个邮箱、一个邮政局、一个邮递员。
在这里rabbitmq和传统邮政局的区别:rabbitmq不处理信纸。取而代之的是:接收、储存、发送二进制数的消息。
rabbitmq和消息用如下专业术语:
生产者意思发送。A程序发送消息被称为:producer。我们可以用如下图标表示生产者:
队列的意思:邮箱。他是rabbitmq的内部结构。虽然消息经过rabbitmq和你的程序。他也可以储存在队列里。一个队列的消息绑定是没有限制。他可以储存任何你喜欢的数据。他本质上就是无限大的缓存区。很多生产者可以发送消息到队列中,而消费者们可以尝试从队列里获取
数据。队列A 如下图标表示,队列名字标注上面:
消费者:意思是接收数据 。一个消费者类型的程序大多数是在等待数据的接收。我们用如下C图标表示:
如上的生产者、消费者、消息队列大多数是存储在不同的机器上。
二:hello word
一个helloword 队列并不复杂。他只是发送消息、接收消息、打印输出到屏幕。为了实现这些功能,我们需要2个程序:一个是发送消息;另一个接收消息并打印输出消息。
我们的设想如下:
生产者发送消息到hello队列,消费者从hello队列获取消息。
三:发送
我们第一个程序是send.py。他将帮我们发送一个简单的消息到队列里。我们首先需要做的是和rabbitmq server端建立起连接。
建立连接的程序片段如下:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 'localhost'))
6 channel = connection.channel()#通信频道
我们现在就可以进行连接了。把消息发送到本地机器(localhost)上,如果我们相连接消息中间件(rabbitmq)在不同的机器上。我们可以在上面的connection里把localhost写成ip地址或者可以解析的主机名字。
第二行的代码,是通信频道。在我们发送的时候需要确认接收的队列是否存在。如果我们发送的消息到不存在的队列,这个消息会被丢弃。让我们一起建立一个接收消息的队列。我们命令为:hello。
1 channel.queue_declare(queue='hello')
现在我们可以准备发送消息了。我们第一条消息,只是一个简单的字符串hello world!我们把这个字符串发送到我们的hello队列里。
在rabbitmq里,消息不是直接发送到队列里。他发送的到队列里是通过exchange。exchange先不做过多介绍。我们现在需要知道怎么使用默认的exchange,是由空字符串(“”)来表示。这个是特殊的exchange,他帮助我们把消息发送到我们想发送的队列中。
在默认的exchange中,我们需要填写我们要发送的队列的名字。即routiing_key这个参数中。
1 channel.basic_publish(exchange='',
2 routing_key='hello',
3 body='Hello World!')
4 print(" [x] Sent 'Hello World!'")
在关闭连接的时候,需要确认我们的消息已经发送到rabbitmq 或者我们的消息已经从网络缓存写入硬盘中。
1 connection.close()
四:接收
我们第二个程序:receive.py,他将接收我们消息从hello队列里并输出到屏幕里。
当然了,首先我们需要连接到rabbitmq服务端,连接的代码和之前的代码是一样的。
下一步,我们需要确认队列是否存在,我们可以根据参数:queue_declare来声明和创建一个新的队列。
你可以运行多次这样的命令,但是真正只创建只有一个队列。
1 channel.queue_declare(queue='hello')
你会有疑问,在productor程序已经创建一个队列为什么这里还写这个。这是因为我们不确定我们的程序运行顺序,比如说先运行productor的话,队列hello创建,但是如果先运行consumer的程序的话,该队列就不存在。所以这里在productor和consumer再次写这段代码
避免这个问题,如果队列存在,该条创建命令不会生效。
note:如何查看我们的rabbitmq里有哪些队列呢?
我们可以在rabbitmq服务器上运行如下命令:可以查看当前rabbitmq有哪些queue和以及每个queue有多少个消息,可以使用:需要管理员用户哦。
1 [iyunv@localhost ~]# rabbitmqctl list_queues
2 Listing queues ...
3 hello1 0
4 lmd 0
5 ...done.
在接收消息的程序里能复杂些,实际上是callback函数订阅queue的时候,才起作用。不管什么时候我们获取消息的时候,这个callback函数都会被模块Pika的库调用。callback函数在我们的程序里只是简单的打印输出到屏幕上。
1 def callback(ch, method, properties, body):
2 print(" [x] Received %r" % body)
接下来,我们需要告诉rabbitmq,这个我们自定义的callback函数将会接收我们的队列hello的消息。
1 channel.basic_consume(callback,
2 queue='hello',
3 no_ack=True)#这里的no_ack 意思是消息不做回执确认。
如上的命令想成功运行需要确保我们订阅的hello队列存在。 我们在上面代码的中已经用queue_declare 来声明和创建了这个队列了。
最后我们根据我们的需要,来执行一个循环语句等待接收消息和运行我们的callback函数如果你需要的话。
1 print('
Waiting for messages. To exit press CTRL+C')
2 channel.start_consuming()#循环接收我们的消息,接收之后并执行我们的callback函数。
我们把之前的代码整合在一起:
PUT TOGTHER:
send.py 代码:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
receive.py代码:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6 channel = connection.channel()
7
8 channel.queue_declare(queue='hello')
9
10 def callback(ch, method, properties, body):
11 print(" [x] Received %r" % body)
12
13 channel.basic_consume(callback,
14 queue='hello',
15 no_ack=True)
16
17 print('
Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()
现在我们可以在一个终端运行我们的代码。用我们的send.py程序来发送消息:
1 $ python send.py
2 [x] Sent 'Hello World!'
生产者程序会每执行一次结束。让我们来接收消息:
1 $ python receive.py
2
Waiting for messages. To exit press CTRL+C
3 [x] Received 'Hello World!'
Cheers!我们已经成功发送第一个消息通过rabbitmq,神奇吧,小伙伴!我相信聪明的你已经注意到:recvied.py 程序并没有执行一次结束,依然保持准备接收从hello队列里的接下来发的消息,这是由于参数:channel.start_consuming()的功劳。当然你可以用Ctrl-c来终止。
二:进阶
一:工作队列(Work Queues)
在第一部分的时候,我们已经写了发送和接收消息从一个hello队列中的2个程序。这部分将介绍一个工作队列,用于分配一些复杂的任务处理。
这个工作队列的主要思想是:避免创建一些可以立马的执行完毕的任务,而是一些需要耗时等待的一些任务。一会介绍我们任务。我们将把我们的任务封装(encapsulate)成消息并把消息发送给对垒。
然后,通过后台运行一个程序来队列获取任务并且执行该任务。如果你运行多个这样的程序的话,任务消息将会被这个程序所平分(shared)。
这个场景多数会用在web程序里,那种不能短时间被短暂的http请求所执行的任务。
二:准备(preparing)
在先前我们程序中我们简单的发送的"Hello World!"。现在我们将发送字符串来支撑我们的复杂任务。我们这个只是模拟一下,我们手里并没有复杂的任务。你可以想象成一个图片的大小的调整或者一个pdf文件的渲染(rendered)。让我们一起伪造一下该过程是一个
繁忙的过程。通过time模块的sleep()函数来实现。我们将通过字符串的句点.个数来模拟我们任务的复杂度。没存在一个句点我们将通过sleep函数来耗时一秒。比如说:hello... 将会耗时3秒。
我们可以通过我们之前的send.py程序稍作修改即可。通过随意杜撰的字符串来实现这个任务的复杂,通过命令行并发送给我们的工作队列(task_queue),我们把发送的脚本叫做: new_task.py:
1 import sys
2
3 message = ' '.join(sys.argv[1:]) or "Hello World!"
4 channel.basic_publish(exchange='',
5 routing_key='task_queue',
6 body=message,
7 properties=pika.BasicProperties(
8 delivery_mode = 2, # make message persistent
9 ))
10 print(" [x] Sent %r" % message)
我们之前的receive.py脚本也需要做一些更改,他需要从task_queue获取一个任务,他需要通过检索消息体中字符串的句点的个数来运行我们的任务。我们将下面的程序叫做:worker.py:
1 import time
2
3 def callback(ch, method, properties, body):
4 print(" [x] Received %r" % body)
5 time.sleep(body.count(b'.'))
6 print(" [x] Done")
三:循环获取任务策略:
使用任务队列的一个好处是我们可以很容易平行(parallelise)执行我们的任务。如果我们创建一个备份日志的任务。我们可以通过多加一些工作程序来解决。
首先我们尝试通过2个窗口来运行我们的woker.py脚本在同一时间内。这2个窗口就是我们2个消费者:C1 和 C2。
1 shell1$ python worker.py
2
Waiting for messages. To exit press CTRL+C
1 shell2$ python worker.py
2
Waiting for messages. To exit press CTRL+C
接下来我们将会发布一些任务消息。一旦你启动消息者程序,你就可以通过new_task.py来发布一些消息。
1 shell3$ python new_task.py First message.
2 shell3$ python new_task.py Second message..
3 shell3$ python new_task.py Third message...
4 shell3$ python new_task.py Fourth message....
5 shell3$ python new_task.py Fifth message.....
我们一起看下我们消费者接收哪些消息从我们task_queue队列中:
1 shell1$ python worker.py
2
Waiting for messages. To exit press CTRL+C
3 [x] Received 'First message.'
4 [x] Received 'Third message...'
5 [x] Received 'Fifth message.....'
1 shell2$ python worker.py
2
Waiting for messages. To exit press CTRL+C
3 [x] Received 'Second message..'
4 [x] Received 'Fourth message....'
通过上面的观察,默认情况下RabbitMQ将会把每个消息发送下一个消费者。顺序的,每个消费者获得的任务都是等差数列(相邻2个消息(包含数字)相减的值相等)。这种方式的分发消息我们循环发送消息。你可以通过多运行几个这样的woker.py来验证。
三:消息确认(Message acknowledgment)
执行一个任务耗时几秒钟。你会有一个疑问:如果我们一个消费者在执行任务的时候需要耗时很长时间的过程中,死掉的话。目前我们的代码,一旦rabbitmq发送消息给消费者之后,他会立刻把该消息从队列中移除。或者你杀死一个正在运行的woker的话,我们将会丢失这个消息,
以及我们将会丢失所有发送给这个woker的所有消息。
但是我们并不想丢失这些任务消息。如果worker(消费者)死掉的话,我们想把丢失的消息再次发送给另一个存活的woker并执行(消费者)。为了确保我们的消息不能被丢失,RabbitMQ给我们提供了一个acknowkedgement(消息确认)。在消费者收到消息并执行完的时候会发送一个
ack 确认告诉RabbitMQ 已经接收到并处理过这个消息。而RabbitMQ会把这个消息删除掉。
如果一个消费者因为以下原因而死掉:
1)通信chanel关闭。
2)连接关闭。
3)TCP连接丢失。
因为如上原因没有给RabbitMQ发送ack确认的话,RabbitMQ会认为这个消息没有被执行完,会把这个消息重新放在相应的消息队列中。如果这个时候,还有在线的消费者,这个消息会被很快重新发送给在线的消费者。这种方式保证了如果消费者因为某些原因而死亡,消息不被丢失。
消息没有超时时间,只有当消费者挂掉的时候,RabbitMQ才会重新发送消息,即使这个消息被执行很久很久的时间。
默认情况,消息的acknowledgments是被打开的。之前的代码例子中,我们通过:no_ack=True的标识来关闭这个功能。也就是说默认情况这个参数:no_ack=Flase.
在适当的时候,把no_ack=True移走这个标识,让woker程序在执行完消息之后,发送ack确认。
1 def callback(ch, method, properties, body):
2 print " [x] Received %r" % (body,)
3 time.sleep( body.count('.') )
4 print " [x] Done"
5 ch.basic_ack(delivery_tag = method.delivery_tag)#发送ack跟rabbitMQ消息确认。
6
7 channel.basic_consume(callback,
8 queue='hello')
使用如上代码,我们可以确认,即使你把其中的一个正在处理消息的woker程序杀死或者Ctrl+C终止程序。不会丢失任何消息。没有ack的确认的消息,一会会被重新发送给另一个woker。
四:消息持久化(Message durability)
我们已经学会,当消费者挂掉的时候,保证消息不丢失的情况。但是当RabbitMQ挂掉的时候消息还是会被丢失。
如果RabbitMQ没有做任何设置,当RabbitMQ停止或者崩溃的话,他会丢失所有队列以及消息。除非你做一些特殊设置。
需要做2件事情,保证消息不丢失:
1)标记队列为持久化:已经存在的RabbitMQ队列不能重新定义为持久化。
2)标记消息为持久化。
首先我们做第一件事情,保证RabbitMQ不会丢失我们的队列。因为我们需要在创建的队列的声明这个队列是持久化队列(durable)
1 channel.queue_declare(queue='hello', durable=True)#在创建队列的时候声明队列是持久化队列。
即使这个命令我们正确执行之后,但是并没有生效。因为我们已经定义一个队列hello,在声明创建的时候并没有说明这个队列是持久化。RabbitMQ并不允许重新定义一个已经存在的队列的不同的参数。他将会返回一个错误给程序。
那么我们重新定义一个不存在的队列。
1 channel.queue_declare(queue='work_queue', durable=True)
同样这个声明需要在生产者和消费者程序上需要写这行代码。
做了如上声明,可以保证RabbitMQ在被重启的时候,队列不会被丢失。接下来我们需要标记消息为持久化。
方法之前的代码中已经出现:
消息持久化:
properties=pika.BasicProperties(
delivery_mode = 2,
)
1 channel.basic_publish(exchange='',
2 routing_key="task_queue",
3 body=message,
4 properties=pika.BasicProperties(
5 delivery_mode = 2, # make message persistent
6 ))
注意(NOTE):
如上的标记消息为持久化,并不能完全保证消息不丢失,即使你告诉RabbitMQ需要把消息写入磁盘。但是仍然有一小段时间内RabbitMQ没有把消息写入磁盘中。RabboitMQ并没有把每条消息写入磁盘中。也许会把消息写入缓存中。而并不是真正写入硬盘中。
这个标记消息持久化并不健全。但是对于简单任务队列来说已经足够。如果你需要保证消息不被丢失,可以参考下:publisher confirms.
五:公平分配(Fair dispatch)
也许你发现上面的循环分配方式并不是我们想要的。比如,如下场景:
当所有的基数消息很繁重,相反偶数消息很轻巧。这就会导致一个woker 处理很缓慢,而另一个woker几乎没有任何消息需要等待处理。RabbitMQ并不知道消息情况,他依然会这样分配消息给woker。
这种情况,是因为RabbitMQ只是分配消息当消息进入队列的时候,RabbitMQ并查看消费者有多少没有确认的消息。他只是盲目(blindly)分配消息给消费者。
为避免这个情况,我们可以用:basic.qos方法来设置prefetch_count=1。这个设置告诉RabbitMQ不要给woker每次多于一个消息。换句话说,不要在woker当前处理的消息没有返回ack确认的时候,在分发一个消息给woker。也就是说当woker空闲的时候才会被分配
一个消息。
1 channel.basic_qos(prefetch_count=1)
注意(NOTE)队列的大小:
如果所有wokers都很繁忙的情况下,队列会被填满。你需要时刻注意这个问题,这种情况:
1)添加更多的wokers。
2)或者更改其他的消息分配策略。
代码合并(Putting it all together):
Final code of our new_task.py script:
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='task_queue', durable=True)##队列持久化。
10
11 message = ' '.join(sys.argv[1:]) or "Hello World!"
12 channel.basic_publish(exchange='',
13 routing_key='task_queue',
14 body=message,
15 properties=pika.BasicProperties(
16 delivery_mode = 2, # make message persistent消息持久化。
17 ))
18 print(" [x] Sent %r" % message)
19 connection.close()
And our worker:
1 #!/usr/bin/env python
2 import pika
3 import time
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.queue_declare(queue='task_queue', durable=True)
10 print('
Waiting for messages. To exit press CTRL+C')
11
12 def callback(ch, method, properties, body):
13 print(" [x] Received %r" % body)
14 time.sleep(body.count(b'.'))
15 print(" [x] Done")
16 ch.basic_ack(delivery_tag = method.delivery_tag)#消息回执确认。
17
18 channel.basic_qos(prefetch_count=1)#公平分配策略。
19 channel.basic_consume(callback,
20 queue='task_queue')
21
22 channel.start_consuming()#循环等待消息。
使用消息确认和公平策略去设置我们工作队列。设置持久化选项保证我们的任务存活即使RabbitMQ被重启。
三:订阅和发布
上面的例子,我们创建一个队列。队列将每个任务精确的发送给每个woker。这部分我们将做一个完全不一样的。我们将一个消息发送一个多个消费者。这部分叫做发布和订阅。
为了说明这个部门我们创建一个简单的日志收集系统。他将包含2部分。第一部分是发送日志消息,第二部分接收日志消息并打印输出日志消息。
我们接收程序拷贝多个来模拟多个消费者去接收消息。这种方式我们运行一个接收日志和将日志写入磁盘中。同时我们将运行另一个接收和输出屏幕的接收程序。
基本上,发布的日志消息将会广播给所有的接收者。
一:Exchanges
之前的我们介绍的部分中,我们发送一个消息给队列和从队列中接收消息。现在我们接收下RabbitMQ消息传输模式:
我们简单回顾下我们介绍的知识:
1)生产者是一个发送消息的应用程序。
2)队列是一个存储消息的buffer。
3)消费者是一个接收消息的应用程序。
消息传输模式核心思想是:在RabbitMQ中生产者从来都不是将消息直接发送给队列。实际上生产者并不知道自己的发送的给那个队列。
而是生产者只能讲消息发送exchange。exchange是很简单的东西。一方面他从生产这接收消息,另一方面消息推送给队列。exchange必须准确知道从生产者接收的消息去向。消息被追加到一个特殊的队列中?多个特殊队列?或者被丢弃?
所有的规则都是在exchange类型中定义的。
这里有几个有效的exchange类型:
direct, topic, headers and fanout。
首先我们来学习下:
一:fanout类型。
定义:
1 channel.exchange_declare(exchange='logs',
2 type='fanout')
fanout类型的exchange很简单,从他的名字你也许猜到他的含义。他只是广播所有的消息给绑定他的所有队列并队列接收这些消息。这正是我们设置的日志收集系统所想要的。
注意(note):
列出所有RabbitMQ所有的exchange。
1 rabbitmqctl list_exchanges
2 Listing exchanges ...
3 direct
4 amq.direct direct
5 amq.fanout fanout
6 amq.headers headers
7 amq.match headers
8 amq.rabbitmq.log topic
9 amq.rabbitmq.trace topic
10 amq.topic topic
11 ...done.
在这列表中有一些以amq.* exchange。和一些没有名字的exchange。这些是默认创建的exchange
二:Nameless exchange
在之前的部分的时候,我们并没有讲过exchange,但是我们仍然可以将消息发送到队列中。这是因为我们用的是默认exchange.也就是说之前写的:exchange='',空字符串表示默认的exchange。
之前的代码结构:
1 channel.basic_publish(exchange='',
2 routing_key='hello',
3 body=message)
exchange = 参数表示exchange 的名字,空字符串只是默认或者没有的exchange:消息被路由到队列根据:routing_key. 如果routing_key的值存在的话。
现在,我们可以用我们自己命名的exchange来代替默认的exchange。
1 channel.basic_publish(exchange='logs',
2 routing_key='',
3 body=message)
二:临时队列(Temporary queues)
是否还记得之前我们使用的队列,每个队列都有自己的名字(还记得hello和task_queue?)队列的名字对于我们来说很重要,我们将指引wokers到相同的队列。给一个队列命名很重要,当你想把这个队列给生产者和消费者使用。
但是对于我们日志来说无所谓,我们只想接收所有的日志消息。而不是一个“片段”。同样我们只是关系即将发送的消息而不是过去的消息。解决这个问题我们需要2件事情:
1)首先我们需要连接到RabbitMQ.需要一个全新的。空队列。为了实现这个我们创建一个随机队列名字,是一个好方法。让RabbitMQ选择一个随机的队列名字给我们。
我们可以给queue_declare不提供任何参数来实现:
1 result = channel.queue_declare()
然后用result对象的方法method.queue创建一个随机队列名字。比如:名字就想:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2)第二件事情,当消费者断开连接的时候,这个队列需要销毁。专用标识:
1 result = channel.queue_declare(exclusive=True)##标识这个队列在断开连接的时候被销毁。
三:绑定(Bindings)
我们已经创建一个fanout类型的exchange和一个随机队列。现在需要告诉exchange发送消息给我们的队列。exchange和queue之间的关系叫做binding 绑定。
1 channel.queue_bind(exchange='logs',
2 queue=result.method.queue)
那现在logsexchange将消息发送到我们的queue。
你可以用命令行显示:
1 rabbitmqctl list_bindings.
四:把所有设置整合在一起(Putting it all together)
生产者程序,是发送日志消息。看起来和之前没多少区别。重要的区别:现在我们将消息发布到我们的logs exchange而不是没有名字默认的exchange。我们同样需要一个routing_key当我们发送消息的时候。但是在fanout类型中这个可以忽略。
emit_log.py code:
1 import pika
2 import sys
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))#创建连接。
6 channel = connection.channel()#创建通信频道。
7
8 channel.exchange_declare(exchange='logs',
9 type='fanout')#创建一个fanout类型的exchange。
10
11 message = ' '.join(sys.argv[1:]) or "info: Hello World!"
12 channel.basic_publish(exchange='logs',#指定exchange。
13 routing_key='',#routing_key可以忽略。
14 body=message)
15 print(" [x] Sent %r" % message)
16 connection.close()
正如你所见,当我们建一个连接之后我们声明一个exchange。此步必不可少,如果发布到一个不存在的exchange会报权限问题。
消息会因为exchange没有绑定队列而丢失。但是对于我们来说我们已经绑定的不存在这个问题。如果没有consumer在接收消息。消息会被我们丢弃。
The code for receive_logs.py:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))#创建连接。
6 channel = connection.channel()#创建通信频道。
7
8 channel.exchange_declare(exchange='logs',
9 type='fanout')#再次声明一个exchange 以及类型。
10
11 result = channel.queue_declare(exclusive=True)#创建一个队列,在断开连接的时候该队列会删除掉。
12 queue_name = result.method.queue#队列的名字。
13
14 channel.queue_bind(exchange='logs',
15 queue=queue_name)#exchange绑定队列。
16
17 print('
Waiting for logs. To exit press CTRL+C')
18
19 def callback(ch, method, properties, body):
20 print(" [x] %r" % body)
21
22 channel.basic_consume(callback,
23 queue=queue_name,
24 no_ack=True)#设置不消息确认。
25
26 channel.start_consuming()#循环等待接收消息。
我们已经完成我们的需求。如果你想把日志保存到文件中。打开窗口运行如下命令:
1 $ python receive_logs.py > logs_from_rabbit.log
如果你想在你屏幕看见消息,你可以打开新的窗口 运行如下命令:
1 $ python receive_logs.py
做了如上操作我们就可以发送我们日志消息了。
1 $ python emit_log.py
使用rabbitmqctl list_bindings 你可以看到代码绑定的我们想要的队列。如果你运行2个终端接收消息 看起来像这样:
1 rabbitmqctl list_bindings
2 Listing bindings ...
3 exchange amq.gen-3l_KUxC92rzcjB5QVvDcfg queue amq.gen-3l_KUxC92rzcjB5QVvDcfg []
4 exchange amq.gen-QlgGf1orpKVxFePt5u-8pw queue amq.gen-QlgGf1orpKVxFePt5u-8pw []
5 exchange hello1 queue hello1 []
6 exchange lmd queue lmd []
7 logs_fanout exchange amq.gen-3l_KUxC92rzcjB5QVvDcfg queue amq.gen-3l_KUxC92rzcjB5QVvDcfg []
8 logs_fanout exchange amq.gen-QlgGf1orpKVxFePt5u-8pw queue amq.gen-QlgGf1orpKVxFePt5u-8pw []
9 ...done.
五:Routing
先前的例子我们建立一个简单的日志收集系统。日志消息以广播的形式发送到所有的消费者。
在这个章节我们将会加入一个新的策略:让订阅者接收一些特定的消息。比如说:我们只将严重级别的日志写入磁盘,同样也满足将所有接收的 所有日志消息打印输出到屏幕上。
一:绑定(Bindings)
在之前的例子我们已经创建了绑定了,你的代码类似如下:
1 channel.queue_bind(exchange=exchange_name,
2 queue=queue_name)
这种绑定是建立在exchange和队列之间的。这种简单的设置,使队列只对从exchange发送的消息感兴趣。
绑定创建了一个特殊的routing_key参数。区别之前的绑定的队列名字,我们叫他为绑定关键字。如下是我们创建的绑定关键字的代码:
1 channel.queue_bind(exchange=exchange_name,
2 queue=queue_name,
3 routing_key='black')
也就是说绑定官架子依赖于exchange的类型。在fanout类型中routing_key这个关键字是空字符串,忽略这个值。
四:关键字绑定(Direct exchange)
我们之前的日志收集系统是将所有的消息以广播的形式发送到所有消费者。
我们设想是按照severity来筛选我们的日志消息。比如:我们只想要这样的脚本,只有接受到严重错误级别的消息写入硬盘中。不想写入info或者warning级别的日志消息而浪费磁盘空间。
我们之前用fanout类型的exchange 并没有给我们太多的伸缩性,他只能盲目把消息发送给消费者。
接下来我们将会用diect类型的exchange来代替 fanout。direct exchange 的算法很简单:消息会按照routing_key的配置的关键字准确匹配的队列,才发送给这个队列。
如下图所示:
这种设置,我们可以看到名字为X的xchange绑定2个队列:Q1、Q2。Q1队列被绑定一个关键字orange。Q2队列绑定2个关键字,一个关键字是black 一个是green.
这种设置的RabbitMQ。生产者发送的消息给exchange包含关键字,比如说orange,消息将会路由到Q1这个队列。如果消息携带的关键字包含black或者green的时候将会路由到Q2队列上。其他的关键字的消息将会被丢弃。
多次绑定(Multiple bindings)
多个队列绑定相同的关键字很完美的结合,在我们的例子中,我们将增加一个black关键字绑定名字是X的exchange和队列Q1. 在这种绑定情况下,direct exchange看起来想fanout模式那样,将消息广播给所有匹配到black的所有列(Q1,Q2)。
发送日志端:(Emitting logs)
我们将用采用direct类型的exchange来重构我们的日志收集系统。取代之前的fanout的模式。我们将提供一个关键字(severity )做为routing_key。这种模式下,我们可以根据我们选择的关键字来接收消息。
首先来看下发送日志消息端:
和之前一样我们首先需要创建一个exchange:
1 channel.exchange_declare(exchange='direct_logs',
2 type='direct')#关键字模式。
接下来我们发布消息:
1 channel.basic_publish(exchange='direct_logs',
2 routing_key=severity,#severity关键字的集合。
3 body=message)
severity可以是一个或者多个关键字,比如说是:“info”,"warning"、“error”。其中的一种或者多种。
订阅端:Subscribing
订阅端的代码和之前的代码没有什么区别,只是我们现在绑定了我们感兴趣关键字的队列。
1 result = channel.queue_declare(exclusive=True)
2 queue_name = result.method.queue
3
4 for severity in severities:
5 channel.queue_bind(exchange='direct_logs',#exchange的名字。
6 queue=queue_name,#随机队列名字。
7 routing_key=severity)#绑定关键字
Putting it all together
The code for emit_log_direct.py:
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='direct_logs',
10 type='direct')#声明exchange的类型:关键字类型。
11
12 severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
13 message = ' '.join(sys.argv[2:]) or 'Hello World!'
14 channel.basic_publish(exchange='direct_logs',
15 routing_key=severity,#绑定关键字,将消息发送哪个关键字的队列中。
16 body=message)
17 print(" [x] Sent %r:%r" % (severity, message))
18 connection.close()
The code for receive_logs_direct.py:
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='direct_logs',
10 type='direct')#声明exchange类型:关键字类型。
11
12 result = channel.queue_declare(exclusive=True)#创建随机队列,当断开连接的这个队列将删除。
13 queue_name = result.method.queue#分配随机队列的名字。
14
15 severities = sys.argv[1:]
16 if not severities:
17 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
18 sys.exit(1)
19
20 for severity in severities:
21 channel.queue_bind(exchange='direct_logs',
22 queue=queue_name,
23 routing_key=severity)#该消费者绑定的关键字。
24
25 print('
Waiting for logs. To exit press CTRL+C')
26
27 def callback(ch, method, properties, body):
28 print(" [x] %r:%r" % (method.routing_key, body))
29
30 channel.basic_consume(callback,
31 queue=queue_name,
32 no_ack=True)#消息没有回执确认。
33
34 channel.start_consuming()#循环等待消息接收。
如果你想你的日志文件只保存warning和error 而不包含info内容。打开终端输入如下命令:
1 $ python receive_logs_direct.py warning error > logs_from_rabbit.log
如你想看所有的日志消息在你终端屏幕上你可以重新打开一个窗口并输入如下命令:
1 $ python receive_logs_direct.py info warning error
2
Waiting for logs. To exit press CTRL+C
或者,你只想接收错误的日志消息,你可以输入以下命令:
1 $ python emit_log_direct.py error "Run. Run. Or it will explode."
2 [x] Sent 'error':'Run. Run. Or it will explode.'
五:模糊匹配模式(Topics模式)
之前我们的日志收集系统,采用fanout模式的 exchange,可以通过广播让消费者接收所有的日志消息,之后我们改成关键字的模式(direct)根据消息的发送的关键字路由到我们的想发送的队列。
即时采用direct 类型的exchange来改善我们的日志收集系统,但是他仍然有局限性:不能讲消息根据多个规则来进行路由。
在我们的日志系统中不仅仅根据关键字(severity)来进行订阅,而是基于源程序发送的日志消息。
你应该了解这种需求多大Unix操作系统中的syslog工具。他就是需要绑定包括关键字(severity (info/warn/crit...))和(auth/cron/kern...)。这种情况。
这个需求给了我们很多筛选条件。我们也许只关心cron日志里的critical errors的消息,和kern的所有日志消息。
为满足上述的需求我们进行的日志收集系统采取topic exchange.
Topic exchange
消息发送给topic类型的exchange时候,不能随意定义routing_key.他必须是一系列的单词列表。由圆点来进行限定。单词可以随意,但通常是填写一些可以表明消息的功能的一些单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。也可以自定义一些i喜欢的单词,
但是routing_key的长度大小限制在255字节。
绑定的key必须是相同的格式结构,Topic exchange的逻辑和direct(关键字类型)一样,消息在发送的时候需要携带一个特殊的routing_key.并发送到所有的跟exichange绑定的队列相匹配绑定key的队列。
实现这些功能需要做以下2件事情:
1)*匹配一个单词。
2)#可以代替0到多个单词。
最简单例子如下:
我们将发送一些消息,消息的内容是形容一些动物。这些消息在发送的时候会携带一个routing_key,routing_key的内容包含3个单词,其中有2个句点(.)。第一个单词在routing_key中形容快速,第2个单词是颜色。第三个单词是物种:"<celerity>.<colour>.<species>".
我们创建3个绑定:Q1绑定关键字:"*.orange.*" Q2:"*.*.rabbit" 和 "lazy.#".
这样绑定的意义是:
1)Q1是表示对所有橘黄色的动物感兴趣。
2)Q2是想接收所有关于兔子以及所有懒惰的东西所有事情。
一个消息中routing_key="quick.orange.rabbit" 会被发送到Q1和Q2.而携带routing_key="lazy.orange.elephant"的消息 ,也同样发送到这2个队列中。
另一方面:携带routing_key= "quick.orange.fox"消息 只能发送到Q1队列。而routing_key="lazy.brown.fox"只能发送到Q2队列。而携带routing_key="quick.brown.fox"的消息没有匹配任何绑定队列,该消息会被丢弃。
如果我们发送的消息携带的routing_key含有一个单词或者四个单词比如:"orange" or "quick.orange.male.rabbit"? 这些消息因为匹配不了任何绑定而会被丢弃。
另一方面像:"lazy.orange.male.rabbit" 即时他有4个单词,单也会匹配Q2而发送到Q2队列中。
注意:模糊匹配(Topic exchange)是一个很强大的策略。
当一个队列绑定的key为”#“ ,这个队列会接收所有内容的消息。在fanout类型的队列中routing_key是无值(”“空字符串)。也就是说fanout模式直接接收所有的消息。
如果在模糊匹配中,绑定的关键字没有"#"和”*“的话,topic类型的exchange和dict(关键字模式)的工作原理是一样的。
Putting it all together
我们将使用topic 类型的exchange在我们的日志收集系统中。 假设(assumption )我们绑定的routing_key有2个关键字:"<facility>.<severity>".
The code for emit_log_topic.py:
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='topic_logs',
10 type='topic')#创建topic类型的exchange。模糊匹配。
11
12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
13 message = ' '.join(sys.argv[2:]) or 'Hello World!'
14 channel.basic_publish(exchange='topic_logs',
15 routing_key=routing_key,#绑定关键字
16 body=message)
17 print(" [x] Sent %r:%r" % (routing_key, message))
18 connection.close()
The code for receive_logs_topic.py:
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='topic_logs',
10 type='topic')#声明exchange的类型为模糊匹配。
11
12 result = channel.queue_declare(exclusive=True)#创建随机一个队列当消费者退出的时候,该队列被删除。
13 queue_name = result.method.queue#创建一个随机队列名字。
14
15 binding_keys = sys.argv[1:]
16 if not binding_keys:
17 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
18 sys.exit(1)
19
20 for binding_key in binding_keys:
21 channel.queue_bind(exchange='topic_logs',
22 queue=queue_name,
23 routing_key=binding_key)
24
25 print('
Waiting for logs. To exit press CTRL+C')
26
27 def callback(ch, method, properties, body):
28 print(" [x] %r:%r" % (method.routing_key, body))
29
30 channel.basic_consume(callback,
31 queue=queue_name,
32 no_ack=True)
33
34 channel.start_consuming(
当输入如下,会被所有日志消息会被接收:
1 python receive_logs_topic.py "#"
接收所有从“kern”关键字的日志:
1 python receive_logs_topic.py "kern.*"
如果你想所有关于"critical" 日志消息:
1 python receive_logs_topic.py "*.critical"
你可以建立多个绑定:
1 python receive_logs_topic.py "kern.*" "*.critical"
那么发送日志消息端的routing_Key="kern.critical"
1 python emit_log_topic.py "kern.critical" "A critical kernel error"
六 : RPC
在第二节中,我们学习了怎么使用工作队列(work Queues)通过多个消费者执行我们的任务。
但是,如果我们想在运程一台机器上执行一个函数等待函数的返回结果呢?哈哈,这个悲伤的故事。 这种方式叫做运程执行或者RPC。
在这个小章节中我们将使用RabiitMQ 来创建一个RPC远程调用系统:创建一个client端,和一个远程执行的RPC server。我们没一个值得耗时的任务。我们将创建一个仿造RPC服务执行返回一个数字。
客户端接口:
为了说明一个RPC服务是怎么使用和运行的我们定义一个简单的客户端类。他将会揭露 通过发送给RPC 请求,远程调用l方法,客户端并阻塞等待RPC server返回执行结果。
1 fibonacci_rpc = FibonacciRpcClient()
2 result = fibonacci_rpc.call(4)
3 print("fib(4) is %r" % result)
NOTE:
虽然RPC是一个不错的功能对于计算机来说。但是这个功能遭到很多人的诟病。程序不会意识到RPC是本地的函数调用或者是一个缓慢的RPC。这就导致给系统添加了很多没必要复杂的调试。
有如下建议,请思考:
1)确认函数的调用是远程还是本地调用。
2)在你的系统文档中,需要清晰的表达组件之间的依赖关系。
3)当RPC server执行调用时间过长或者挂掉的时候,客户端该怎么处理?
Callback queue(调用队列):
通过RabiitMQ来实现RPC功能很简单。客户端发送请求消息,服务端返回应答消息。为了获得应答消息,客户端需要给RPC server发送一个callback队列和请求消息。我们一起尝试下:
1 result = channel.queue_declare(exclusive=True)#创建随机队列,当我们断开来接的时候,RabbitMQ会把该队列删除掉。(exclusive=True)
2 callback_queue = result.method.queue
3
4 channel.basic_publish(exchange='',
5 routing_key='rpc_queue',
6 properties=pika.BasicProperties(
7 reply_to = callback_queue,#执行结果把结果传入我们创建的随机队列中。
8 ),
9 body=request)
消息属性:
AMQP协议给消息提供了14种不同的属性。下面是我们经常用的几种属性:
1:delivery_mode:使消息持久化。(delivery_mode=2)#持久化包括队列持久化(channel.queue_declare(queue='hello', durable=True)、消息持久化(delivery_mode=2)。以及消息传输过程的消息回执确认。(ch.basic_ack(delivery_tag = method.delivery_tag)#消息回执确认。)
2:content_type:设置消息的编码类型。比如果JSON,他的设置就是application/json.
3:reply_to:通常后面跟存放着调用执行结果的队列名字。
4:correlation_id:用来RPC应答的时候,唯一标识。来确定将结果返回那个主机。
Correlation id
之前建议的时候,我们为每个RPC请求建立一个callback队列。看起来不错,但是有更好的方法,我们为每个客户端单独建立一个callback队列。
如果这样又出现新的问题:反馈的消息他不清楚把结果消息是哪个请求的。那correlation_id 属性就是解决这个问题。我们将设置一个唯一的值给每个请求。当callback队列接收消息的时候会查看这个属性。然后基于这个属性的匹配来确认是那个请求。如果不匹配,返回的消息会被丢弃。
你会问为什么不直接返回错误,而是在callback队列中直接丢弃呢?这是因为,如果RPCserver 在发送完结果消息的时候,如果没有发送回执确认,死掉话,当RPC server重启之后,这个消息请求不会丢失,还会继续处理这个消息。
结构:
整个RPC工作过程是这样:
1)当客户端启动,会建立一个随机回收队列。
2)对于发送RPC请求的消息携带2个属性:1:reply_to 结果返回的队列。;2:correlation_id 为每个请求设置唯一的值。
3)消息请求被发送到rpc_queue队列。
4)RPC server端的woker在rpc_queue队列中等待请求,当请求进来,他会执行请求并把结果消息发送给客户端通过reply_to参数写的队列名字。
5)客户端在callback队列中等待数据返回,当有消息出现,他会查看消息的属性:correlation_id 如果属性值匹配之前的发送请求的消息的correlation_id ,他就会把该消息的结果发送给应用程序。
Putting it all together
The code for rpc_server.py:
1 #!/usr/bin/env python
2 import pika
3
4 connection = pika.BlockingConnection(pika.ConnectionParameters(
5 host='localhost'))
6
7 channel = connection.channel()
8
9 channel.queue_declare(queue='rpc_queue')
10
11 def fib(n):#调用函数
12 if n == 0:
13 return 0
14 elif n == 1:
15 return 1
16 else:
17 return fib(n-1) + fib(n-2)
18
19 def on_request(ch, method, props, body):
20 n = int(body)
21
22 print(" [.] fib(%s)" % n)
23 response = fib(n)
24
25 ch.basic_publish(exchange='',
26 routing_key=props.reply_to,
27 properties=pika.BasicProperties(correlation_id = \
28 props.correlation_id),
29 body=str(response))##发送结果
30 ch.basic_ack(delivery_tag = method.delivery_tag)#消息回执确认。
31
32 channel.basic_qos(prefetch_count=1)#消息分配方式。
33 channel.basic_consume(on_request, queue='rpc_queue')#获得请求的队列参数。
34
35 print(" [x] Awaiting RPC requests")
36 channel.start_consuming()
代码解析:
1)客户端和服务端建立连接和声明队列。
2)声明调用函数。
3)我们声明一个调用返回,这个也是RPC server的核心。当RPC接收客户端消息之后,执行调用函数,并将结果消息返回。
4) 因为我们不可能运行单个server端,如果是多个的话,为了我们采取公平分配消息的原则(当RPCserver执行完一个请求,才接受下一个请求。)
The code for rpc_client.py:
1 #!/usr/bin/env python
2 import pika
3 import uuid
4
5 class FibonacciRpcClient(object):
6 def __init__(self):
7 self.connection = pika.BlockingConnection(pika.ConnectionParameters(
8 host='localhost'))
9
10 self.channel = self.connection.channel()
11
12 result = self.channel.queue_declare(exclusive=True)
13 self.callback_queue = result.method.queue
14
15 self.channel.basic_consume(self.on_response, no_ack=True,
16 queue=self.callback_queue)
17
18 def on_response(self, ch, method, props, body):
19 if self.corr_id == props.correlation_id:
20 self.response = body
21
22 def call(self, n):
23 self.response = None
24 self.corr_id = str(uuid.uuid4())
25 self.channel.basic_publish(exchange='',
26 routing_key='rpc_queue',
27 properties=pika.BasicProperties(
28 reply_to = self.callback_queue,
29 correlation_id = self.corr_id,
30 ),
31 body=str(n))
32 while self.response is None:
33 self.connection.process_data_events()
34 return int(self.response)
35
36 fibonacci_rpc = FibonacciRpcClient()
37
38 print(" [x] Requesting fib(30)")
39 response = fibonacci_rpc.call(30)
40 print(" [.] Got %r" % response)
客户端代码解析:
1)我们建立连接,通信channel和声明消息一个返回的callback队列。
2)我们声明一个callback队列,以便接收RPCserver的请求消息。
3)on_response每次有返回消息的时候都被执行,他的任务很简单,每次检查消息的correlation_id 是否是我们需要的请求结果。如果是,他就把请求结果返回并跳出循环。
4)下一步,我们定义我们主函数call方法,他负责接收请求结果以及发送RPC请求。
5)在这个方法,我们创建一个唯一标识:orrelation_id ,并保存他,on_response函数会用该值比对RPC返回的消息结果。
6)接下来我们发布了一个请求消息。消息携带2个属性:reply_to and correlation_id.
7)然后客户端循环等待RPCserver的请求结果消息。
8)最后我们将结果返回给客户端。
启动RPCserver端:
1 $ python rpc_server.py
2 [x] Awaiting RPC requests
客户端发送请求:
1 $ python rpc_client.py
2 [x] Requesting fib(30)
我们如上代码的比较简单:
比如:
1)如果server端未运行,客户端该如何处理?
2)客户端需要设置超时时间吗?
3)如果服务端发生异常的时候,是否该把异常发送给客户端?
4)怎么处理非法消息呢?
If you want to experiment, you may find the rabbitmq-management plugin useful for viewing the queues. |
|