设为首页 收藏本站
查看: 2055|回复: 0

[经验分享] 第十 redis&rabbitMQ

[复制链接]

尚未签到

发表于 2017-12-9 11:48:33 | 显示全部楼层 |阅读模式
   一、redis 操作
  1.连接方式
  1.1.插入键值对



#!/usr/bin/env python
#-*- coding:utf-8 -*-
import redis
#插入一个键值对
r = redis.Redis(host='192.168.161.129',port=6379,password="60887")
r.set('foo','bar')
print(r.get('foo'))
  运行结果:
DSC0000.png

  1.2.连接池



#!/usr/bin/env python
#-*- coding:utf-8 -*-
import redis
#创建一个连接池,避免每次建立、释放连接的开销
pool = redis.ConnectionPool(host='192.168.161.129',port=6379,password="60887")
r = redis.Redis(connection_pool=pool)
r.set('name','ckl')
print(r.get('name'))
  运行结果:
DSC0001.png

  2.string 类型
  2.1.设置值过期时长




#ex:过期时长
r.set('age',20,ex=10)
print(r.get('age'))
time.sleep(11)
print(r.get('age'))
  运行结果:
DSC0002.png

  十秒后失效
  2.2.如果值不错在,则赋值




#age 不存在,执行操作
r.set('age',21,nx=True)
print(r.get('age'))
  运行结果:
DSC0003.png

  2.3.批量设值




#批量设值
r.mset({'k1':'v1','k2':'v2'})
print(r.mget('k1','k2'))
  运行结果:
DSC0004.png

  2.4.为某个键设置一个新的值,但返回旧的值




#设值新值,获取原来的值
print(r.getset('k1','md1'))
print(r.get('k1'))
  运行结果:
DSC0005.png

  2.5.相当于切片




#获取返回值的范围
r.set('qc','lostAtShangHai')
print(r.getrange('qc',3,7))
  运行结果:
DSC0006.png

  2.6.修改值的部分内容




#修改值的部分内容
r.setrange('qc',3,'KKK')
print(r.get('qc'))
#b'losKKKShangHai'
  运行结果:
DSC0007.png

  2.7.相当于统计UV的方法




#统计UV
r.setbit('uv_count',5,1)
r.setbit('uv_count',8,1)
r.setbit('uv_count',6,1)
r.setbit('uv_count',6,1)
print(r.bitcount('uv_count'))
  运行结果:
DSC0008.png

  2.8.统计PV方法




#统计PV,
r.incr('pv_count',3)
r.incr('pv_count',3)
r.incr('pv_count',3)
r.incr('pv_count',3)
print(r.get("pv_count"))
  运行结果:
DSC0009.png

  2.9.在值的后面增加内容



#在后面追加内容
r.set('wu','kaka')
print(r.get('wu'))
r.append('wu','sasa')
print(r.get('wu'))
  运行结果:
DSC00010.png

  3.hash 类型
  3.1.插入单条数据




#插入单条数据
r.hset('taihu','wuxi','yuantouzhu')
print(r.hscan('taihu'))
  运行结果:
DSC00011.png

  3.2.获取批量值




#获取批量值
r.hmset('hubo',{'qinghaihu':'qinghai','panyanghu':'panyang'})
print(r.hmget('hubo','qinghaihu','panyanghu'))
  运行结果:
DSC00012.png

  3.3.获取所有键值




#获取所有键值
print(r.hgetall('hubo'))
  运行结果:
DSC00013.png

  3.4.获取键值对数量



#获取键值对的个数
print(r.hlen('hubo'))
  运行结果:
DSC00014.png

  4.list 类型
  4.1.插入值,获取所有值



#获取所有的键值
r.lpush('diuList',11,22,44,66)
print(r.lrange('diuList',0,-1))
  运行结果:
DSC00015.png

  4.2.插入值



#在值22的前面插入值88
r.linsert('diuList','BEFORE',22,88)
print(r.lrange('diuList',0,-1))
  运行结果:
DSC00016.png

  4.3.获取第一个值及获取元素个数



