设为首页 收藏本站
查看: 1458|回复: 0

[经验分享] RabbitMQ、Memcached、SQLAlchemy

[复制链接]
发表于 2017-7-2 17:06:36 | 显示全部楼层 |阅读模式
  一、RabbitMQ
  1、基础概念
  rabbitMQ说白了就是一个消息队列,类似于Queue,也是生产者与消费者模型.只不过做了扩展,所不同的是Queue在内存中的消息队列,而RabbitMQ是部署在机器上的;
  一般而言,生成者往队列中放数据,而消费者从队列中取数据;


DSC0000.gif DSC0001.gif


import Queue
import threading
message = Queue.Queue(10)
def producer(i):    #生产者
while True:
message.put(i)
def consumer(i):    #消费者
while True:
msg = message.get(i)
for i in range(5):
t=threading.Thread(target=producer,args=(i,))
t.start()
for i in range(2):
t=threading.Thread(target=consumer,args=(i,))
t.start()
生产者与消费者模型  RabbitMQ单独安装在一台服务器上,作为一种消息中间件,生产者和消费者都需要能够连接上这台机器;
  配置安装epel源:      rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
  安装erlang:           yum -y install erlang
  安装rabbitmq_server并启动服务:    yum -y install rabbitmq-server                  /etc/init.d/rabbitmq-server start
  客户端(生成者和消费者)需要安装API接口pika





#!/usr/bin/env python
import pika   
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.72.80'))
channel = connection.channel()    #通道
channel.queue_declare(queue='hello')    #生成队列,名为hello
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')     #往队列中插入的数据内容
print(" [x] Sent 'Hello World!'")
connection.close()
producer




