设为首页 收藏本站
查看: 465|回复: 0

[经验分享] rabbitmq集群搭建(centos6.5)

[复制链接]

尚未签到

发表于 2018-4-25 13:03:55 | 显示全部楼层 |阅读模式
  一:rabbitmq的安装:
  参考:http://www.blogjava.net/hellxoul/archive/2014/06/25/415135.html
  http://blog.haohtml.com/archives/15249
  
  说明:修改机器名字后再安装(为后面集群做准备)
  vi /etc/sysconfig/network 修改名字
  vi /etc/hosts 修改地址映射表,如192.168.1.112   rabbitmq-node1.com rabbitmq-node1 #做集群时设置
  重启
  1.安装erlang:
  #rpm -Uvh http://mirrors.hustunique.com/epel/6/x86_64/epel-release-6-8.noarch.rpm
  #yum install erlang
  测试:erl:
  io:format("hello").
  2.安装rabbitmq:
  上传rabbitmq-server-3.3.4-1.noarch.rpm
  #yum install rabbitmq-server-3.3.4-1.noarch.rpm
  
  3.安装完成
  4.启动:
  /etc/init.d/rabbitmq-server start
  5.开启插件:
  rabbitmq-plugins enable rabbitmq_management
  允许远程访问:
  vi /etc/rabbitmq.config
  [
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, []}]}
  ].
  
  关闭iptables:service iptables stop
                chkconfig iptables off禁止开机启动
  访问http://ip:15672 账号密码为guest
  二:集群搭建
  参考:http://www.cnblogs.com/flat_peach/archive/2013/04/07/3004008.html
  http://www.centoscn.com/CentosServer/cluster/2014/1216/4324.html
  
  说明:3台机器,名为rabbitmq-node1,rabbitmq-node2,rabbitmq-node3
  1.修改地址映射:
  vi /etc/hosts 修改地址映射表,如192.168.1.112   rabbitmq-node1.com rabbitmq-node1
  2.设置节点cookie:
  先关闭rabbitmq-server:/etc/init.d/rabbitmq-server stop
  cook存放在/var/lib/rabbitmq/.erlang.cookie文件,该文件权限为400,修改前需先修改为777,修改后再改为400
  把其中一台的该文件内容复制到其他所有机器上,使集群中的所有机器cookie保持一致,否则不能通讯
  chmod 777  /var/lib/rabbitmq/.erlang.cookie
  复制好后启动rabbitmq:/etc/init.d/rabbitmq-server start
  3.rabbitmq-node1,rabbitmq-node2作为内存节点,rabbitmq-node3作为磁盘节点,连接起来做集群:
  rabbitmq-node1:
  rabbitmqctl stop_app :停止应用
  rabbitmqctl join_cluster --ram rabbit@rabbitmq-node3 :增加节点
  rabbitmqctl start_app :开启应用
  rabbitmqctl cluster_status :查看状态
  rabbitmq-node2:
  rabbitmqctl stop_app :停止应用
  rabbitmqctl join_cluster --ram rabbit@rabbitmq-node3 :增加节点
  rabbitmqctl start_app :开启应用
  rabbitmqctl cluster_status :查看状态
  rabbitmq-node3:
  rabbitmqctl cluster_status :查看状态
  
  如果要使rabbitmq-node1或rabbitmq-node2在集群里也是磁盘节点,join_cluster 命令去掉--ram参数即可: rabbitmqctl join_cluster rabbit@rabbitmq-node3
  集群模式分为普通模式(上述模式)和镜像模式:
  普通模式:消息实体只存在其中一个节点,当连接在其他节点的消费者获取消息时,其他节点先从该节点获取消息,再传给消费者,容易造成该节点瓶颈。
  镜像模式:消息实体会主动在各节点之间同步,而不是在消费者获取时才同步。降低系统性能消耗带宽
    在任意节点执行:rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}',就成为镜像模式了
  三:python操作rabbitmq:
  参考:  http://www.01happy.com/python-rabbitmq-work-queues/
  http://www.01happy.com/python-rabbitmq-exchanges/
  http://yidao620c.iteye.com/blog/1947338
  1.发送消息:
  #!/usr/bin/env python
  # -*- coding: utf-8 -*-
  import sys
  import pika
  connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))   #连接服务器
  channel = connection.channel()    #创建channel
  channel.queue_declare(queue = 'hello')    #声明消息队列,如果队列不存在将自动清除这些信息。本利中为hello
  if len (sys.argv) < 2 :
  print 'message is empty!'
  sys.exit(0)
  message = sys.argv[1]
  channel.basic_publish(exchange = '', routing_key='hello', body = message)    #发送消息到上面声明的队列中。exchange为交换器,能精确指定该发到哪个队列;route_key为队列的名称,body为消息体
  print "[x] sent: '" + message + "'\n"
  connection.close()    #关闭连接
  
  2.接收消息:
  #!/usr/bin/env python
  # -*- coding: utf-8 -*-
  import pika
  connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost' ))
  channel = connection.channel()
  channel.queue_declare(queue = 'hello' )
  print '
  • Waiting for messages. To exit press CTRL+C'
      def callback(ch, method, properties, body):    #回调函数,处理接受的消息
      print body
      channel.basic_consume(callback, queue = 'hello' , no_ack = True )    #告诉rabbitmq使用callback接收消息,no_ack=True 表示消费完了这个消息以后不主动把完成状态通知rabbitmq。
      channel.start_consuming()    #开始接收信息,进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出
      
      可以开启多个接收实例,每个实例可以接收一部分消息,多个实例接收的消息的总和为发送的消息数。
      ##########################################################################################
      上面的消息接收实例,有一个处理消息出现错误后,消息就将会丢掉,不会被其他的实例再处理。为了保证每一个消息都不会被丢掉,可以在处理时加消息确认,
      即修改回调函数,修改如下:
      def callback(ch, method, properties, body):
      print " [x] Received %r" % (body,)
      time.sleep(5)
      print " [x] Done"
      ch.basic_ack(delivery_tag = method.delivery_tag)  #增加确认
      channel.basic_consume(callback, queue='hello', no_ack=False) #需要ack
      
      ##########################################################################################
      消息持久化:
      rabbit挂掉的话任务就会丢失,需要消息持久化
      channel.queue_declare(queue='hello', durable=True)    #增加durable=True,已存在的队列不行,会报错。需要重新创建队列
      channel.basic_publish(exchange = '', routing_key='hello', body = message,properties=pika.BasicProperties(delivery_mode = 2,))
      
      公平调度:
      有的任务需要时间长,有的时间短,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
      channel.basic_qos(prefetch_count=1)
      
      发送任务源码:
      #!/usr/bin/env python
      import pika
      import sys
      
      connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='localhost'))
      channel = connection.channel()
      
      channel.queue_declare(queue='task_queue', durable=True)
      
      message = ' '.join(sys.argv[1:]) or "Hello World!"
      channel.basic_publish(exchange='',
        routing_key='task_queue',
        body=message,
        properties=pika.BasicProperties(
       delivery_mode = 2, # make message persistent
        ))
      print " [x] Sent %r" % (message,)
      connection.close()
      
      接收任务源码:
      #!/usr/bin/env python
      import pika
      import time
      
      connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='localhost'))
      channel = connection.channel()
      
      channel.queue_declare(queue='task_queue', durable=True)
      print '
  • Waiting for messages. To exit press CTRL+C'
      
      def callback(ch, method, properties, body):
      print " [x] Received %r" % (body,)
      time.sleep( body.count('.') )
      print " [x] Done"
      ch.basic_ack(delivery_tag = method.delivery_tag)
      
      channel.basic_qos(prefetch_count=1)
      channel.basic_consume(callback,
        queue='task_queue')
      
      channel.start_consuming()
      
      
      
      
      
      

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-451926-1-1.html 上篇帖子: CentOS 6 下单独记录 iptables 日志 下篇帖子: CentOS7 安装 scikit
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表