erlchina 发表于 2017-7-2 16:21:06

redis,rabbitmq,SqlAlchemy

redis发布和订阅

  发布者一旦发送消息,那么所有订阅者都会收到。
  RedisHelper




#!/usr/bin/env python
#-*- coding:utf-8 -*-
importredis
class redishelper:
def __init__(self):
self.__conn = redis.Redis(host='192.168.11.87')
def public(self, msg, chan):
self.__conn.publish(chan, msg)
return msg,chan
def subscribe(self, chan):
pub = self.__conn.pubsub()
pub.subscribe(chan)
pub.parse_response()
return pub


发布者



#!/usr/bin/env python
#-*- coding:utf-8 -*-
import b1
obj = b1.redishelper()      #实例化方法
obj.public('aaaaaa','fm111.7')          #执行发布

订阅者


obj = b1.redishelper()         #实例化方法
data = obj.subscribe('fm111.7')#调用订阅方法
while True:
msg = data.parse_response()   #接收发布消息
print(msg)

RabbitMQ
  RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
  MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
  安装




#安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
#安装erlang
$ yum -y install erlang
#安装RabbitMQ
$ yum -y install rabbitmq-server
#启动、关闭服务
service rabbitmq-server start/stop

API
python -m pip install pika
1.基于RabbitMQ实现生产者消费者模型。对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
生产者代码



#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pika
# ######################### 发消息 #########################
#连接rabbitmq服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.158'))
#创建频道
channel = connection.channel()
#如果没有这个队列会创建一个
channel.queue_declare(queue='hello')
#向队列插入数值 routing_key是队列名 body是要插入的内容
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" Sent 'Hello World!'")
#关闭连接
connection.close()

  消费者代码





#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pika
# ##########################取消息 ##########################
#连接rabbitmq服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.158'))
#创建频道
channel = connection.channel()
#如果生产者没有运行创建队列,那么消费者创建队列
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" Received %r" % body)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
print('
[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


  当生产者生成一条数据,被消费者接收,消费者中断后如果不超过10秒,连接的时候数据还在。当超过10秒之后,重新链接,数据将消失。消费者等待链接。
  2.消息不丢失(数据持久化)
  1.当把no_ack=false时,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,RabbitMQ会重新将该任务添加到队列中。
  2.durable
  生产者代码



#!/usr/bin/env python
import pika
#链接rabbit服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#创建频道
channel = connection.channel()
#创建队列,使用durable方法
channel.queue_declare(queue='hello', durable=True)
#如果想让队列实现持久化那么加上durable=True
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2,
#标记我们的消息为持久化的 - 通过设置 delivery_mode 属性为 2
#这样必须设置,让消息实现持久化
))
#这个exchange参数就是这个exchange的名字. 空字符串标识默认的或者匿名的exchange:如果存在routing_key, 消息路由到routing_key指定的队列中。
print(" 开始队列'")
connection.close()

  消费者代码




