设为首页 收藏本站
查看: 1305|回复: 0

[经验分享] python 之路12 RabbitMQ Python 操作mysql

[复制链接]

尚未签到

发表于 2017-7-4 17:14:03 | 显示全部楼层 |阅读模式
  1. RabbitMQ简介
  rabbitmq服务类似于mysql、apache服务,只是提供的功能不一样。rabbimq是用来提供发送消息的服务,可以用在不同的应用程序之间进行通信。
  2.安装RabbitMQ Ubuntu 14.04
  sudo apt-get install rabbitmq-server
  安装好后,rabbitmq服务就已经启动好了。接下来看下python编写Hello World!的实例。实例的内容就是从send.py发送“Hello World!”到rabbitmq,receive.py  从rabbitmq接收send.py发送的信息  
DSC0000.png

rabbitmq消息发送流程(来源rabbitmq官网)
  其中P表示produce,生产者的意思,也可以称为发送者,实例中表现为send.py;C表示consumer,消费者的意思,也可以称为接收者,实例中表现为receive.py;中间红色的表示队列的意思,实例中表现为hello队列。
  python使用rabbitmq服务,可以使用现成的类库pika、txAMQP或者py-amqplib,这里选择了pika。
  4.安装pika
  sudo apt-get install python3-pip
  sudo pip3 install pika
  5.发布 连接到rabbitmq服务器,因为是在本地测试,所以就用localhost就可以了。
  import pika


conn = pika.BlockingConnection(pika.ConnectionParameters(
    host = 'localhost'
))  #链接本地rabbitmq
chan = conn.channel()  #创建一个通信频道
chan.queue_declare(queue='wyx')  #声明消息队列,消息将在这个队列中进行传递。如果将消息发送到不存在的队列,rabbitmq将会自动清除这些消息。
chan.basic_publish(   #发布
    exchange='',   
    routing_key='wyx', #路由键, 相当于字典里面的key
    body='hello wyx'  #发送的内容  相当于字典里面的value
)
print('sent hello wyx')
6.订阅

import pika
conn = pika.BlockingConnection(pika.ConnectionParameters(host = 'localhost')) #链接本地rabbitmq
chan = conn.channel() #创建链接频道
chan.queue_declare('wyx') #如果有wyx这个消息队列,则不创建,否则创建
def callback(ch,method,properties,body):
    print('revice %r' % body)  #接收发布者消息函数,并打印出内容
chan.basic_consume(callback,
                   queue='wyx',
                   no_ack=True)
print('wait')
chan.start_consuming()  

7.fanout类型 RabbitMQ
  上面的演示,消息是依次发送给绑定到该队列的接收端。如果要广播出去,就要使用交换机,本篇演示了交换机的工作方式也就是exchange的fanout类型
8.发布者

import pika
conn = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost')) #链接RabbitMQ
chan = conn.channel()  #创建频道
chan.exchange_declare(  #创建exchange,名字为logs,类型为fanout,所有队列都可以收到,如果没有log这个名字则创建
    exchange='logs',
    type = 'fanout'
)
mess = 'wyx' #需要发布的消息
chan.basic_publish(
    exchange='logs',  #exchage名字
    routing_key='',  #存放的键
    body=mess
)
print('hah %s' % mess)
conn.close()
9.订阅者

import pika
conn = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',  #链接redis
))
chan = conn.channel()  #创建频道,根据链接创建
chan.exchange_declare(exchange='logs',type='fanout')  #创建exchange,名字为logs,类型为fanout,所有队列都可以收到,如果没有log这个名字则创建
result = chan.queue_declare(exclusive=True)  #随机创建一个队列
queue_name  = result.method.queue   #队列名字
chan.queue_bind(exchange='logs',queue=queue_name)  #把频道和所有队列绑定
print('wait')
def callback(ch,method,properties,body):  #body为接受的消息
    print('haaj %r' % body)
chan.basic_consume(callback,queue=queue_name,no_ack=True)  #no_ack 是否做持久话,False为做持久化,True为不做持久化
chan.start_consuming()

10. 远程结果返回
  在发布端执行一个命令,订阅者执行命令,并且返回结果

