QQ叫紫珊 发表于 2017-7-2 15:30:23

python-rabbitmq

  简单producer:



import pika
# Connect to the RabbitMQ broker on localhost and open a channel on it.
broker = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()

# Declare the queue the message will be sent to.
channel.queue_declare(queue='hello')

# Publish with an empty exchange name; the routing key is then the name of
# the queue declared above.
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!',  # message payload
)
print(" Send 'hello World!'")
connection.close()

  简单consumer:



import pika
import time
# Connect to the RabbitMQ broker on localhost and open a channel on it.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# If the queue is known to exist already this declaration could be omitted,
# but without it the consumer fails when it is started before the producer.
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
    """Handle one message delivered from the queue.

    Prints the raw delivery arguments, sleeps for 30 seconds and then prints
    the message body.

    NOTE(review): the consumer below is registered without no_ack=True, yet
    this callback never calls ch.basic_ack -- confirm whether leaving the
    message unacknowledged is intentional for the demo.

    :param ch: the channel object the message was delivered on
    :param method: delivery metadata supplied by pika
    :param properties: message properties supplied by pika
    :param body: the message payload
    :return: None
    """
    print('---->', ch, method, properties, body)
    time.sleep(30)  # presumably simulates a long-running task
    print(" Recevied %r" % body)

# Register callback as the handler for every message received from the
# 'hello' queue.
# NOTE(review): positional-callback form is the pika 0.x signature; pika >= 1.0
# expects basic_consume(queue=..., on_message_callback=..., auto_ack=...) --
# confirm against the installed version.
channel.basic_consume(
    callback,
    queue='hello',
    # no_ack=True would disable acknowledgements: whether the callback
    # succeeds or fails, nothing is confirmed back. The default is False,
    # i.e. acknowledgements are expected.
)
# The message literal was split across two source lines (unterminated
# string); restored to a single-line literal.
print(' [*] Waiting for messages.To exit press CTRL+C')
channel.start_consuming()  # enter the consume loop

  队列以及消息持久化:



#producer
import pika
# Connect to the RabbitMQ broker on localhost and open a channel on it.
broker = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()

# durable=True makes the queue persistent. Note that this persists only the
# queue itself, not the messages stored in it.
channel.queue_declare(queue='hello3', durable=True)

# delivery_mode=2 is the setting that makes the message itself persistent.
persistent = pika.BasicProperties(delivery_mode=2)
channel.basic_publish(
    exchange='',
    routing_key='hello3',  # the routing key is the name of the declared queue
    body='Hello World!',   # message payload
    properties=persistent,
)
print(" Send 'hello World!'")
connection.close()

#consumer
import pika
import time
# Connect to the RabbitMQ broker on localhost and open a channel on it.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# If the queue is known to exist already this declaration could be omitted,
# but without it the consumer fails when it is started before the producer.
# The durable flag must match the one used by the producer.
channel.queue_declare(queue='hello3', durable=True)
def callback(ch, method, properties, body):
    """Handle one message and acknowledge it when processing is finished.

    Prints the raw delivery arguments, sleeps for 5 seconds, prints the
    message body and finally acknowledges the delivery on ``ch``.

    :param ch: the channel object the message was delivered on
    :param method: delivery metadata supplied by pika; its ``delivery_tag``
        identifies the message being acknowledged
    :param properties: message properties supplied by pika
    :param body: the message payload
    :return: None
    """
    print('---->', ch, method, properties, body)
    time.sleep(5)  # presumably simulates a long-running task
    print(" Recevied %r" % body)
    # Confirm that processing is complete; this goes together with leaving
    # no_ack unset when the consumer is registered.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Only hand this consumer the next message after it has finished the
# current one.
channel.basic_qos(prefetch_count=1)

