设为首页 收藏本站
查看: 1051|回复: 0

[经验分享] rabbitmq:centos7安装与python调用

[复制链接]

尚未签到

发表于 2017-12-9 10:37:45 | 显示全部楼层 |阅读模式
  1.centos安装rabbitmq
  官网下载或者yum list |grep rabbitmq搜索安装,官网是最新的版本



wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.14/rabbitmq-server-3.6.14-1.el7.noarch.rpm


[iyunv@greg02 src]#yum list |grep rabbitmq
librabbitmq.x86_64                      0.5.2-1.el7                    epel     
librabbitmq-devel.x86_64                0.5.2-1.el7                    epel     
librabbitmq-tools.x86_64                0.5.2-1.el7                    epel     
opensips-event_rabbitmq.x86_64          1.10.5-3.el7                   epel     
rabbitmq-java-client.noarch             3.6.0-1.el7                    epel     
rabbitmq-java-client-doc.noarch         3.6.0-1.el7                    epel     
rabbitmq-java-client-javadoc.noarch     3.6.0-1.el7                    epel     
rabbitmq-server.noarch                  3.3.5-34.el7                   epel     
[iyunv@greg02 src]#rz

[iyunv@greg02 src]#yum install -y rabbitmq-server-3.6.14-1.el7.noarch.rpm
  2.Windows安装rabbitmq



pip install pika
or
easy_install pika
  3.centos启动rabbitmq



[iyunv@greg02 src]#service rabbitmq-server start
Redirecting to /bin/systemctl start  rabbitmq-server.service
[iyunv@greg02 src]#netstat -lntp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name   
tcp        0      0 0.0.0.0:80              0.0.0.0:*               LISTEN      1188/nginx: master  
tcp        0      0 0.0.0.0:4369            0.0.0.0:*               LISTEN      2801/epmd           
tcp        0      0 0.0.0.0:22              0.0.0.0:*               LISTEN      1140/sshd           
tcp        0      0 127.0.0.1:25            0.0.0.0:*               LISTEN      2205/master         
tcp        0      0 0.0.0.0:25672           0.0.0.0:*               LISTEN      2660/beam.smp      
tcp6       0      0 :::3306                 :::*                    LISTEN      2163/mysqld         
tcp6       0      0 :::4369                 :::*                    LISTEN      2801/epmd           
tcp6       0      0 :::22                   :::*                    LISTEN      1140/sshd           
tcp6       0      0 ::1:25                  :::*                    LISTEN      2205/master         
tcp6       0      0 :::5672                 :::*                    LISTEN      2660/beam.smp
  查看log



[iyunv@greg02 src]#service rabbitmq-server status
Redirecting to /bin/systemctl status  rabbitmq-server.service
Nov 16 19:16:51 greg02 rabbitmq-server[2660]: ##########  Logs: /var/log/rabbitmq/rabbit@greg02.log
Nov 16 19:16:51 greg02 rabbitmq-server[2660]: ######  ##        /var/log/rabbitmq/rabbit@greg02-sasl.log
  切换到log目录cd /var/log/rabbitmq/



[iyunv@greg02 rabbitmq]#ls
rabbit@greg02.log  rabbit@greg02-sasl.log
[iyunv@greg02 rabbitmq]#cat rabbit\@greg02.log
=INFO REPORT==== 16-Nov-2017::19:16:51 ===
Starting RabbitMQ 3.6.14 on Erlang R16B03-1
Copyright (C) 2007-2017 Pivotal Software, Inc.
Licensed under the MPL.  See http://www.rabbitmq.com/
=INFO REPORT==== 16-Nov-2017::19:16:51 ===
node           : rabbit@greg02
home dir       : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.config (not found)
cookie hash    : 6QLUzrnBCl9yXMJXg59m+Q==
log            : /var/log/rabbitmq/rabbit@greg02.log
sasl log       : /var/log/rabbitmq/rabbit@greg02-sasl.log
database dir   : /var/lib/rabbitmq/mnesia/rabbit@greg02
  显示的是没有找到配置文件,我们可以自己创建这个文件



cd /etc/rabbitmq/
vi rabbitmq.config
  编辑内容如下:



[{rabbit, [{loopback_users, []}]}].
  rabbitmq默认创建的用户guest,密码也是guest,这个用户默认只能是本机访问,localhost或者127.0.0.1,从外部访问需要添加上面的配置。
  保存配置后重启服务:



service rabbitmq-server stop
service rabbitmq-server start
  4.创建用户角色



[iyunv@greg02 rabbitmq]#rabbitmqctl add_user greg greg123
Creating user "greg"
[iyunv@greg02 rabbitmq]#rabbitmqctl set_user_tags greg administrator
Setting tags for user "greg" to [administrator]
[iyunv@greg02 rabbitmq]#rabbitmqctl set_permissions -p / greg ".*" ".*" ".*"
Setting permissions for user "greg" in vhost "/"
  队列通信:
  send端



#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()

#声明queue
channel.queue_declare(queue='hello')

#n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
  receive端



#_*_coding:utf-8_*_
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()

#You may ask why we declare the queue again ‒ we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists. For example if send.py program
#was run before. But we're not yet sure which program to run first. In such cases it's a good
# practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
print(" [x] Received %r" % body)

channel.basic_consume(callback,
queue='hello',
no_ack=True)

