设为首页 收藏本站
查看: 912|回复: 0

[经验分享] rabbitmq redis

[复制链接]

尚未签到

发表于 2017-7-2 13:31:40 | 显示全部楼层 |阅读模式
RabbitMQ
  RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。
  对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列

用rabbitmq实现一个简单的生产者消费者模型
  发送端代码



import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
channel = connection.channel()
channel.queue_declare(queue="hello")
channel.basic_publish(exchange='',
routing_key = 'hello',
body='hello world',
)
print("Send hello world")
connection.close()
  接收端代码



import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
channel = connection.channel()
channel.queue_declare(queue="hello")

def callback(ch,method,properties,body):
     print(ch,method,properties)
     print("received %s" %body)

channel.basic_consume(callback,
                       queue='hello',
                       no_ack=True)

print("waiting for messages to exit press 'CTRL+C'")
channel.start_consuming()
  通过上述代码便可以实现一个简单的生产者消费者模型,但是现在的结果是:当开启多个消费者程序的时候,启动生产者发送消息,这个时候只有一个可以收到,并且再次启动,会下一个消费者收到,类似一个轮询的关系。

acknowledgment 消息不丢失(通过客户端设置实现)
  通过no_ack = False参数设置,如果消费者遇到情况突然中断了没有收到,那么RabbitMQ会重新将任务添加到队列中
  下面将接收端的代码进行更改:



#AUTHOR:FAN
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
channel = connection.channel()
channel.queue_declare(queue="hello")
def callback(ch,method,properties,body):
print(ch,method,properties)
time.sleep(10)
print("received %s" %body)
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print("waiting for messages to exit press 'CTRL+C'")
channel.start_consuming()
  标注的地方就是代码修改的地方,通过将no_ack更改为False,以及在callback回到函数这里让等待10s,这样启动接收端后,再启动发送算,在还没有打印数据的时候将客户端关闭,然后再启动,发现依然可以收到刚才发送端发送的数据。
  但是这种方式只能实现客户端断开重新连接的时候数据不丢失,如果是rabbitmq挂了的情况如何解决?

durable消息不丢失(通过在服务端设置保证数据不丢失)
  这个时候生产者和消费者的代码都需要改动
  发送者代码



import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
channel = connection.channel()

channel.queue_declare(queue='fan',durable=True)

channel.basic_publish(exchange='',
                       routing_key='fan',
                       body='hello world',
                       properties = pika.BasicProperties(
                           delivery_mode=2
                       ))

print("send 'hello world'")
connection.close()
  接收者的代码



import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
channel = connection.channel()

channel.queue_declare(queue='fan',durable=True)

def callback(ch,method,properies,body):
     print("received %s" %body)
     time.sleep(10)
     print("is ok")
     ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,
                       queue='fan',
                       no_ack=False)

print("waitting for messages.To exit press CTRL+C")
channel.start_consuming()
  这样即使在接收者接收数据过程中rabbitmq服务器出现问题了,在服务恢复之后,依然可以收到数据

发布订阅
DSC0000.png

  发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
  通过exchange type = fanout参数实现
  代码例子:
  发布者:



#AUTHOR:FAN
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103'))
channel = connection.channel()

channel.exchange_declare(exchange="fan",
                          type='fanout')

message = ' '.join(sys.argv[1:]) or "info :hello world"
channel.basic_publish(exchange = 'fan',
                       routing_key='',
                       body=message)

print("send %s" %message)
connection.close()
  订阅者:



#AUTHOR:FAN
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.8.103'))
channel = connection.channel()
channel.exchange_declare(exchange="fan",
type='fanout')
#随机生成队列名字
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
#将exchange和队列绑定
channel.queue_bind(exchange='fan',
queue=queue_name)
print("waiting for fan ,To exit press CTRL+C")
def callback(ch,method,proerties,body):
print("---",body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
关键字发送
DSC0001.png

  通过参数:exchange type = direct实现
  之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
  代码例子如下:
  消费者代码:



#AUTHOR:FAN
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs_1',
                          type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
     sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
     exit(1)
print(severities)
for severity in severities:
     print(severity)
     channel.queue_bind(exchange='direct_logs_1',
     queue=queue_name,
     routing_key=severity)
print("waiting for logs,To exit press CTRL+C")
def callback(ch,method,properties,body):
     print("%s:%s" %(method.routing_key,body))

channel.basic_consume(callback,
                       queue=queue_name,
                       no_ack=True)
channel.start_consuming()
  生产者代码



import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.25'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs_1',
                          type='direct')

print(sys.argv)
severity = sys.argv[1] if len(sys.argv) >1 else "error"
message = ' '.join(sys.argv[2:]) or 'hello world'
channel.basic_publish(exchange='direct_logs_1',
                       routing_key = severity,
                       body = message)
print("send %s:%s" %(severity,message))
connection.close()
模糊匹配
DSC0002.png

  通过参数exchange type = topic实现
  在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
  # 表示可以匹配 0 个 或 多个 单词
  *  表示只能匹配 一个 单词
  --------------------还没有整理完

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390363-1-1.html 上篇帖子: POJ 1860 Bellman改判断正环 下篇帖子: (二)RabbitMQ使用笔记
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表