# Register callback as the handler for every message received from the
# 'hello3' queue.
# NOTE(review): positional-callback form is the pika 0.x signature; pika >= 1.0
# expects basic_consume(queue=..., on_message_callback=..., auto_ack=...) --
# confirm against the installed version.
channel.basic_consume(
    callback,
    queue='hello3',
    # no_ack=True would disable acknowledgements: whether the callback
    # succeeds or fails, nothing is confirmed back. The default is False,
    # i.e. acknowledgements are expected.
)
# The message literal was split across two source lines (unterminated
# string); restored to a single-line literal.
print(' [*] Waiting for messages.To exit press CTRL+C')
channel.start_consuming()  # enter the consume loop

  rabbitMQ广播之:fanout (订阅发布)



#producer:
import pika
import sys
# Connect to the RabbitMQ broker on localhost and open a channel on it.
broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()

# 'fanout' is the broadcast exchange type.
channel.exchange_declare(exchange='logs', type='fanout')

message = "info: Hello World!"
# The routing key is left empty when publishing to the fanout exchange.
channel.basic_publish(
    exchange='logs',
    routing_key='',
    body=message,
)
print(" Sent %r" % message)
connection.close()

#consumer:
import pika
# Connect to the RabbitMQ broker on localhost and open a channel on it.
broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()

channel.exchange_declare(exchange='logs', type='fanout')

# exclusive=True: the queue gets a unique, randomly assigned name and is
# deleted automatically once this consumer disconnects.
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Attach the private queue to the 'logs' exchange.
channel.queue_bind(exchange='logs', queue=queue_name)
print(' Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    """Print the body of a received message.

    :param ch: the channel the message was delivered on (unused)
    :param method: delivery metadata (unused)
    :param properties: message properties (unused)
    :param body: the message payload
    :return: None
    """
    print(" %r" % body)
# Deliver messages from the private queue to callback; no_ack=True means no
# acknowledgement is sent for them.
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()  # enter the consume loop

  广播之direct:



#producer
#!/usr/bin/python3
import pika
import sys
# Connect to the RabbitMQ broker on localhost and open a channel on it.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

# First command-line argument is the severity (used as the routing key),
# the remaining arguments form the message. The original used the whole
# sys.argv list for both, which passes a list as routing key and puts the
# script name into the message.
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" Sent %r:%r" % (severity, message))
connection.close()

#consumer
#!/usr/bin/python3
import pika
import sys
# The severities to subscribe to are the command-line arguments after the
# script name. Validate them before connecting so a usage error does not
# leave an open connection behind.
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

# Connect to the RabbitMQ broker on localhost and open a channel on it.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
                         type='direct')

# Private queue for this consumer (exclusive, name taken from the reply).
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# One binding per requested severity: the severity is the routing key.
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

# The message literal was split across two source lines (unterminated
# string); restored to a single-line literal.
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    """Print the routing key and body of a received message.

    :param ch: the channel the message was delivered on (unused)
    :param method: delivery metadata; its ``routing_key`` is printed
    :param properties: message properties (unused)
    :param body: the message payload
    :return: None
    """
    print(" %r:%r" % (method.routing_key, body))
# Deliver messages from the private queue to callback; no_ack=True means no
# acknowledgement is sent for them.
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()  # enter the consume loop

  topic消息过滤广播:



#producer
import pika
import sys
# Connect to the RabbitMQ broker on localhost and open a channel on it.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')

# First command-line argument is the routing key, the remaining arguments
# form the message. The original used the whole sys.argv list for both,
# which passes a list as routing key and puts the script name into the
# message.
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" Sent %r:%r" % (routing_key, message))
connection.close()
#consumer
import pika
import sys
# The binding keys to subscribe with are the command-line arguments after
# the script name. Validate them before connecting so a usage error does not
# leave an open connection behind.
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

# Connect to the RabbitMQ broker on localhost and open a channel on it.
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')

# Private queue for this consumer (exclusive, name taken from the reply).
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# One binding per requested key.
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

# The message literal was split across two source lines (unterminated
# string); restored to a single-line literal.
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    """Print the routing key and body of a received message.

    :param ch: the channel the message was delivered on (unused)
    :param method: delivery metadata; its ``routing_key`` is printed
    :param properties: message properties (unused)
    :param body: the message payload
    :return: None
    """
    print(" %r:%r" % (method.routing_key, body))
# Deliver messages from the private queue to callback; no_ack=True means no
# acknowledgement is sent for them.
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()  # enter the consume loop

  rpc:



#sev:
import pika
import time
# Connect to the RabbitMQ broker on localhost and open a channel on it.
broker = pika.ConnectionParameters(host='localhost')
connection = pika.BlockingConnection(broker)
channel = connection.channel()

# Queue on which RPC requests are received.
channel.queue_declare(queue='rpc_queue')
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed with a simple loop instead of the original double recursion,
    so the cost grows linearly with ``n`` rather than exponentially.

    :param n: index of the Fibonacci number, a non-negative integer
    :return: the n-th Fibonacci number
    :raises ValueError: if ``n`` is negative (the recursive original never
        terminated normally for such input)
    """
    if n < 0:
        raise ValueError("n must be a non-negative integer")
    prev, curr = 0, 1
    for _ in range(n):
        prev, curr = curr, prev + curr
    return prev

def on_request(ch, method, props, body):
    """Serve one RPC request: compute fib(n) and send the result back.

    The request body is parsed as an integer, the Fibonacci number is
    published to the queue named in ``props.reply_to`` with the request's
    correlation id echoed back, and the request is then acknowledged.

    :param ch: the channel the request was delivered on; also used to
        publish the reply and to acknowledge the request
    :param method: delivery metadata; its ``delivery_tag`` is acknowledged
    :param props: request properties carrying ``reply_to`` and
        ``correlation_id``
    :param body: the request payload, the textual form of an integer
    :return: None
    """
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    # Echo the correlation id so the caller can match reply to request.
    reply_props = pika.BasicProperties(correlation_id=props.correlation_id)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=reply_props,
                     body=str(response))
    # Confirm to the other side that the request has been processed.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Optional: channel.basic_qos(prefetch_count=1) would cap how many messages