#获取第一个值
print(r.lindex('diuList',0))
#获取元素个数
print(r.llen('diuList'))
  运行结果:
DSC00017.png

  5.set 类型
  5.1.添加数据



r.sadd('SetA','888')
r.sadd('SetA','999')
r.sadd('SetB','444')
r.sadd('SetB','555')
print(r.sscan('SetA'))
print(r.sscan('SetB'))
  运行结果:
DSC00018.png

  5.2.移动值



#将A的888移到B里
r.smove('SetA','SetB','888')
print(r.sscan('SetA'))
print(r.sscan('SetB'))
  运行结果:
DSC00019.png

  二、redis发布订阅
  2.1.发布订阅类方法



#!/usr/bin/env python
#-*- coding:utf-8 -*-
import redis
class RedisHelper:
def __init__(self):
self.__conn = redis.Redis(host='192.168.161.129',port=6379,password="60887")
self.chan_sub = 'fm101.5'
self.chan_pub = 'fm99.6'  
#发布频道方法
def public(self,msg):
self.__conn.publish(self.chan_pub,msg)
return True
#订阅频道方法
def subscribe(self):
pub = self.__conn.pubsub()  
pub.subscribe(self.chan_sub)
pub.parse_response()
return pub
  2.2.订阅



#!/usr/bin/env python
#-*- coding:utf-8 -*-
from day10.RedisSub import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscribe()
while True:
msg = redis_sub.parse_response()
print(msg)
  2.3.发布



#!/usr/bin/env python
#-*- coding:utf-8 -*-
import redis
r = redis.Redis(host='192.168.161.129',port=6379,password="60887")
r.publish('fm101.5','who are u?')
  运行多个订阅
DSC00020.png

DSC00021.png

DSC00022.png

  开始发布:
DSC00023.png

  查看订阅结果:
DSC00024.png

DSC00025.png

DSC00026.png

   二、rabbitMQ
  2.1.安装rabbitMQ略过
  2.1.1.消息发送接收
  生产端示例



