设为首页 收藏本站
查看: 727|回复: 0

[经验分享] 基于binlog来分析mysql的行记录修改情况(python脚本分析)

[复制链接]

尚未签到

发表于 2017-12-13 12:33:54 | 显示全部楼层 |阅读模式
1 import pymysql  

  2 from pymysql.cursors import DictCursor  

  3 import re  

  4 import os  

  5 import sys  

  6 import datetime  

  7 import time  

  8 import logging  

  9 import importlib  

10 importlib.reload(logging)  

11 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')  

12  
13
  
14 usage=''' usage: python [script's path] [option]
  
15 ALL options need to assign:
  
16
  
17 -h     : host, the database host,which database will store the results after analysis
  
18 -u     : user, the db user
  
19 -p     : password, the db user's password
  
20 -P     : port, the db port
  
21 -f     : file path, the binlog file
  
22 -tr    : table name for record , the table name to store the row record
  
23 -tt    : table name for transaction, the table name to store transactions
  
24 Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow
  
25
  
26 '''
  
27

  
28>  
29     def __init__(self):
  
30         #初始化
  
31         self.host=''
  
32         self.user=''
  
33         self.password=''
  
34         self.port='3306'
  
35         self.fpath=''
  
36         self.tbrow=''
  
37         self.tbtran=''
  
38
  
39         self._get_db()
  
40         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran))
  
41
  
42         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
  
43         self.cur = self.mysqlconn.cursor(cursor=DictCursor)
  
44         logging.info('MySQL which userd to store binlog event connection is ok')
  
45
  
46         self.begin_time=''
  
47         self.end_time=''
  
48         self.db_name=''
  
49         self.tb_name=''
  
50
  
51     def _get_db(self):
  
52         #解析用户输入的选项参数值,这里对password的处理是明文输入,可以自行处理成是input格式,
  
53         #由于可以拷贝binlog文件到非线上环境分析,所以password这块,没有特殊处理
  
54         logging.info('begin to assign values to parameters')
  
55         if len(sys.argv) == 1:
  
56             print(usage)
  
57             sys.exit(1)
  
58         elif sys.argv[1] == '--help':
  
59             print(usage)
  
60             sys.exit()
  
61         elif len(sys.argv) > 2:
  
62             for i in sys.argv[1:]:
  
63                 _argv = i.split('=')
  
64                 if _argv[0] == '-h':
  
65                     self.host = _argv[1]
  
66                 elif _argv[0] == '-u':
  
67                     self.user = _argv[1]
  
68                 elif _argv[0] == '-P':
  
69                     self.port = int(_argv[1])
  
70                 elif _argv[0] == '-f':
  
71                     self.fpath = _argv[1]
  
72                 elif _argv[0] == '-tr':
  
73                     self.tbrow = _argv[1]
  
74                 elif _argv[0] == '-tt':
  
75                     self.tbtran = _argv[1]
  
76                 elif _argv[0] == '-p':
  
77                     self.password = _argv[1]
  
78                 else:
  
79                     print(usage)
  
80
  
81     def create_tab(self):
  
82         #创建两个表格:一个用户存储事务情况,一个用户存储每一行数据修改的情况
  
83         #注意,一个事务可以存储多行数据修改的情况
  
84         logging.info('creating table ...')
  
85         create_tb_sql ='''CREATE TABLE IF NOT EXISTS  {} (
  
86                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  
87                           `begin_time` datetime NOT NULL,
  
88                           `end_time` datetime NOT NULL,
  
89                           PRIMARY KEY (`auto_id`)
  
90                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  
91                         CREATE TABLE IF NOT EXISTS  {} (
  
92                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  
93                           `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
  
94                           `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
  
95                           `dbname` varchar(50) NOT NULL,
  
96                           `tbname` varchar(50) NOT NULL,
  
97                           PRIMARY KEY (`auto_id`),
  
98                           KEY `sqltype` (`sqltype`),
  
99                           KEY `dbname` (`dbname`),
  
100                           KEY `tbname` (`tbname`)
  
101                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  
102                         truncate table {};
  
103                         truncate table {};
  
104                         '''.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow)
  
105
  
106         self.cur.execute(create_tb_sql)
  
107         logging.info('created table {} and {}'.format(self.tbrow,self.tbtran))
  
108
  
109     def rowrecord(self):
  
110         #处理每一行binlog
  
111         #事务的结束采用 'Xid =' 来划分
  
112         #分析结果,按照一个事务为单位存储提交一次到db
  
113         try:
  
114             tran_num=1    #事务数
  
115             record_sql='' #行记录的insert sql
  
