上下文管理、redis发布订阅、RabbitMQ发布订阅、SQLAlchemy
一、上下文管理importcontextlib
@contextlib.contextmanager
def work_state(state_list,worker_thread):
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
free_list=[]
current_thread="alex"
with work_state(free_list,current_thread):
print(123)
print(456)
#以下为执行结果:
123
456
代码执行步骤
上下文用于需要 close()方法的模块
importcontextlib
importsocket
@contextlib.contextmanager
def context_socket(host,port):
sk=socket.socket()
sk.bind((host,port))
sk.listen(5)
try:
yield sk
finally:
sk.close()
with context_socket('127.0.0.1',8888) as sock:
print(sock)
#以下为执行结果:
<socket.socket fd=224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8888)>
二、redis 发布订阅
#redis2.py 主程序
importredis
class RedisHelper:
def __init__(self):
self.__conn=redis.Redis(host='192.168.11.87')
def public(self,msg,chan):
self.__conn.publish(chan,msg)
returnTrue
def subscribe(self,chan):
pub=self.__conn.pubsub()
pub.subscribe(chan)
pub.parse_response()
returnpub
订阅
import redis2
obj= redis2.RedisHelper()
data=obj.subscribe('fm111.7')
print(data.parse_response())
#接收到发布信息:
发布
import redis2
obj= redis2.RedisHelper()
obj.public('alex_db','f111.7')
三、RabbitMQ
import pika
#生产者 发布
connection =pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()
channel.queue_declare(queue='hello_wuwenyu') #创建队列,存在则忽略
channel.basic_publish(exchange='', routing_key='hello_wuwenyu', body='Hello World') print(" Sent 'Hello World!'") connection.close
import pika
#消费者 订阅
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()
channel.queue_declare(queue='hello_wuwenyu')#
def callback(ch,method,properties,body):
print(" Received %r" % body)
channel.basic_consume(callback,
queue='hello_wuwenyu',
no_ack=True)
print('
[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
#接收到生产者发来的消息:
[*] Waiting for messages. To exit press CTRL+C
Received b'Hello World'
2 exchange 绑定多个队列
#
import pika
#生产者 发布
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',
type='fanout')
message = '456'
channel.basic_publish(exchange='logs_fanout',
routing_key='',
body=message)
print(" Sent %r" % message)
connection.close()
import pika
#订阅
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',
type='fanout')
# 随机创建队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 绑定
channel.queue_bind(exchange='logs_fanout',
queue=queue_name)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
#执行多次消费端,随机产生多个队列,每个队列都接收到消息:
[*] Waiting for logs. To exit press CTRL+C
b'456'
关键字
#生产者severity = 'info' severity = 'errer' 执行两次
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs_wuwenyu',
type='direct')
severity = 'info'
# severity = 'errer'
message = '123'
channel.basic_publish(exchange='direct_logs_wuwenyu',
routing_key=severity,
body=message)
print(" Sent %r:%r" % (severity, message))
connection.close()
#订阅 消费 客户端1
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs_wuwenyu',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities =['error','info','warning']
for severity in severities:
channel.queue_bind(exchange='direct_logs_wuwenyu',
queue=queue_name,
routing_key=severity)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
#接受到的消息:
[*] Waiting for logs. To exit press CTRL+C
'error':b'123'
'info':b'123'
#订阅 消费 客户端2
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs_wuwenyu',
type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities =['error',]
for severity in severities:
channel.queue_bind(exchange='direct_logs_wuwenyu',
queue=queue_name,
routing_key=severity)
print('
[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
#接受到的消息:
[*] Waiting for logs. To exit press CTRL+C
'info':b'123'
四、SQLAlchemy
SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python
mysql+mysqldb://:@[:]/
pymysql
mysql+pymysql://:@/[?]
MySQL-Connector
mysql+mysqlconnector://:@[:]/
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html
步骤一:
使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
fromsqlalchemy importcreate_engine
engine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%s, %s)",
((555, "v1"),(666, "v1"),)
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
id=999, name="v1"
)
result =engine.execute('select * from ts_test')
result.fetchall()
事务操作 注:查看数据库连接:show status like 'Threads%';
步骤二:
使用 Schema Type/SQL Expression
Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema
Type创建一个特定的结构对象,之后通过SQL Expression
Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect
执行SQL,并获取结果。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
fromsqlalchemy importcreate_engine, Table, Column, Integer, String, MetaData, ForeignKey
metadata =MetaData()
user =Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
color =Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
metadata.create_all(engine)
# metadata.clear()
# metadata.remove()
增删改查 更多内容详见:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。
步骤三:
使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
fromsqlalchemy.ext.declarative importdeclarative_base
fromsqlalchemy importColumn, Integer, String
fromsqlalchemy.orm importsessionmaker
fromsqlalchemy importcreate_engine
engine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
Base =declarative_base()
classUser(Base):
__tablename__ ='users'
id=Column(Integer, primary_key=True)
name =Column(String(50))
# 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine)
Session =sessionmaker(bind=engine)
session =Session()
# ########## 增 ##########
# u = User(id=2, name='sb')
# session.add(u)
# session.add_all([
# User(id=3, name='sb'),
# User(id=4, name='sb')
# ])
# session.commit()
# ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()
# ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0})
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name='sb').first()
# ret = session.query(User).filter_by(name='sb').all()
# print ret
# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
# print ret
# ret = session.query(User.name.label('name_label')).all()
# print ret,type(ret)
# ret = session.query(User).order_by(User.id).all()
# print ret
# ret = session.query(User).order_by(User.id)
# print ret
# session.commit()
页:
[1]