RabbitMQ RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。 MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。 MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka。(saltsatck底层采用的就是ZeroMq) 1)吞吐量(TPS):ZeroMq最好、RabbitMq 次之, ActiveMq 最差 2)持久化:ZeroMq不支持、RabbitMq和ActiveMq都支持
3)可用性、可靠性:RabbitMq最好,ActiveMq次之,ZeroMq最差 4)高并发:从实现语言来看,RabbitMQ最高,原因是它的实现语言是天生具备高并发高可用的erlang语言 RabbitMQ安装
安装API
使用API操作RabbitMQ
生产者 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| import pika
# connection 一个TCP的连接、
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# channel 是建立在TCP连接中的一个虚拟连接
channel = connection.channel()
# 声明一个queue
channel.queue_declare(queue='hello')
# n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
|
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# connection 一个TCP的连接、 channel 是建立在TCP连接中的一个虚拟连接
channel = connection.channel()
# 再次声明原因是因为再包含众多队列的RabbitMQ里面 我们不确定此次使用的队列是否已经声明过
# 再次声明确保能够正常使用
channel.queue_declare(queue='hello')
# ch 管道内存地址
# 回调函数
def callback(ch, method, properties, body):
print("---->", ch, method, properties)
print(" [x] Received %r" % body)
# 开始消费消息
channel.basic_consume(callback,
queue='hello',
no_ack=True # 确认消息
)
print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
|
1)no-ack = False 消息不丢失
no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。 消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.111'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello',
# 和服务端确认消息 确保消息不丢失
no_ack=False)
print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
|
2)durable 消息持久化 生产者 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| #!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111'))
channel = connection.channel()
# 消息持久化
channel.queue_declare(queue='hello', durable=True)
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
# 消息持久化
properties=pika.BasicProperties(
delivery_mode=2,
))
print(" [x] Sent 'Hello World!'")
connection.close()
|
消费者 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
|
3)消息存取顺序 默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者2去队列中获取 偶数 序列的任务。 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| #!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.111'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)
# 谁来谁取 不按奇偶等顺序来取
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print(' Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
|
4)发布订阅
Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定 有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法。 exchange type = fanout 发布者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| #!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
|
订阅者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| #!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
|
5)关键字发送
exchange type = direct 生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs_test_1',
type='direct')
severity = 'error'
message = '123'
# severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
# message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs_test_1',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
|
消费者 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
| import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='127.0.0.1'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs_test_1',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# severities = sys.argv[1:]
# if not severities:
# sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
# sys.exit(1)
severities = ['error']
for severity in severities:
channel.queue_bind(exchange='direct_logs_test_1',
queue=queue_name,
routing_key=severity)
print(' Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
|
7)模糊匹配
exchange type = topic 1
2
| # 表示可以匹配 0 个 或 多个 单词
* 表示只能匹配 一个 单词
|
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
| #!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
|
生产者 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| #!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
|
|