print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
      work queues
      在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
      send端



    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
    channel = connection.channel()

    # 声明queue
    channel.queue_declare(queue='task_queue')

    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    import sys

    message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
    delivery_mode=2,  # make message persistent
                          )
    )
    print(" [x] Sent %r" % message)
    connection.close()
      receive端



    import pika, time

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
    channel = connection.channel()

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(20)
    print(" [x] Done")
    print("method.delivery_tag",method.delivery_tag)
    ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(callback,
    queue='task_queue',
    no_ack=True
    )

    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
      此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上
      #no_ack 默认false,即使在处理消息的时候使用CTRL + C来杀死一个工作者,也不会丢失任何东西。 工人死后不久,所有未确认的消息将被重新发送
      公平分发
      如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。



    channel.basic_qos(prefetch_count=1)
      sender



    #!/usr/bin/env python
    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)

    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
    delivery_mode = 2, # make message persistent
                          ))
    print(" [x] Sent %r" % message)
    connection.close()
      receive



    #!/usr/bin/env python
    import pika
    import time

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='task_queue', durable=True)
    print('
  • Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
    queue='task_queue')

    channel.start_consuming()
      订阅发布
      之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了
      交换是一件非常简单的事情。 一方面它接收来自生产者的消息,另一方则推动他们排队。 交易所必须知道如何处理收到的消息。 是否应该附加到特定的队列? 它应该附加到许多队列? 还是应该丢弃。 这些规则是由交换类型定义的。
      fanout: 所有bind到此exchange的queue都可以接收消息
    direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

       表达式符号说明:#代表一个或多个字符,*代表任何字符
          例:#.a会匹配a.a,aa.a,aaa.a等
              *.a会匹配a.a,b.a,c.a等
         注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
      headers: 通过headers 来决定把消息发给哪些queue
      sender



    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
    routing_key='',
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()
      receive



    #_*_coding:utf-8_*_
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',
    exchange_type='fanout')
    result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开
    后,自动将queue删除
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs',
    queue=queue_name)
    print('
  • Waiting for logs. To exit press CTRL+C')
    def callback(ch, method, properties, body):
    print(" [x] %r" % body)
    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()
    有选择的接收消息(exchange type=direct) 
      RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
      sender


    DSC0000.gif DSC0001.gif


    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',
    type='direct')

    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    View Code  receive





    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',
    type='direct')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    severities = sys.argv[1:]
    if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

    for severity in severities:
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key=severity)

    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    channel.start_consuming()
    View Code
    更细致的消息过滤
      尽管使用直接交换改进了我们的系统,但它仍然有局限性 - 它不能根据多个标准进行路由选择。
      在我们的日志系统中,我们可能不仅要根据严重性来订阅日志,还要根据发出日志的来源进行订阅。 你可能从syslog unix工具知道这个概念,它根据严重性(info / warn / crit ...)和facility(auth / cron / kern ...)来路由日志。
      这会给我们很大的灵活性 - 我们可能想要听取来自'cron'的严重错误,而且还要听取来自'kern'的所有日志。
      sender





    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',
    exchange_type='topic')

    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    View Code  receive





    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',
    exchange_type='topic')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]
    if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

    for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
    queue=queue_name,
    routing_key=binding_key)

    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)

    channel.start_consuming()
    View Code  远程过程调用(RPC)Remote procedure call
       DSC0002.png
      为了说明如何使用RPC服务,我们将创建一个简单的客户端类。 它将公开一个名为call的方法,它发送一个RPC请求并阻塞,直到收到答案:



    fibonacci_rpc = FibonacciRpcClient()
    result = fibonacci_rpc.call(4)
    print("fib(4) is %r" % result)
      send-client客户端





    import pika
    import uuid
    class FibonacciRpcClient(object):
    def __init__(self):
    credentials = pika.PlainCredentials('greg', 'greg123')
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.179.130', credentials=credentials))
    channel = self.connection.channel()
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,#准备接受命令结果
    queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
    """"callback方法"""
    if self.corr_id == props.correlation_id:
    self.response = body
    def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4()) #唯一标识符
    self.channel.basic_publish(exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
    reply_to=self.callback_queue,
    correlation_id=self.corr_id,
    ),
    body=str(n))
    count  = 0
    while self.response is None:
    self.connection.process_data_events() #检查队列里有没有新消息,但不会阻塞
    count +=1
    print("check...",count)
    return int(self.response)
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(5)
    print(" [.] Got %r" % response)
    View Code  receive接收端





    import pika
    import time

    credentials = pika.PlainCredentials('greg', 'greg123')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
         '192.168.179.130',credentials=credentials))

    channel = connection.channel()
    channel.queue_declare(queue='rpc_queue')

    def fib(n):
         if n == 0:
             return 0
         elif n == 1:
             return 1
         else:
             return fib(n - 1) + fib(n - 2)

    def on_request(ch, method, props, body):
         n = int(body)

         print(" [.] fib(%s)" % n)
         response = fib(n)

         ch.basic_publish(exchange='',
                          routing_key=props.reply_to,
                          properties=pika.BasicProperties(correlation_id= \
                                                              props.correlation_id),
                          body=str(response))
         ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')

    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    View Code  当然也可以用cmd,得到cmd命令结果



    ssh_rpc = SSHRpcClient()
    print(" [x] sending cmd")
    response = ssh_rpc.call("ifconfig")
    print(" [.] Got result ")
    print(response.decode("gbk"))

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422375-1-1.html 上篇帖子: RabbitMQ的安装与基本使用 下篇帖子: 半小时读懂互联网广告新生态
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表