116             tran_sql=''   #事务的insert sql
  
117
  
118             self.create_tab()
  
119
  
120             with open(self.fpath,'r') as binlog_file:
  
121                 logging.info('begining to analyze the binlog file ,this may be take a long time !!!')
  
122                 logging.info('analyzing...')
  
123
  
124                 for bline in binlog_file:
  
125
  
126                     if bline.find('Table_map:') != -1:
  
127                         l = bline.index('server')
  
128                         n = bline.index('Table_map')
  
129                         begin_time = bline[:l:].rstrip(' ').replace('#', '20')
  
130
  
131                         if record_sql=='':
  
132                             self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]
  
133
  
134                         self.db_name = bline[n::].split(' ')[1].replace('`', '').split('.')[0]
  
135                         self.tb_name = bline[n::].split(' ')[1].replace('`', '').split('.')[1]
  
136                         bline=''
  
137
  
138                     elif bline.startswith('### INSERT INTO'):
  
139                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
  
140
  
141                     elif bline.startswith('### UPDATE'):
  
142                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
  
143
  
144                     elif bline.startswith('### DELETE FROM'):
  
145                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
  
146
  
147                     elif bline.find('Xid =') != -1:
  
148
  
149                         l = bline.index('server')
  
150                         end_time = bline[:l:].rstrip(' ').replace('#', '20')
  
151                         self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]
  
152                         tran_sql=record_sql+"insert into {}(begin_time,end_time) VALUES ('{}','{}')".format(self.tbtran,self.begin_time,self.end_time)
  
153
  
154                         self.cur.execute(tran_sql)
  
155                         self.mysqlconn.commit()
  
156                         record_sql = ''
  
157                         tran_num += 1
  
158
  
159         except Exception:
  
160             return 'funtion rowrecord error'
  
161
  
162     def binlogdesc(self):
  
163         sql=''
  
164         t_num=0
  
165         r_num=0
  
166         logging.info('Analysed result printing...\n')
  
167         #分析总的事务数跟行修改数量
  
168         sql="select 'tbtran' name,count(*) nums from {}  union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran,self.tbrow)
  
169         self.cur.execute(sql)
  
170         rows=self.cur.fetchall()
  
171         for row in rows:
  
172             if row['name']=='tbtran':
  
173                 t_num = row['nums']
  
174             else:
  
175                 r_num = row['nums']
  
176         print('This binlog file has {} transactions, {} rows are changed '.format(t_num,r_num))
  
177
  
178         # 计算 最耗时 的单个事务
  
179         # 分析每个事务的耗时情况,分为5个时间段来描述
  
180         # 这里正常应该是 以毫秒来分析的,但是binlog中,只精确时间到second
  
181         sql='''select
  
182                       count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
  
183                       count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
  
184                       count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
  
185                       count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
  
186                       count(case when cost_sec >30.1 then 1 end ) cos_more,
  
187                       max(cost_sec) cos_max
  
188                 from
  
189                 (
  
190                         select
  
191                             auto_id,timestampdiff(second,begin_time,end_time) cost_sec
  
192                         from {}
  
193                 ) a;'''.format(self.tbtran)
  
194         self.cur.execute(sql)
  
195         rows=self.cur.fetchall()
  
196
  
197         for row in rows:
  
198             print('The most cost time : {} '.format(row['cos_max']))
  
199             print('The distribution map of each transaction costed time: ')
  
200             print('Cost time between    0 and  1 second : {} , {}%'.format(row['cos_1'],int(row['cos_1']*100/t_num)))
  
201             print('Cost time between  1.1 and  5 second : {} , {}%'.format(row['cos_5'], int(row['cos_5'] * 100 / t_num)))
  
202             print('Cost time between  5.1 and 10 second : {} , {}%'.format(row['cos_10'], int(row['cos_10'] * 100 / t_num)))
  
203             print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], int(row['cos_30'] * 100 / t_num)))
  
204             print('Cost time                     > 30.1 : {} , {}%\n'.format(row['cos_more'], int(row['cos_more'] * 100 / t_num)))
  
205
  
206         # 计算 单个事务影响行数最多 的行数量
  
207         # 分析每个事务 影响行数 情况,分为5个梯度来描述
  
208         sql='''select
  
209                     count(case when nums between 0 and 10 then 1 end ) row_1,
  
210                     count(case when nums between 11 and 100 then 1 end ) row_2,
  
211                     count(case when nums between 101 and 1000 then 1 end ) row_3,
  
212                     count(case when nums between 1001 and 10000 then 1 end ) row_4,
  
213                     count(case when nums >10001 then 1 end ) row_5,
  
214                     max(nums) row_max
  
215                from
  
216                   (
  
217                     select
  
218                              count(*) nums
  
219                     from {} group by tran_num
  
220                    ) a;'''.format(self.tbrow)
  
