Python之RabbitMQ操作
RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。实现的协议:AMQP。
术语(Jargon)
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
RabbitMQ安装
1 安装配置epel源
2 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3
4 安装erlang
5 $ yum -y install erlang
6
7 安装RabbitMQ
8 $ yum -y install rabbitmq-server
安装rabbitmq API
1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6
7 https://pypi.python.org/pypi/pika
使用API操作RabbitMQ
基于Queue实现生产者消费者模型
1 #!/usr/bin/env python
2 import pika
3
4 # ######################### 生产者 #########################
5
6 connection = pika.BlockingConnection(pika.ConnectionParameters(
7 host='localhost'))
8 channel = connection.channel()
9
10 channel.queue_declare(queue='hello')
11
12 channel.basic_publish(exchange='',
13 routing_key='hello',
14 body='Hello World!')
15 print(" Sent 'Hello World!'")
16 connection.close()
1 #!/usr/bin/env python
2 import pika
3
4 # ########################## 消费者 ##########################
5
6 connection = pika.BlockingConnection(pika.ConnectionParameters(
7 host='localhost'))
8 channel = connection.channel()
9
10 channel.queue_declare(queue='hello')
11
12 def callback(ch, method, properties, body):
13 print(" Received %r" % body)
14
15 channel.basic_consume(callback,
16 queue='hello',
17 no_ack=True)
18
19 print('
[*] Waiting for messages. To exit press CTRL+C')
20 channel.start_consuming()
1、acknowledgment 消息不丢失(订阅端消息不丢失)
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错
9
10 def callback(ch,method,properties,body):
11 print(" Received %r" %body)#打印获得消息的内容
12 ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
13
14 channel.basic_consume(callback,queue='hai',no_ack=False)
15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
18
19 print('
[*]Waiting for messages to exit press CTRL+C')
20 channel.start_consuming()
消费者
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai
9 channel.basic_publish(exchange='',routing_key='hai',body='hello world')
10 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
11 print(" Sent 'hello world' ")
12 connection.close()
生产者 2、durable 消息不丢失(服务端消息不丢失)
1 # time:
2 # Auto:PANpan
3 # func:
4 import pika
5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
6 channel=connection.channel()#创建频道,通过频道操作rabbitmq
7 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
8
9 def callback(ch,method,properties,body):
10 print(" Received %r" %body)#打印获得消息的内容
11 ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
12
13 channel.basic_consume(callback,queue='hai',no_ack=True)
14 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
15 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
16 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
17
18 print('
[*]Waiting for messages to exit press CTRL+C')
19 channel.start_consuming()
消费者
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
10 properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
12 print(" Sent 'hello world' ")
13 connection.close()
生产者 3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
9
10 def callback(ch,method,properties,body):
11 print(" Received %r" %body)#打印获得消息的内容
12 ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
13 channel.basic_qos(prefetch_count=1)#客户端按顺序去取,默认为奇偶数
14 channel.basic_consume(callback,queue='hai',no_ack=True)
15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
18
19 print('
[*]Waiting for messages to exit press CTRL+C')
20 channel.start_consuming()
消费者
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
10 properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
12 print(" Sent 'hello world' ")
13 connection.close()
生产者 4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 import sys
7 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接消息队列服务器
8 channel=connection.channel()#创建频道,通过频道对rabbitmq进行操作
9
10 channel.exchange_declare(exchange='logs',type='fanout')#创建exchange,名称为logs,若果该消息队列已经创建,可以省略
11 message=''.join(sys.argv) or "info: Hello Wrold"
12
13 channel.basic_publish(exchange='logs',routing_key='',body=message)#将消息添加今年队列
14 print(' sent %r'%message)
15 connection.close()
发布者
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8')) #连接消息队列服务器
7 channel = connection.channel()#创建频道,通过频道对rabbitmq进行操作
8
9 channel.exchange_declare(exchange='logs',#创建exchange,名称为logs
10 type='fanout')#type='fanout'作用为凡是和exchange相关联的队列,在用户给exchange发消息时,所有关联队列都会受到消息
11
12 result=channel.queue_declare(exclusive=True)#不指定队列名,有系统随机创建
13 queue_name=result.method.queue
14
15 channel.queue_bind(exchange='logs',queue=queue_name)#将exchange和当前的消息队列做一个绑定
16 print('
[*] Waiting for logs. To exit press CTRL+C')
17
18
19 def callback(ch,method,properties,body):
20 print(' %r' %body)
21
22 channel.basic_consume(callback,queue=queue_name,no_ack=True)#在队列中获取消息
23
24 channel.start_consuming()
订阅者 5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 import sys
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters(
9 host='192.168.11.138'))
10 channel = connection.channel()
11
12 channel.exchange_declare(exchange='direct_logs',
13 type='direct')
14
15 #severity = sys.argv if len(sys.argv) > 1 else 'info'
16 #message = ' '.join(sys.argv) or 'Hello World!'
17 severity='info'
18 message='test'
19 channel.basic_publish(exchange='direct_logs',
20 routing_key=severity,
21 body=message)
22 print(" Sent %r:%r" % (severity, message))
23 connection.close()
发布者
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 import sys
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters(
9 host='10.0.0.8'))
10 channel = connection.channel()
11
12 channel.exchange_declare(exchange='direct_logs',
13 type='direct')#设置exchange类型为direct
14
15 result = channel.queue_declare(exclusive=True)#创建随机队列
16 queue_name = result.method.queue
17
18 # severities = sys.argv
19 # if not severities:
20 # sys.stderr.write("Usage: %s \n" % sys.argv)
21 # sys.exit(1)
22 severities=['error']
23 for severity in severities:
24 channel.queue_bind(exchange='direct_logs',
25 queue=queue_name,
26 routing_key=severity)#绑定关键字
27
28 print('
[*] Waiting for logs. To exit press CTRL+C')
29
30 def callback(ch, method, properties, body):
31 print(" %r:%r" % (method.routing_key, body))
32
33 channel.basic_consume(callback,
34 queue=queue_name,
35 no_ack=True)
36
37 channel.start_consuming()
订阅在1
1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 #!/usr/bin/env python
6 # time:
7 # Auto:PANpan
8 # func:
9 import pika
10 import sys
11
12 connection = pika.BlockingConnection(pika.ConnectionParameters(
13 host='10.0.0.8'))
14 channel = connection.channel()
15
16 channel.exchange_declare(exchange='direct_logs',
17 type='direct')
18
19 result = channel.queue_declare( )
20 #声明queue,确认要从中接收message的queue
21 #queue_declare函数是幂等的,可运行多次,但只会创建一次
22 #若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
23 #但在producer和consumer中重复声明queue是一个好的习惯
24 #例如:channel.queue_declare(queue='hello')
25 queue_name = result.method.queue
26
27 # severities = sys.argv
28 # if not severities:
29 # sys.stderr.write("Usage: %s \n" % sys.argv)
30 # sys.exit(1)
31 severities=['error','info']
32 for severity in severities:
33 channel.queue_bind(exchange='direct_logs',
34 queue=queue_name,
35 routing_key=severity)
36
37 print('
[*] Waiting for logs. To exit press CTRL+C')
38
39 def callback(ch, method, properties, body):
40 print(" %r:%r" % (method.routing_key, body))
41
42 channel.basic_consume(callback,
43 queue=queue_name,
44 no_ack=True)
45
46 channel.start_consuming()
订阅在2 6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
[*]# 表示可以匹配 0 个 或 多个 单词
[*]*表示只能匹配 一个 单词
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='topic_logs',
10 type='topic')
11
12 result = channel.queue_declare(exclusive=True)
13 queue_name = result.method.queue
14
15 binding_keys = sys.argv
16 if not binding_keys:
17 sys.stderr.write("Usage: %s ...\n" % sys.argv)
18 sys.exit(1)
19
20 for binding_key in binding_keys:
21 channel.queue_bind(exchange='topic_logs',
22 queue=queue_name,
23 routing_key=binding_key)
24
25 print('
[*] Waiting for logs. To exit press CTRL+C')
26
27 def callback(ch, method, properties, body):
28 print(" %r:%r" % (method.routing_key, body))
29
30 channel.basic_consume(callback,
31 queue=queue_name,
32 no_ack=True)
33
34 channel.start_consuming()
订阅者
1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6 host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='topic_logs',
10 type='topic')
11
12 routing_key = sys.argv if len(sys.argv) > 1 else 'anonymous.info'
13 message = ' '.join(sys.argv) or 'Hello World!'
14 channel.basic_publish(exchange='topic_logs',
15 routing_key=routing_key,
16 body=message)
17 print(" Sent %r:%r" % (routing_key, message))
18 connection.close()
发布者 注:
订阅/发布Demo
发送消息给多个订阅者
核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。
Exchanges
在RabbitMQ完整的模型中,消息只能发送给一个exchange。
exchange一方面接收消息,另一方面push给queues。
exchange类型
> rabbitmqctl list_exchanges
direct
topic
headers
fanout 广播消息给已知队列
页:
[1]