设为首页 收藏本站
查看: 492|回复: 0

[经验分享] [python] 连接MySQL,以及多线程、多进程连接MySQL初探

[复制链接]

尚未签到

发表于 2017-11-13 14:17:51 | 显示全部楼层 |阅读模式
                      环境:Linux CentOS6.7,python 2.7.13
说明:连接MySQL,进行增删改查操作,并将执行的SQL和耗时记录到日志里
demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import MySQLdb
import logging
import time

'''
设置日志输出路径和格式
'''
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='/home/sql.log',
                    filemode='a+')

'''
执行的SQL语句可能有多个,可以在这边进行组装
'''
def sql_assemble(*args):
    sql_list = [
        'delete from table_a where id = %s' % args[0],
        'delete from table_b where content in %s' % args[1],
        'update table_c set aa = %s' % args[2],
    ]
    return sql_list


class mysqlopr(object):
    def __init__(self,ip,port,username,password,dbname,char_set='utf8'):
    '''
    创建对象的时候就要求将参数传入进来
    '''
        self.ip = ip
        self.port = port
        self.username = username
        self.password = password
        self.dbname = dbname
        self.char_set = char_set

    def connect(self):
    '''
    mysql如果开启了安全模式,insert、delete和update无法执行,需要先设置SET SQL_SAFE_UPDATES = 0
    执行单条SQL,使用execute(),执行多条SQL,可以使用executemany(),但只有INSERT和REPLACE会执行的快点。说明如下
    executemany:This method improves performance on multiple-row INSERT and REPLACE. Otherwise it is equivalent to looping over args with execute().
    '''
        self.db = MySQLdb.connect(host=self.ip,port = self.port,user=self.username,passwd=self.password,db =self.dbname,charset=self.char_set)
        self.cursor = self.db.cursor()
        #sql_s = 'SET SQL_SAFE_UPDATES = 0'
        #self.cursor.execute(sql_s)
        logging.info('link to mysqldb: '+self.dbname)
    def select(self,sql):
    '''
    sst :查询开始时间
    set :查询结束时间
    fetchone(): 只返回一行
    fetchall(): 返回所有行
    fetchmany(size): 返回size行
    rowcount: 这是一个long型只读属性,返回执行execute()方法后影响的行数。
         
    '''
        try:
            logging.info(sql)
            self.index_list = []
            sst = time.time()
            self.cursor.execute(sql)
            set = time.time()
            #select_result = self.cursor.fetchone()
            #select_result = self.cursor.fetchmany(10)
            select_result = self.cursor.fetchall()
            #print self.cursor.rowcount,type(self.cursor.rowcount)
            logging.info('select count: ' + self.cursor.rowcount + ', cost :' + str(sst - set))
        except Exception as e:
            print e
            logging.error("Error: unable to fecth data" + str(e))
    def insert(self):
    '''
    和其他方法类似,这边省略
    '''
        pass
    def update(self):
    '''
    和其他方法类似,这边省略
    '''
        pass
    def delete(self,sql):
    '''
    dst:删除操作开始时间
    det:删除操作结束时间
    删除成功提交,失败回滚
    '''
        try:
            logging.info(sql)
            dst = time.time()
            self.cursor.execute(sql)
            self.commit()
            det = time.time()
            logging.info('delete row: ' + str(self.cursor.rowcount) + ', cost :' + str(det - dst))
        except Exception as e:
            print e
            self.rollback()
            logging.error(str(sql) + ',' + str(e))
    def commit(self):
        self.db.commit()
    def rollback(self):
        self.db.rollback()
    def close(self):
       self.cursor.close()
       self.db.close()


if __name__ == '__main__':
    st = time.time()
    a = mysqlopr(ip='192.168.1.1',port = 8888,username='try',password='123456',dbname ='trydb',char_set='utf8')
    a.connect()
    '''
    如果参数相同,SQL语句不同,可以现在sql_assemble方法中配置好SQL,再把参数传递进去。
    或者这边直接调用对应的方法执行
    这里过程省略
    '''
    a.close()
    et = time.time()
    logging.info("SQL executed over,cost: " + str(et -st))
    print et - st




多线程、多进程连接MySQL说明:
1、多线程连接:
起初我使用threading.thread模块,先建立一个MySQL连接,然后由多个线程来执行具体的SQL。但发现在执行的时候,不是报MySQL连接被关闭,就是出现其他异常错误。上网查询,是因为多个线程无法共享一个数据库连接,会出现不可预测的情况,官方文档说明如下:
The MySQL protocol can not handle multiple threads using the same connection at once. Some earlier versions of MySQLdb utilized locking to achieve a threadsafety of 2. While this is not terribly hard to accomplish using the standard Cursor class (which uses mysql_store_result()), it is complicated by SSCursor (which uses mysql_use_result(); with the latter you must ensure all the rows have been read before another query can be executed. It is further complicated by the addition of transactions, since transactions start when a cursor execute a query, but end when COMMIT or ROLLBACK is executed by the Connection object. Two threads simply cannot share a connection while a transaction is in progress, in addition to not being able to share it during query execution. This excessively complicated the code to the point where it just isn't worth it.


The general upshot of this is: Don't share connections between threads. It's really not worth your effort or mine, and in the end, will probably hurt performance, since the MySQL server runs a separate thread for each connection. You can certainly do things like cache connections in a pool, and give those connections to one thread at a time. If you let two threads use a connection simultaneously, the MySQL client library will probably upchuck and die. You have been warned.

For threaded applications, try using a connection pool. This can be done using the Pool module.

官方建议使用连接池模块,参照了他人的做法,使用DBUtils.PooledDB来创建线程连接池,一次性创建多个连接,但是查询MySQL中的连接执行SQL语句情况,发现虽然建立了多个连接,但是只有一个连接有在执行SQL,其他连接都是sleep状态,至今不明白为何是这情况。网上的例子都是抄来抄去的,没人对使用DBUtils模块连接MySQL执行多线程操作的情况及效率做进一步的研究和分析。

2、多进程连接:
后面没辙,改用多进程(multiprocessing模块),可以和MySQL建立多个连接并发执行SQL。对比执行耗时,整体性能比单个进程快,但其中单个SQL的执行效率,多进程没有没有单进程执行的快。
demo:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import MySQLdb
from multiprocessing import Process,Pool

class mysqlopr():
    '''省略'''

pool = Pool(5)        ####设置进程数
for i in range(10):
    pool.apply_async(func=run_sql_func, args=(arg,))       ####异步执行
    #pool.apply(func=run_sql_func, args=(arg,))                    ####同步执行,官方不建议使用,python3.+版本已无该方法
pool.close()
pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。




                  


运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-407019-1-1.html 上篇帖子: mysqldump实现简单备份恢复 下篇帖子: mysql主从同步
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表