#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#创建频道
channel = connection.channel()
#创建队列,使用durable方法
channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
print(" Received %r" % body)
import time
time.sleep(10)
print('ok')
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print('
[*] 等待队列. To exit press CTRL+C')
channel.start_consuming()


  注:标记消息为持久化的并不能完全保证消息不会丢失,尽管告诉RabbitMQ保存消息到磁盘,当RabbitMQ接收到消息还没有保存的时候仍然有一个 短暂的时间窗口. RabbitMQ不会对每个消息都执行同步fsync(2) --- 可能只是保存到缓存cache还没有写入到磁盘中,这个持久化保证不是很强,但这比我们简单的任务queue要好很多,如果你想很强的保证你可以使用 publisher confirms
  3.消息获取顺序
  默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
  channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
  消费者代码



#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
# make message persistent
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
print(" Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)   #表示谁来谁取,不再按照奇偶排列
channel.basic_consume(callback,
queue='hello',
no_ack=False)
print('
[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

  发布与订阅
  发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
  发布者发送的消息实际是先发送到exchange,然后由exchange发送到相对应的队列。
  exchange类型可用: direct , topic , headers 和 fanout 。
  exchange type = fanout

  发布者



#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" Sent %r" % message)
connection.close()

  订阅者



#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)       #队列断开后自动删除临时队列
queue_name = result.method.queue                     # 队列名采用服务端分配的临时队列
channel.queue_bind(exchange='logs',
queue=queue_name)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()

  关键字发送( exchange type = direct)
  之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

  生产者



#!/usr/bin/env python
#-*- coding:utf-8 -*-
##########发送消息
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
severity = 'info'
message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" Sent %r:%r" % (severity, message))
connection.close()
消费者


#!/usr/bin/env python
#-*-coding=utf-8-*-
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = ['error','warning','info']
if not severities:
sys.stderr.write("Usage: %s \n" % sys.argv)
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()

模糊匹配( exchange type = topic)
  在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。



#表示可以匹配0个或多个单词
*表示只能匹配一个单词
发送者路由值            队列中
old.boy.python          old.*-- 不匹配
old.boy.python          old.#-- 匹配

  生产者



#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" Sent %r:%r" % (routing_key, message))
connection.close()

  消费者



#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv
if not binding_keys:
sys.stderr.write("Usage: %s ...\n" % sys.argv)
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()

SQLAlchemy
  SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

  schema/type:定义的一种映射格式,把表映射成类。
  sql expression language: 封装了增删改查的sql语句
  engine:引擎
  connection pooling: 连接池
  dialect:用于和数据库API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作



MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

  示例,中间状态 演示一个过程





#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
#初始化数据库连接
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
# 执行SQL
# cur = engine.execute(
#   "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# )
# 新插入行自增ID
# cur.lastrowid
# 执行SQL
# cur = engine.execute(
#   "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# )

# 执行SQL
# cur = engine.execute(
#   "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#   host='1.1.1.99', color_id=3
# )
# 执行SQL
# cur = engine.execute('select * from hosts')
# 获取第一行数据
# cur.fetchone()
# 获取第n行数据
# cur.fetchmany(3)
# 获取所有数据
# cur.fetchall()

  增删改查



#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
metadata = MetaData()
user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
color = Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
conn = engine.connect()
# 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(),{'id':7,'name':'seven'})
conn.close()
# sql = user.insert().values(id=123, name='wu')
# conn.execute(sql)
# conn.close()
# sql = user.delete().where(user.c.id > 1)
# sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == 'jack').values(name='ed')
# sql = select()
# sql = select()
# sql = select().where(user.c.id==color.c.id)
# sql = select().order_by(user.c.name)
# sql = select().group_by(user.c.name)
# result = conn.execute(sql)
# print result.fetchall()
# conn.close()

  完整示例



from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
fromsqlalchemy.orm import sessionmaker
Base = declarative_base() #生成一个SqlORM 基类

engine = create_engine("mysql+mysqldb://root@localhost:3306/test",echo=False)

class Host(Base):
__tablename__ = 'hosts'
id = Column(Integer,primary_key=True,autoincrement=True)
hostname = Column(String(64),unique=True,nullable=False)
ip_addr = Column(String(128),unique=True,nullable=False)
port = Column(Integer,default=22)
Base.metadata.create_all(engine) #创建所有表结构
if __name__ == '__main__':
SessionCls = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
session = SessionCls()
#h1 = Host(hostname='localhost',ip_addr='127.0.0.1')
#h2 = Host(hostname='ubuntu',ip_addr='192.168.2.243',port=20000)
#h3 = Host(hostname='ubuntu2',ip_addr='192.168.2.244',port=20000)
#session.add(h3)
#session.add_all( )
#h2.hostname = 'ubuntu_test' #只要没提交,此时修改也没问题
#session.rollback()
#session.commit() #提交
res = session.query(Host).filter(Host.hostname.in_(['ubuntu2','localhost'])).all()
print(res)
页: [1]
查看完整版本: redis,rabbitmq,SqlAlchemy