#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
credentials = pika.PlainCredentials('guest','guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
#定义管道
channel = connection.channel()
#定义queue名称
channel.queue_declare(queue='ckl')
#rabbitMQ 不能直接将消息发送,而需要将消息通过交换器,此处是空,转发给queue
channel.basic_publish(exchange='',
routing_key='ckl',
body='Hello your m kitty')
print(" [x] Sent 'Hello World!'")
connection.close()
  运行结果:
DSC00027.png

  查看rabbitMQ队列:
DSC00028.png

  2.1.2.客户端示例



#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pika
credentials = pika.PlainCredentials('guest','guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
channel = connection.channel()
#定义队列,如果消费端先启动,队列不存在,就会报错。如果发现队列没有,就自己生成,如果已经存在,就忽略。
channel.queue_declare(queue='ckl')

def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(callback,
queue='ckl',
no_ack=True)
print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
      运行结果:
    DSC00029.png

      消息接收后查看队列
    DSC00030.png

       2.2.队列持久化及消息持久化
      2.2.1.队列持久化



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    #定义管道
    channel = connection.channel()
    #定义queue名称
    channel.queue_declare(queue='zld',durable=True)
    #rabbitMQ 不能直接将消息发送,而需要将消息通过交换器,此处是空,转发给queue
    channel.basic_publish(exchange='',
    routing_key='zld',
    body='Hello your m kitty')
    print(" [x] Sent 'Hello World!'")
    connection.close()
      红色部分定义持久化
      运行生产程序:
    DSC00031.png

      运行消费程序:
    DSC00032.png

      重启服务器,查看queue持久化:
    DSC00033.png

      zld这个queue持久化,消息没有持久化
       2.3.定义消息队列持久化
      2.3.1.生产程序:



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    #定义管道
    channel = connection.channel()
    #定义queue名称
    channel.queue_declare(queue='zld',durable=True)
    #rabbitMQ 不能直接将消息发送,而需要将消息通过交换器,此处是空,转发给queue
    channel.basic_publish(exchange='',
    routing_key='zld',
    body='jintian',
    properties = pika.BasicProperties(delivery_mode=2,)
    )
    print(" [x] Sent 'Hello World!'")
    connection.close()
      定义消息持久化,红色部分
      2.3.2.消费程序



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    channel = connection.channel()
    #定义队列,如果消费端先启动,队列不存在,就会报错。如果发现队列没有,就自己生成,如果已经存在,就忽略。
    channel.queue_declare(queue='ckl')

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
    channel.basic_consume(callback,
    queue='zld',
    #no_ack=True
    )
    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
      定义消息持久化,红色部分
      运行生产程序:
       DSC00034.png
      运行消费程序:
    DSC00035.png

      关闭消费程序,修改生产程序



    ....
    channel.basic_publish(exchange='',
    routing_key='zld',
    body='life is ...',
    properties = pika.BasicProperties(delivery_mode=2,)
    )
    ....
      运行生产程序:
    DSC00036.png

      重启rabbitMQ服务,查看队列:
    DSC00037.png

      运行消费程序:
    DSC00038.png

       3.rabbitMQ 广播消息,类似发布订阅
      之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了
      Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
      
    fanout: 所有bind到此exchange的queue都可以接收消息
    direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
    topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

       表达式符号说明:#代表一个或多个字符,*代表任何字符
          例:#.a会匹配a.a,aa.a,aaa.a等
              *.a会匹配a.a,b.a,c.a等
         注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    channel = connection.channel()
    #定义exchange名称及类型,不需要知道queue
    channel.exchange_declare(exchange='logs',type='fanout')
    message = 'hi,it is me'
    channel.basic_publish(exchange='logs',
    routing_key='',
    body = message,
    )
    print(" [x] %s" % message)
    connection.close()
      



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    channel = connection.channel()
    #定义exchange及类型
    channel.exchange_declare(exchange='logs',type='fanout')
    ## 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    #绑定exchange及queue
    channel.queue_bind(exchange='logs',queue=queue_name)
    print('
  • Waiting for messages. To exit press CTRL+C')
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True
    )
    channel.start_consuming()
      运行发布端:
    DSC00039.png

      运行多个客户端:
    DSC00040.png

    DSC00041.png

    DSC00042.png

      4.有选择性的接收发送消息
      服务器程序



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    import sys
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',type='direct')
    serverity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'honichi'
    channel.basic_publish(exchange='direct_logs',
    routing_key=serverity,
    body = message,
    )
    print(" [x] %s : %s " % (serverity,message))
    connection.close()
      客户端程序:



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    import sys
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',type='direct')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    severities = sys.argv[1:]
    if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
    for severity in severities:
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key=severity)
    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()
      运行服务器端:
    DSC00043.png

      客户端1:
    DSC00044.png

      客户端2:
    DSC00045.png

      客户端3:
    DSC00046.png

      
      生产端程序:



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    import sys
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',type='topic')
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'haha.info'
    message = ' '.join(sys.argv[2:]) or 'saihoulei'
    channel.basic_publish(exchange='topic_logs',
    routing_key = routing_key,
    body = message,
    )
    print(" [x] %s : %s " % (routing_key,message))
    connection.close()
      消费端程序:



    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    import sys
    credentials = pika.PlainCredentials('guest','guest')
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.161.129',5672,'/',credentials))
    channel = connection.channel()
    channel.exchange_declare(exchange='topic_logs',type='topic')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    binding_keys = sys.argv[1:]
    if not binding_keys:
    sys.stderr.write("Usage: %s [bind_key].....\n" % sys.argv[0])
    sys.exit(1)
    for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
    queue=queue_name,
    routing_key = binding_key)
    print('
  • Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()
      运行服务端程序:
      发送多次不同内容的消息
       DSC00047.png
      消费端运行:
      只能收到自己监听相关内容的
    DSC00048.png

    DSC00049.png

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-422387-1-1.html 上篇帖子: Python操作rabbitmq系列(四):根据类型订阅消息 下篇帖子: 消息队列 (Message Queue)
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表