云深处 发表于 2017-7-2 17:15:17

Python之RabbitMQ操作

  RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。


实现的协议:AMQP。



术语(Jargon)



P,Producing,制造和发送信息的一方。

Queue,消息队列。

C,Consuming,接收消息的一方。



RabbitMQ安装




1 安装配置epel源
2    $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
3
4 安装erlang
5    $ yum -y install erlang
6
7 安装RabbitMQ
8    $ yum -y install rabbitmq-server
  安装rabbitmq API



1 pip install pika
2 or
3 easy_install pika
4 or
5 源码
6
7 https://pypi.python.org/pypi/pika
  使用API操作RabbitMQ
  基于Queue实现生产者消费者模型



1 #!/usr/bin/env python
2 import pika
3
4 # ######################### 生产者 #########################
5
6 connection = pika.BlockingConnection(pika.ConnectionParameters(
7         host='localhost'))
8 channel = connection.channel()
9
10 channel.queue_declare(queue='hello')
11
12 channel.basic_publish(exchange='',
13                     routing_key='hello',
14                     body='Hello World!')
15 print(" Sent 'Hello World!'")
16 connection.close()


1 #!/usr/bin/env python
2 import pika
3
4 # ########################## 消费者 ##########################
5
6 connection = pika.BlockingConnection(pika.ConnectionParameters(
7         host='localhost'))
8 channel = connection.channel()
9
10 channel.queue_declare(queue='hello')
11
12 def callback(ch, method, properties, body):
13   print(" Received %r" % body)
14
15 channel.basic_consume(callback,
16                     queue='hello',
17                     no_ack=True)
18
19 print('
[*] Waiting for messages. To exit press CTRL+C')
20 channel.start_consuming()
  1、acknowledgment 消息不丢失(订阅端消息不丢失)
  no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。





1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错
9
10 def callback(ch,method,properties,body):
11   print(" Received %r" %body)#打印获得消息的内容
12   ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
13
14 channel.basic_consume(callback,queue='hai',no_ack=False)
15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
18
19 print('
[*]Waiting for messages to exit press CTRL+C')
20 channel.start_consuming()
消费者




1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai
9 channel.basic_publish(exchange='',routing_key='hai',body='hello world')
10 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
11 print(" Sent 'hello world' ")
12 connection.close()
生产者  2、durable   消息不丢失(服务端消息不丢失)





1 # time:
2 # Auto:PANpan
3 # func:
4 import pika
5 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
6 channel=connection.channel()#创建频道,通过频道操作rabbitmq
7 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
8
9 def callback(ch,method,properties,body):
10   print(" Received %r" %body)#打印获得消息的内容
11   ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
12
13 channel.basic_consume(callback,queue='hai',no_ack=True)
14 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
15 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
16 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
17
18 print('
[*]Waiting for messages to exit press CTRL+C')
19 channel.start_consuming()
消费者




1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
10                     properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
12 print(" Sent 'hello world' ")
13 connection.close()
生产者  3、消息获取顺序
  默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
  channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列





1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化
9
10 def callback(ch,method,properties,body):
11   print(" Received %r" %body)#打印获得消息的内容
12   ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
13 channel.basic_qos(prefetch_count=1)#客户端按顺序去取,默认为奇偶数
14 channel.basic_consume(callback,queue='hai',no_ack=True)
15 #获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
16 # 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
17 # 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失
18
19 print('
[*]Waiting for messages to exit press CTRL+C')
20 channel.start_consuming()
消费者




1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
7 channel=connection.channel()#创建频道,通过频道操作rabbitmq
8 channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
9 channel.basic_publish(exchange='',routing_key='hai',body='hello world',
10                     properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
11 #向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
12 print(" Sent 'hello world' ")
13 connection.close()
生产者  4、发布订阅

  发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
  exchange type = fanout





1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 import sys
7 connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接消息队列服务器
8 channel=connection.channel()#创建频道,通过频道对rabbitmq进行操作
9
10 channel.exchange_declare(exchange='logs',type='fanout')#创建exchange,名称为logs,若果该消息队列已经创建,可以省略
11 message=''.join(sys.argv) or "info: Hello Wrold"
12
13 channel.basic_publish(exchange='logs',routing_key='',body=message)#将消息添加今年队列
14 print(' sent %r'%message)
15 connection.close()
发布者




1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8')) #连接消息队列服务器
7 channel = connection.channel()#创建频道,通过频道对rabbitmq进行操作
8
9 channel.exchange_declare(exchange='logs',#创建exchange,名称为logs
10                        type='fanout')#type='fanout'作用为凡是和exchange相关联的队列,在用户给exchange发消息时,所有关联队列都会受到消息
11
12 result=channel.queue_declare(exclusive=True)#不指定队列名,有系统随机创建
13 queue_name=result.method.queue
14
15 channel.queue_bind(exchange='logs',queue=queue_name)#将exchange和当前的消息队列做一个绑定
16 print('
[*] Waiting for logs. To exit press CTRL+C')
17
18
19 def callback(ch,method,properties,body):
20   print(' %r' %body)
21
22 channel.basic_consume(callback,queue=queue_name,no_ack=True)#在队列中获取消息
23
24 channel.start_consuming()
订阅者  5、关键字发送
  exchange type = direct
  之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。





1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 import sys
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters(
9         host='192.168.11.138'))
10 channel = connection.channel()
11
12 channel.exchange_declare(exchange='direct_logs',
13                        type='direct')
14
15 #severity = sys.argv if len(sys.argv) > 1 else 'info'
16 #message = ' '.join(sys.argv) or 'Hello World!'
17 severity='info'
18 message='test'
19 channel.basic_publish(exchange='direct_logs',
20                     routing_key=severity,
21                     body=message)
22 print(" Sent %r:%r" % (severity, message))
23 connection.close()
发布者




1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 import pika
6 import sys
7
8 connection = pika.BlockingConnection(pika.ConnectionParameters(
9         host='10.0.0.8'))
10 channel = connection.channel()
11
12 channel.exchange_declare(exchange='direct_logs',
13                        type='direct')#设置exchange类型为direct
14
15 result = channel.queue_declare(exclusive=True)#创建随机队列
16 queue_name = result.method.queue
17
18 # severities = sys.argv
19 # if not severities:
20 #   sys.stderr.write("Usage: %s \n" % sys.argv)
21 #   sys.exit(1)
22 severities=['error']
23 for severity in severities:
24   channel.queue_bind(exchange='direct_logs',
25                        queue=queue_name,
26                        routing_key=severity)#绑定关键字
27
28 print('
[*] Waiting for logs. To exit press CTRL+C')
29
30 def callback(ch, method, properties, body):
31   print(" %r:%r" % (method.routing_key, body))
32
33 channel.basic_consume(callback,
34                     queue=queue_name,
35                     no_ack=True)
36
37 channel.start_consuming()
订阅在1




1 #!/usr/bin/env python
2 # time:
3 # Auto:PANpan
4 # func:
5 #!/usr/bin/env python
6 # time:
7 # Auto:PANpan
8 # func:
9 import pika
10 import sys
11
12 connection = pika.BlockingConnection(pika.ConnectionParameters(
13         host='10.0.0.8'))
14 channel = connection.channel()
15
16 channel.exchange_declare(exchange='direct_logs',
17                        type='direct')
18
19 result = channel.queue_declare( )
20 #声明queue,确认要从中接收message的queue
21 #queue_declare函数是幂等的,可运行多次,但只会创建一次
22 #若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
23 #但在producer和consumer中重复声明queue是一个好的习惯
24 #例如:channel.queue_declare(queue='hello')
25 queue_name = result.method.queue
26
27 # severities = sys.argv
28 # if not severities:
29 #   sys.stderr.write("Usage: %s \n" % sys.argv)
30 #   sys.exit(1)
31 severities=['error','info']
32 for severity in severities:
33   channel.queue_bind(exchange='direct_logs',
34                        queue=queue_name,
35                        routing_key=severity)
36
37 print('
[*] Waiting for logs. To exit press CTRL+C')
38
39 def callback(ch, method, properties, body):
40   print(" %r:%r" % (method.routing_key, body))
41
42 channel.basic_consume(callback,
43                     queue=queue_name,
44                     no_ack=True)
45
46 channel.start_consuming()
订阅在2  6、模糊匹配

  exchange type = topic
  在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。


[*]# 表示可以匹配 0 个 或 多个 单词
[*]*表示只能匹配 一个 单词





1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6         host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='topic_logs',
10                        type='topic')
11
12 result = channel.queue_declare(exclusive=True)
13 queue_name = result.method.queue
14
15 binding_keys = sys.argv
16 if not binding_keys:
17   sys.stderr.write("Usage: %s ...\n" % sys.argv)
18   sys.exit(1)
19
20 for binding_key in binding_keys:
21   channel.queue_bind(exchange='topic_logs',
22                        queue=queue_name,
23                        routing_key=binding_key)
24
25 print('
[*] Waiting for logs. To exit press CTRL+C')
26
27 def callback(ch, method, properties, body):
28   print(" %r:%r" % (method.routing_key, body))
29
30 channel.basic_consume(callback,
31                     queue=queue_name,
32                     no_ack=True)
33
34 channel.start_consuming()
订阅者




1 #!/usr/bin/env python
2 import pika
3 import sys
4
5 connection = pika.BlockingConnection(pika.ConnectionParameters(
6         host='localhost'))
7 channel = connection.channel()
8
9 channel.exchange_declare(exchange='topic_logs',
10                        type='topic')
11
12 routing_key = sys.argv if len(sys.argv) > 1 else 'anonymous.info'
13 message = ' '.join(sys.argv) or 'Hello World!'
14 channel.basic_publish(exchange='topic_logs',
15                     routing_key=routing_key,
16                     body=message)
17 print(" Sent %r:%r" % (routing_key, message))
18 connection.close()
发布者  注:


订阅/发布Demo



发送消息给多个订阅者

核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。



Exchanges



在RabbitMQ完整的模型中,消息只能发送给一个exchange。

exchange一方面接收消息,另一方面push给queues。





exchange类型

> rabbitmqctl list_exchanges

direct

topic

headers

fanout 广播消息给已知队列
页: [1]
查看完整版本: Python之RabbitMQ操作