11.发布者

#!/usr/bin/env python3
#coding=utf8
import pika
class Center(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))  #链接本地rabbitmq
        self.channel = self.connection.channel()  #创建频道
        #定义接收返回消息的队列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response,
                                   no_ack=True,
                                   queue=self.callback_queue)
    #定义接收到返回消息的处理方法
    def on_response(self, ch, method, props, body):
        self.response = body

    def request(self, n):
        self.response = None
        #发送计算请求,并声明返回队列
        self.channel.basic_publish(exchange='',
                                   routing_key='exec_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         ),
                                   body=str(n))
        #接收返回的数据
        while self.response is None:
            self.connection.process_data_events()
        return self.response
center = Center()  #实例化#Center类,自动执行__init__函数
print( " 请输入想要执行的命令")
mess = input('Please enter the command you want to execute').strip()
response = center.request(mess)  #执行结果
print(" [.] 执行结果 %r" % (response,) )

12 订阅者

#!/usr/bin/env python3
#coding=utf8
import pika
import  subprocess
#连接rabbitmq服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()
#定义队列
channel.queue_declare(queue='exec_queue')
print( '等待执行命令')
#执行命令,并return
def exec_cmd(n):
    t = subprocess.Popen(n,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
    stdout = t.stdout.read()
    stderr = t.stderr.read()
    if stdout:
        return stdout
    else:
        return stderr
#定义接收到消息的处理方法
def request(ch, method, properties, body):
    print( " [.] 执行命令 (%s)"  % (body,))
    body = str(body,encoding='utf-8')
    response = exec_cmd(body)
    #将计算结果发送回控制中心
    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(request, queue='exec_queue')
channel.start_consuming()

13. 使用MySQLdb操作mysql数据库,并连接数据库
  
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import MySQLdb
# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 使用execute方法执行SQL语句
cursor.execute("SELECT VERSION()")
# 使用 fetchone() 方法获取一条数据库。
data = cursor.fetchone()
print("Database version : %s " % data)
# 关闭数据库连接
db.close()
14.创建数据库表

import MySQLdb
# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# 如果数据表已经存在使用 execute() 方法删除表。
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")
# 创建数据表SQL语句
sql = """CREATE TABLE EMPLOYEE (
FIRST_NAME  CHAR(20) NOT NULL,
LAST_NAME  CHAR(20),
AGE INT,  
SEX CHAR(1),
INCOME FLOAT )"""
cursor.execute(sql)
# 关闭数据库连接
db.close()
15.数据库插入操作

import MySQLdb
# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# Rollback in case there is any error
db.rollback()
# 关闭数据库连接
db.close()

16.数据库查询操作

import MySQLdb
# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE \
WHERE INCOME > '%d'" % (1000)
try:
# 执行SQL语句
cursor.execute(sql)
# 获取所有记录列表
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
age = row[2]
sex = row[3]
income = row[4]
# 打印结果
print "fname=%s,lname=%s,age=%d,sex=%s,income=%d" % \
(fname, lname, age, sex, income )
except:
print("Error: unable to fecth data")
# 关闭数据库连接
db.close()


17.数据库更新操作

import MySQLdb
# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1
WHERE SEX = '%c'" % ('M')
try:
# 执行SQL语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()
# 关闭数据库连接
db.close()
18.删除操作
import MySQLdb
# 打开数据库连接
db = MySQLdb.connect("localhost","testuser","test123","TESTDB" )
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 删除语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > '%d'" % (20)
try:
# 执行SQL语句
cursor.execute(sql)
# 提交修改
db.commit()
except:
# 发生错误时回滚
db.rollback()
# 关闭连接
db.close()
19.事物机制,事务机制可以确保数据一致性。

# SQL删除记录语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > '%d'" % (20)
try:
# 执行SQL语句
cursor.execute(sql)
# 向数据库提交
db.commit()
except:
# 发生错误时回滚
db.rollback()





  



  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-390746-1-1.html 上篇帖子: 循序渐进Python3(十)-- 0 下篇帖子: 分享个免费的货币汇率API
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表