设为首页 收藏本站
查看: 1156|回复: 0

[经验分享] RabbitMQ 使用(一)

[复制链接]

尚未签到

发表于 2017-12-8 22:47:25 | 显示全部楼层 |阅读模式
RabbitMQ中的使用
这篇文章将会介绍关于RabbbitMQ的使用,并且使用的是kombo(客户端的Python实现)来实现;
安装
如果使用的是mac安装的话,可以先安装到指定的位置,接着配置命令访问路径:

  • cd ~
  • vi .bash_profile,输入下面两行

    RABBIT_HOME=/usr/local/Cellar/rabbitmq/3.6.9_1
    PATH=$PATH:$RABBIT_HOME/sbin
  • esc,:wq保存并退出即可
启动和停止
开始:sudo rabbitmq-server start结束:sudo rabbitmq-server stop
Producer 和 Consumer
首先我们需要知道Producer和Consumer的初始化和其对应的publish和consumer方法。
Producer

class kombu.Producer(channel, exchange=None, routing_key=None,
serializer=None, auto_declare=None,
compression=None, on_return=None)
# 发布消息
.publish(body, routing_key=None, delivery_mode=None, mandatory=False,
immediate=False, priority=0, content_type=None, content_encoding=None,
serializer=None, headers=None, compression=None, exchange=None,
retry=False, retry_policy=None, declare=None, expiration=None, **properties)
Consumer

class kombu.Consumer(channel, queues=None, no_ack=None,
auto_declare=None, callbacks=None, on_decode_error=None,
on_message=None, accept=None, prefetch_count=None, tag_prefix=None)
# 消费
.consume(no_ack=None)
Hello world
当收到消息的时候,除非你已经对这个message进行了相关的操作,否则像是某个消费者的通道关闭等特殊情况下,RabbitMQ不会丢失掉这个信息,如果存在其它的消费者,则丢给其它消费者,没有就扔回队列中;当然你也可以通过no_ack=True来关闭消息确认机制。

from kombu import Exchange, Queue, Connection, Consumer, Producer
task_queue = Queue('tasks', exchange=Exchange('tasks', type='direct'), routing_key='tasks')
# 生产者
with Connection('amqp://guest@localhost:5672//') as conn:
with conn.channel() as channel:
producer = Producer(channel)
producer.publish({'hello': 'world'},
retry=True,
exchange=task_queue.exchange,
routing_key=task_queue.routing_key,
declare=[task_queue])
def get_message(body, message):
print(body)
# message.ack()
# 消费者
with Connection('amqp://guest@localhost:5672//') as conn:
with conn.channel() as channel:
consumer = Consumer(channel, queues=task_queue, callbacks=[get_message,], prefetch_count=10)
consumer.consume(no_ack=True)
生产者和消费者相互对应,这样一个简易的消息队列就可以使用了。
任务队列
我们将创建一个工作队列,专门用来处理分配耗时的任务。原理就是将任务封装成一个消息,由客户端发送到消息队列中,而后台运行的工作进程负责弹出任务并且分配给消费者来执行任务。这种方案在一些IO密集型的情况下很有用,比如在短时间内HTTP请求窗口中无法处理复杂的任务。

  • 我们先创建相关的exchange和queue,queues.py文件如下:

    from kombu import Exchange, Queue
    task_exchange = Exchange('tasks', type='direct')
    task_queues = [Queue('high', exchange=task_exchange, routing_key='high'),
    Queue('middle', exchange=task_exchange, routing_key='middle'),
    Queue('low', exchange=task_exchange, routing_key='low')]
  • 接下来再创建消费者,worker.py文件如下:

    from kombu.mixins import ConsumerMixin
    from queues import task_queues
    # 消费者
    class Worker(ConsumerMixin):
    def __init__(self, connection):
    self.connection = connection
    def get_consumers(self, Consumer, channel):
    consumer = Consumer(queues=task_queues, callbacks=[self.process_task], accept=['text/plain', 'json', 'pickle'])
    consumer.qos(prefetch_count=10)  # 最多一下子获取10个任务
    return [consumer]
    def process_task(self, body, message):
    fun = body['fun']; args = body['args']; kwargs = body['kwargs']
    try:
    fun(*args, **kwargs)
    except Exception as exc:
    print(exc)
    message.requeue()
    else:
    message.ack()
    if __name__ == '__main__':
    from kombu import Connection
    with Connection('amqp://guest@localhost:5672//') as conn:
    try:
    worker = Worker(conn)
    worker.run()
    except KeyboardInterrupt:
    print('bye bye')
  • 创建需要传递给消费者执行的任务,tasks.py如下:

    def hello_task(who='world'):
    import time
    print('wait one second')
    time.sleep(1)
    print('Hello {}'.format(who))
  • 最后,创建生产者,client.py如下:

    from kombu.pools import producers
    from queues import task_exchange
    routing_keys = {
    'high': 'high',
    'middle': 'middle',
    'low': 'low'
    }
    # 将消息序列化后发送到队列中
    def send_as_task(connection, fun, key='middle', args=(), kwargs={}):
    payload = {'fun': fun, 'args': args, 'kwargs': kwargs}
    routing_key = routing_keys[key]
    with producers[connection].acquire(block=True) as producer:
    producer.publish(payload, serializer='pickle', exchange=task_exchange,
    routing_key=routing_key, declare=[task_exchange])
    if __name__ == '__main__':
    from kombu import Connection
    from tasks import hello_task
    with Connection('amqp://guest@localhost:5672//') as conn:
    send_as_task(conn, fun=hello_task, args=('wang',))
上面的代码主要实现的是,将hello_task这个任务经过pickle序列化以后发送到指定的middle消息队列中,接着消费者(可以开多个进程)从中取出消息后再执行任务。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422286-1-1.html 上篇帖子: Publish/Subscribe 下篇帖子: [LeetCode] 226. Invert Binary Tree JAVA
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表