# are handed to this server at a time.

# on_request is the callback, 'rpc_queue' the queue it consumes from.
channel.basic_consume(on_request, queue='rpc_queue')
print(" Awaiting RPC requests")
channel.start_consuming()  # enter the consume loop

#cli:
import pika
import uuid
import time

class FibonacciRpcClient(object):
    """RPC client that asks the 'rpc_queue' server for Fibonacci numbers.

    Requests are published to 'rpc_queue'; replies arrive on a private
    exclusive queue and are matched to the request by correlation id.

    NOTE(review): the positional-callback / no_ack form of basic_consume and
    the nameless queue_declare are the pika 0.x signatures -- confirm against
    the installed version.
    """

    def __init__(self):
        """Connect to localhost, create the reply queue and subscribe to it."""
        # Initialised up front so on_response never touches a missing
        # attribute, even if it were invoked before the first call().
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()

        # Private reply queue; its name is taken from the declare result.
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response,
                                   no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Store ``body`` as the response if it answers the pending request.

        :param ch: the channel the reply was delivered on (unused)
        :param method: delivery metadata (unused)
        :param props: reply properties; ``correlation_id`` is compared with
            the id of the pending request
        :param body: the reply payload
        :return: None
        """
        # Only accept the reply whose correlation id matches the one sent
        # with the request; anything else is ignored.
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Request fib(n) from the server and wait for the answer.

        :param n: the index to request; sent as ``str(n)``
        :return: the server's reply converted to ``int``
        """
        self.response = None
        # A fresh random id is sent with the request and echoed back by the
        # server in its reply.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        # Non-blocking counterpart of start_consuming(): keep processing
        # events until on_response has stored the reply.
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print(" Requesting fib(30)")

# Read numbers from the user forever and print the server's answer for each.
while True:
    num = input("input>:").strip()
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits, which int() rejects with ValueError.
    if num.isdecimal() and int(num) > 0:
        response = fibonacci_rpc.call(num)
        print(" [.] Got %r" % response)
    else:
        print("请输入大于0的整数")
页: [1]
查看完整版本: python-rabbitmq