#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.72.80'))
channel = connection.channel()
channel.queue_declare(queue='hello')    #生成一个队列叫做hello,在消费者中可以省略,但前提是生成者首先运行起来,产生消息队列hello,否在在下面往hello队列中取数据的时候,会出错;队列不会重复去定义;
def callback(ch, method, properties, body):    #body表示拿到的数据
print(" [x] Received %r" % body)
channel.basic_consume(callback,    #从hello队列中拿数据,拿到数据就会执行callback函数;no_ack是为了防止消息丢失,如果为False,那么如果消费者没有给生成者返回ack,那么生成者就会将数据再次放入到队列中。
queue='hello',
no_ack=True)
print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    consumer  2、RabbitMQ消息失联策略
      关于no_ack=False,如果消费者将队列中的数据取走而没有给应答,那么重新启动消费者的时候依然可以看到之前的数据。





    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.72.80'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)   
    #拿到数据之后,中断,再次执行看能否接受到数据
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)
    channel.basic_consume(callback,
    queue='hello',
    no_ack=False)
    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    消费者失联  3、RabbitMQ服务器异常策略
      如果服务器宕机,列中的数据就会丢失,解决方法是利用持久化,即将数据保存早硬盘中;
      持久化是在生产者部分实现的,要实现持久化,队列和消息必须都要持久化,对于消费者,也可以不指定,取决于先启动生产者还是消费者;





    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.72.80'))
    channel = connection.channel()
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    channel.basic_publish(exchange='',
    routing_key='hello',
    body='Hello World!',
    properties=pika.BasicProperties(
    delivery_mode=2, # make message persistent
    ))
    print(" [x] Sent 'Hello World!'")
    connection.close()
    生产者




    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.72.80'))
    channel = connection.channel()
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)    #对于之前创建的队列,不能够实现持久化;只能对新创建的队列实现持久化
    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print 'ok'
    ch.basic_ack(delivery_tag = method.delivery_tag)
    channel.basic_consume(callback,
    queue='hello',
    no_ack=False)
    print('
  • Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    消费者  4、RabbitMQ消息获取顺序
      假设有两个消费者,分别往队列中拿数据,一个拿奇数,一个拿偶数,奇数简单而偶数复杂,那么一个消费者永远繁忙,而另一个永远轻松,channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列;
      5、RabbitMQ发布和订阅
      在消息的生成者和消费者中间通过exchange消息中间件实现,发布者在消息发布到队列之前不会将数据发布到队列中,而是放置到exchange中(之前的exchange为空),exchange的名字可以是任意的,exchange可以和多个队列进行绑定,这样exchange会将数据向绑定的队列中各发送一份数据;这样就实现了订阅和发布的功能啦~~~
    DSC0002.png

      以下创建fanout





    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.72.80'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',     #发布者只需绑定exchange
    type='fanout')
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
    routing_key='',
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    发布者




    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.72.80'))
    channel = connection.channel()
    channel.exchange_declare(exchange='logs',
    type='fanout')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='logs',    #消费者需要将队列和exchange绑定起来
    queue=queue_name)
    print('
  • Waiting for logs. To exit press CTRL+C')
    def callback(ch, method, properties, body):
    print(" [x] %r" % body)
    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()
    接受者  6、RabbitMQ实现发布订阅和关键字匹配
      解释fanout类型:exchange收到消息会将数据直接发送给队列
      关键字匹配:根据关键字将exchange的数据发送到不同的队列中;





    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    import pika
    import sys
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='192.168.72.80'))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',type='direct')    #必须为direct类型
    message ='Hello World!'
    channel.basic_publish(exchange='direct_logs',
    routing_key='db',    #生产者发送的关键字
    body=message)
    print(" [x] Sent %r:%r" % ('dali', message))
    connection.close()
    生产者




    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.72.80'))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',
    type='direct')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key="dali")
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key="db")    #匹配到的关键字
    print('
  • Waiting for logs. To exit press CTRL+C')
    def callback(ch, method, properties, body):
    print(" [x] %r" % body)
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()
    消费者1




    #!/usr/bin/env python
    # _*_ coding:utf-8 _*_
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.72.80'))
    channel = connection.channel()
    channel.exchange_declare(exchange='direct_logs',
    type='direct')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key="erbi")
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key="db")    #匹配到的关键字
    print('
  • Waiting for logs. To exit press CTRL+C')
    def callback(ch, method, properties, body):
    print(" [x] %r" % body)
    channel.basic_consume(callback,
    queue=queue_name,
    no_ack=True)
    channel.start_consuming()
    消费者2  关键字模糊匹配:
      #匹配0和或多个单词
      *只能匹配一个单词
      监控作业内容:





    1、使用Redis实现配置信息和数据的传输与存储。
    2、执行命令采集指标
    3、机器分类、实现监控模板
    如:{ 'CPU':{'cmd',}}
    机器根据主机名取模板,key为主机名,value为模板;
    课可以通过反射类实现:
    def cpu():
    return 'CPU指标'
    s='hi'
    func=getattr(api,"hi")
    result = func()
    其中api.py最少的监控指标为两项
    4、某个指标超过多长时间内n次报警;
    5、使用线程:
    a、每隔几秒,去取自己的模板
    b、使用自己的模板
    6、运行程序获取监控指标,通过Redis的订阅和发布功能实现;
    需求  二、Memcached
      1、基础概念
      将经常访问的内容放在缓存中,Memcache是一种缓存的机制,是管理内存的; Memcached不同于Redis,只有一种数据类型,即key-value;
      安装memcached:
      wget http://memcached.org/latest


    tar -zxvf memcached-1.x.x.tar.gz

    cd memcached-1.x.x

    ./configure && make && make test && sudo make install

    PS:依赖libevent

           yum install libevent-devel

           apt-get install libevent-dev



    使用python操作memcached需要安装相关模块:

    https://pypi.python.org/pypi/python-memcached



    启动memcached:   memcached -d -m 128 -u root -p 12000

    成功启动之后,可以使用telnet连接memcached操作了.



    2、set和set






    import memcache
    mc = memcache.Client(['192.168.72.80:12000'], debug=True)    #debug表示在执行的时候,如果出错,信息就会以debug的模式输出
    mc.set("foo", "bar")  
    ret = mc.get('foo')
    print ret
    set和get  set还具有修改键值对的功能
      3、Memcached集群
      memcached集群主机中后面会追加权重值,在memcached server内存中会维护一个memcached主机列表,主机列表中主机的重复此处与权重值相关





    假如要执行mc.set("foo", "bar")
    1、将foo转换为一个数字
    import binascii
    print memcache.cmemcache_hash('foo')    #将字符串foo转换为数字
    2、将该数字和主机列表长度求余数,将余数作为索引,以此来找到列表中主机的IP,将该KEY_VALUE值存入主机中;
    原理  4、add
      增加键值对,如果已经存在该键,报错.
      5、replace
      修改某个键的值,如果该键不存在,报错/异常
      6、set和set_multi





    import memcache
    mc = memcache.Client([('192.168.72.80:12000',1)], debug=True)
    mc.set_multi({'key1': 'val1', 'key2': 'val2'})
    print mc.get('key1')
    set_multi  7、delete和delete_multi
      如果存在该键,删除,不存在,不出异常





    import memcache
    mc = memcache.Client([('192.168.72.80:12000',1)], debug=True)
    mc.set_multi({'key1': 'val1', 'key2': 'val2'})
    print mc.get('key1')
    mc.delete_multi(['key1','key2'])    #键值存在,删除,不存在,报异常
    delete_multi

    8、get和get_multi






    import memcache
    mc = memcache.Client([('192.168.72.80:12000',1)], debug=True)
    mc.set_multi({'key1': 'val1', 'key2': 'val2'})
    print mc.get('key1')
    #mc.delete_multi(['key1','key2'])
    item_dict = mc.get_multi(["key1","key2"])
    print item_dict    #获取结果为字典
    get_multi  9、append和prepend
      append:指定key的值,在该key后面追加内容
      prepend:指定key的值,在该key的值前面追加内容





    import memcache
    mc = memcache.Client([('192.168.72.80:12000',1)], debug=True)
    mc.set_multi({'k1': 'val1', 'key2': 'val2'})
    print mc.get('k1')
    mc.append('k1','after')    #如果K1的值之前不存在,报异常
    print mc.get('k1')
    mc.prepend('k1','before')
    print mc.get('k1')
    append和prepend  10、desr和incr
      incr:自增,默认自增1
      decr:自减,默认自减1





    import memcache
    mc = memcache.Client(['192.168.72.80:12000'], debug=True)
    mc.set('k1','777')
    mc.incr('k1')
    print mc.get('k1')
    mc.incr('k1',10)
    mc.decr('k1')
    print mc.get('k1')
    mc.decr('k1',10)
    print mc.get('k1')
    decr和incr  11、gets和cas
      具体查看


    具体查看memcache.py源代码,本质上是利用socket来send和Recv来实现上述功能的;







    redis文档:http://doc.redisfans.com/









    三、SQLALchemy

    参考地址:

              http://www.cnblogs.com/alex3714/articles/5978329.html



    sqlarchemy是一款著名的python ORM框架,可以让对数据库小白的用户或者开发者不必写蛋疼的SQL,而直接通过程序语言对数据库进行操作,sqlarchemy提供的程序语言方法,可以先转换为SQL,再通过SQL对数据库进行相关操作;

    可以有四种方式+sqlarchemy进行连接数据库操作,我这里采用的是pymysql,因为windows上安装mysql-python之类的包,会出现各种问题,并且python3目前还不支持mysql-python;



    1、创建表;

    sqlarchemy创建数据库表有两种方式:MetaData和declarative_base



    使用MetaData




    from sqlalchemy import create_engine,select, Table, Column, Integer, String, MetaData, ForeignKey
    metadata = MetaData()
    user = Table('user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
    )
    color = Table('color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
    )
    engine = create_engine("mysql+pymysql://root:@192.168.74.20:3306/test", max_overflow=5)
    metadata.create_all(engine)

      declarative_base



    from sqlalchemy import create_engine,and_,or_,func,Table
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String,ForeignKey
    from  sqlalchemy.orm import sessionmaker,relationship
    Base = declarative_base() #生成一个SqlORM 基类

    Host2Group = Table('host_2_group',Base.metadata,
    Column('host_id',ForeignKey('host.id'),primary_key=True),
    Column('group_id',ForeignKey('group.id'),primary_key=True),
    )


    engine = create_engine("mysql+pymysql://root:@192.168.74.20:3306/test",echo=True)            #echo=True表示打印出创建过程

    class Host(Base):
    __tablename__ = 'host'
    id = Column(Integer,primary_key=True,autoincrement=True)
    hostname = Column(String(64),unique=True,nullable=False)
    ip_addr = Column(String(128),unique=True,nullable=False)
    port = Column(Integer,default=22)
    #group_id = Column(Integer, ForeignKey('group.id'))
    groups = relationship('Group',
    secondary=Host2Group,
    backref='host_list')
    #group =relationship("Group",backref='host_list')
    #group =relationship("Group",back_populates='host_list')
    def __repr__(self):
    return  "<id=%s,hostname=%s, ip_addr=%s>" %(self.id,
    self.hostname,
    self.ip_addr)
    class Group(Base):
    __tablename__ = 'group'
    id = Column(Integer,primary_key=True)
    name = Column(String(64),unique=True,nullable=False)
    #host_id = Column(Integer,ForeignKey('hosts.id'))
    #host_list =relationship("Host", back_populates="group")
    #hosts =relationship("Host")
    def __repr__(self):
    return  "<id=%s,name=%s>" %(self.id,self.name)
    Base.metadata.create_all(engine) #创建所有表结构

      2、增删改查



    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
    metadata = MetaData()
    user = Table('user', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
    )
    color = Table('color', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(20)),
    )
    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
    conn = engine.connect()
    # 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)
    conn.execute(user.insert(),{'id':7,'name':'seven'})
    conn.close()
    # sql = user.insert().values(id=123, name='wu')
    # conn.execute(sql)
    # conn.close()
    # sql = user.delete().where(user.c.id > 1)
    # sql = user.update().values(fullname=user.c.name)
    # sql = user.update().where(user.c.name == 'jack').values(name='ed')
    # sql = select([user, ])
    # sql = select([user.c.id, ])
    # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
    # sql = select([user.c.name]).order_by(user.c.name)
    # sql = select([user]).group_by(user.c.name)
    # result = conn.execute(sql)
    # print result.fetchall()
    # conn.close()

  • 运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
    2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
    3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
    4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
    5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
    6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
    7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
    8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

    所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390490-1-1.html 上篇帖子: 问题 1036: C语言程序设计教程(第三版)课后习题9.1 下篇帖子: 《中国程序化广告技术生态图》2016年第二季更新发布
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    扫码加入运维网微信交流群X

    扫码加入运维网微信交流群

    扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

    扫描微信二维码查看详情

    客服E-mail:kefu@iyunv.com 客服QQ:1061981298


    QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


    提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


    本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



    合作伙伴: 青云cloud

    快速回复 返回顶部 返回列表