221         self.cur.execute(sql)
  
222         rows=self.cur.fetchall()
  
223
  
224         for row in rows:
  
225             print('The most changed rows for each row: {} '.format(row['row_max']))
  
226             print('The distribution map of each transaction changed rows : ')
  
227             print('Changed rows between    1 and    10 second : {} , {}%'.format(row['row_1'],int(row['row_1']*100/t_num)))
  
228             print('Changed rows between   11 and   100 second : {} , {}%'.format(row['row_2'], int(row['row_2'] * 100 / t_num)))
  
229             print('Changed rows between  101 and  1000 second : {} , {}%'.format(row['row_3'], int(row['row_3'] * 100 / t_num)))
  
230             print('Changed rows between 1001 and 10000 second : {} , {}%'.format(row['row_4'], int(row['row_4'] * 100 / t_num)))
  
231             print('Changed rows                       > 10001 : {} , {}%\n'.format(row['row_5'], int(row['row_5'] * 100 / t_num)))
  
232
  
233         # 分析 各个行数 DML的类型情况
  
234         # 描述 delete,insert,update的分布情况
  
235         sql='select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow)
  
236         self.cur.execute(sql)
  
237         rows=self.cur.fetchall()
  
238
  
239         print('The distribution map of the {} changed rows : '.format(r_num))
  
240         for row in rows:
  
241
  
242             if row['sqltype']==1:
  
243                 print('INSERT rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
  
244             if row['sqltype']==2:
  
245                 print('UPDATE rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
  
246             if row['sqltype']==3:
  
247                 print('DELETE rows :{} , {}%\n '.format(row['nums'],int(row['nums']*100/r_num)))
  
248
  
249         # 描述 影响行数 最多的表格
  
250         # 可以分析是哪些表格频繁操作,这里显示前10个table name
  
251         sql = '''select
  
252                       dbname,tbname ,
  
253                       count(*) ALL_rows,
  
254                       count(*)*100/{} per,
  
255                       count(case when sqltype=1 then 1 end) INSERT_rows,
  
256                       count(case when sqltype=2 then 1 end) UPDATE_rows,
  
257                       count(case when sqltype=3 then 1 end) DELETE_rows
  
258                 from {}
  
259                 group by dbname,tbname
  
260                 order by ALL_rows desc
  
261                 limit 10;'''.format(r_num,self.tbrow)
  
262         self.cur.execute(sql)
  
263         rows = self.cur.fetchall()
  
264
  
265         print('The distribution map of the {} changed rows : '.format(r_num))
  
266         print('tablename'.ljust(50),
  
267               '|','changed_rows'.center(15),
  
268               '|','percent'.center(10),
  
269               '|','insert_rows'.center(18),
  
270               '|','update_rows'.center(18),
  
271               '|','delete_rows'.center(18)
  
272               )
  
273         print('-------------------------------------------------------------------------------------------------------------------------------------------------')
  
274         for row in rows:
  
275             print((row['dbname']+'.'+row['tbname']).ljust(50),
  
276                   '|',str(row['ALL_rows']).rjust(15),
  
277                   '|',(str(int(row['per']))+'%').rjust(10),
  
278                   '|',str(row['INSERT_rows']).rjust(10)+' , '+(str(int(row['INSERT_rows']*100/row['ALL_rows']))+'%').ljust(5),
  
279                   '|',str(row['UPDATE_rows']).rjust(10)+' , '+(str(int(row['UPDATE_rows']*100/row['ALL_rows']))+'%').ljust(5),
  
280                   '|',str(row['DELETE_rows']).rjust(10)+' , '+(str(int(row['DELETE_rows']*100/row['ALL_rows']))+'%').ljust(5),
  
281                   )
  
282         print('\n')
  
283
  
284         logging.info('Finished to analyse the binlog file !!!')
  
285
  
286     def closeconn(self):
  
287         self.cur.close()
  
288         logging.info('release db connections\n')
  
289
  
290 def main():
  
291     p = queryanalyse()
  
292     p.rowrecord()
  
293     p.binlogdesc()
  
294     p.closeconn()
  
295
  
296 if __name__ == "__main__":
  
297     main()

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-423631-1-1.html 上篇帖子: PMM 对MYSQL 的监控配制 下篇帖子: mysql学习(二):mysql 5.7设置远程登录
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表