基于binlog来分析MySQL的行记录修改情况(Python脚本分析)
import pymysql
from pymysql.cursors import DictCursor
import re
import os
import sys
import datetime
import time
import logging
import importlib

# Reload logging so the basicConfig() call below takes effect even when the root
# logger already has handlers (basicConfig() does nothing in that case), e.g. when
# the script is re-run inside an interactive session.
importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s ')
12
13
# Help text printed by queryanalyse._get_db() (no arguments, --help, or an
# unrecognised/missing option).
usage = ''' usage: python queryanalyse.py [options]
       python queryanalyse.py --help
ALL options need to assign (-P may be omitted, it defaults to 3306):

-h  : host, the database host, which database will store the results after analysis
-u  : user, the db user
-p  : password, the db user's password
-P  : port, the db port
-f  : file path, the binlog file as text produced by mysqlbinlog with row events
      decoded (e.g. mysqlbinlog --base64-output=decode-rows -v binlog.000001 > file)
-tr : table name for record, the table name to store the row record
-tt : table name for transaction, the table name to store transactions
Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow

'''
27
28>
29 def __init__(self):
30 #初始化
31 self.host=''
32 self.user=''
33 self.password=''
34 self.port='3306'
35 self.fpath=''
36 self.tbrow=''
37 self.tbtran=''
38
39 self._get_db()
40 logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran))
41
42 self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
43 self.cur = self.mysqlconn.cursor(cursor=DictCursor)
44 logging.info('MySQL which userd to store binlog event connection is ok')
45
46 self.begin_time=''
47 self.end_time=''
48 self.db_name=''
49 self.tb_name=''
50
51 def _get_db(self):
52 #解析用户输入的选项参数值,这里对password的处理是明文输入,可以自行处理成是input格式,
53 #由于可以拷贝binlog文件到非线上环境分析,所以password这块,没有特殊处理
54 logging.info('begin to assign values to parameters')
55 if len(sys.argv) == 1:
56 print(usage)
57 sys.exit(1)
58 elif sys.argv == '--help':
59 print(usage)
60 sys.exit()
61 elif len(sys.argv) > 2:
62 for i in sys.argv:
63 _argv = i.split('=')
64 if _argv == '-h':
65 self.host = _argv
66 elif _argv == '-u':
67 self.user = _argv
68 elif _argv == '-P':
69 self.port = int(_argv)
70 elif _argv == '-f':
71 self.fpath = _argv
72 elif _argv == '-tr':
73 self.tbrow = _argv
74 elif _argv == '-tt':
75 self.tbtran = _argv
76 elif _argv == '-p':
77 self.password = _argv
78 else:
79 print(usage)
80
81 def create_tab(self):
82 #创建两个表格:一个用户存储事务情况,一个用户存储每一行数据修改的情况
83 #注意,一个事务可以存储多行数据修改的情况
84 logging.info('creating table ...')
85 create_tb_sql ='''CREATE TABLE IF NOT EXISTS{} (
86 `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
87 `begin_time` datetime NOT NULL,
88 `end_time` datetime NOT NULL,
89 PRIMARY KEY (`auto_id`)
90 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
91 CREATE TABLE IF NOT EXISTS{} (
92 `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
93 `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
94 `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
95 `dbname` varchar(50) NOT NULL,
96 `tbname` varchar(50) NOT NULL,
97 PRIMARY KEY (`auto_id`),
98 KEY `sqltype` (`sqltype`),
99 KEY `dbname` (`dbname`),
100 KEY `tbname` (`tbname`)
101 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
102 truncate table {};
103 truncate table {};
104 '''.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow)
105
106 self.cur.execute(create_tb_sql)
107 logging.info('created table {} and {}'.format(self.tbrow,self.tbtran))
108
109 def rowrecord(self):
110 #处理每一行binlog
111 #事务的结束采用 'Xid =' 来划分
112 #分析结果,按照一个事务为单位存储提交一次到db
113 try:
114 tran_num=1 #事务数
115 record_sql='' #行记录的insert sql
116 tran_sql='' #事务的insert sql
117
118 self.create_tab()
119
120 with open(self.fpath,'r') as binlog_file:
121 logging.info('begining to analyze the binlog file ,this may be take a long time !!!')
122 logging.info('analyzing...')
123
124 for bline in binlog_file:
125
126 if bline.find('Table_map:') != -1:
127 l = bline.index('server')
128 n = bline.index('Table_map')
129 begin_time = bline[:l:].rstrip(' ').replace('#', '20')
130
131 if record_sql=='':
132 self.begin_time = begin_time + '-' + begin_time + '-' + begin_time
133
134 self.db_name = bline.split(' ').replace('`', '').split('.')
135 self.tb_name = bline.split(' ').replace('`', '').split('.')
136 bline=''
137
138 elif bline.startswith('### INSERT INTO'):
139 record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
140
141 elif bline.startswith('### UPDATE'):
142 record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
143
144 elif bline.startswith('### DELETE FROM'):
145 record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
146
147 elif bline.find('Xid =') != -1:
148
149 l = bline.index('server')
150 end_time = bline[:l:].rstrip(' ').replace('#', '20')
151 self.end_time = end_time + '-' + end_time + '-' + end_time
152 tran_sql=record_sql+"insert into {}(begin_time,end_time) VALUES ('{}','{}')".format(self.tbtran,self.begin_time,self.end_time)
153
154 self.cur.execute(tran_sql)
155 self.mysqlconn.commit()
156 record_sql = ''
157 tran_num += 1
158
159 except Exception:
160 return 'funtion rowrecord error'
161
162 def binlogdesc(self):
163 sql=''
164 t_num=0
165 r_num=0
166 logging.info('Analysed result printing...\n')
167 #分析总的事务数跟行修改数量
168 sql="select 'tbtran' name,count(*) nums from {}union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran,self.tbrow)
169 self.cur.execute(sql)
170 rows=self.cur.fetchall()
171 for row in rows:
172 if row['name']=='tbtran':
173 t_num = row['nums']
174 else:
175 r_num = row['nums']
176 print('This binlog file has {} transactions, {} rows are changed '.format(t_num,r_num))
177
178 # 计算 最耗时 的单个事务
179 # 分析每个事务的耗时情况,分为5个时间段来描述
180 # 这里正常应该是 以毫秒来分析的,但是binlog中,只精确时间到second
181 sql='''select
182 count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
183 count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
184 count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
185 count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
186 count(case when cost_sec >30.1 then 1 end ) cos_more,
187 max(cost_sec) cos_max
188 from
189 (
190 select
191 auto_id,timestampdiff(second,begin_time,end_time) cost_sec
192 from {}
193 ) a;'''.format(self.tbtran)
194 self.cur.execute(sql)
195 rows=self.cur.fetchall()
196
197 for row in rows:
198 print('The most cost time : {} '.format(row['cos_max']))
199 print('The distribution map of each transaction costed time: ')
200 print('Cost time between 0 and1 second : {} , {}%'.format(row['cos_1'],int(row['cos_1']*100/t_num)))
201 print('Cost time between1.1 and5 second : {} , {}%'.format(row['cos_5'], int(row['cos_5'] * 100 / t_num)))
202 print('Cost time between5.1 and 10 second : {} , {}%'.format(row['cos_10'], int(row['cos_10'] * 100 / t_num)))
203 print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], int(row['cos_30'] * 100 / t_num)))
204 print('Cost time > 30.1 : {} , {}%\n'.format(row['cos_more'], int(row['cos_more'] * 100 / t_num)))
205
206 # 计算 单个事务影响行数最多 的行数量
207 # 分析每个事务 影响行数 情况,分为5个梯度来描述
208 sql='''select
209 count(case when nums between 0 and 10 then 1 end ) row_1,
210 count(case when nums between 11 and 100 then 1 end ) row_2,
211 count(case when nums between 101 and 1000 then 1 end ) row_3,
212 count(case when nums between 1001 and 10000 then 1 end ) row_4,
213 count(case when nums >10001 then 1 end ) row_5,
214 max(nums) row_max
215 from
216 (
217 select
218 count(*) nums
219 from {} group by tran_num
220 ) a;'''.format(self.tbrow)
221 self.cur.execute(sql)
222 rows=self.cur.fetchall()
223
224 for row in rows:
225 print('The most changed rows for each row: {} '.format(row['row_max']))
226 print('The distribution map of each transaction changed rows : ')
227 print('Changed rows between 1 and 10 second : {} , {}%'.format(row['row_1'],int(row['row_1']*100/t_num)))
228 print('Changed rows between 11 and 100 second : {} , {}%'.format(row['row_2'], int(row['row_2'] * 100 / t_num)))
229 print('Changed rows between101 and1000 second : {} , {}%'.format(row['row_3'], int(row['row_3'] * 100 / t_num)))
230 print('Changed rows between 1001 and 10000 second : {} , {}%'.format(row['row_4'], int(row['row_4'] * 100 / t_num)))
231 print('Changed rows > 10001 : {} , {}%\n'.format(row['row_5'], int(row['row_5'] * 100 / t_num)))
232
233 # 分析 各个行数 DML的类型情况
234 # 描述 delete,insert,update的分布情况
235 sql='select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow)
236 self.cur.execute(sql)
237 rows=self.cur.fetchall()
238
239 print('The distribution map of the {} changed rows : '.format(r_num))
240 for row in rows:
241
242 if row['sqltype']==1:
243 print('INSERT rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
244 if row['sqltype']==2:
245 print('UPDATE rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
246 if row['sqltype']==3:
247 print('DELETE rows :{} , {}%\n '.format(row['nums'],int(row['nums']*100/r_num)))
248
249 # 描述 影响行数 最多的表格
250 # 可以分析是哪些表格频繁操作,这里显示前10个table name
251 sql = '''select
252 dbname,tbname ,
253 count(*) ALL_rows,
254 count(*)*100/{} per,
255 count(case when sqltype=1 then 1 end) INSERT_rows,
256 count(case when sqltype=2 then 1 end) UPDATE_rows,
257 count(case when sqltype=3 then 1 end) DELETE_rows
258 from {}
259 group by dbname,tbname
260 order by ALL_rows desc
261 limit 10;'''.format(r_num,self.tbrow)
262 self.cur.execute(sql)
263 rows = self.cur.fetchall()
264
265 print('The distribution map of the {} changed rows : '.format(r_num))
266 print('tablename'.ljust(50),
267 '|','changed_rows'.center(15),
268 '|','percent'.center(10),
269 '|','insert_rows'.center(18),
270 '|','update_rows'.center(18),
271 '|','delete_rows'.center(18)
272 )
273 print('-------------------------------------------------------------------------------------------------------------------------------------------------')
274 for row in rows:
275 print((row['dbname']+'.'+row['tbname']).ljust(50),
276 '|',str(row['ALL_rows']).rjust(15),
277 '|',(str(int(row['per']))+'%').rjust(10),
278 '|',str(row['INSERT_rows']).rjust(10)+' , '+(str(int(row['INSERT_rows']*100/row['ALL_rows']))+'%').ljust(5),
279 '|',str(row['UPDATE_rows']).rjust(10)+' , '+(str(int(row['UPDATE_rows']*100/row['ALL_rows']))+'%').ljust(5),
280 '|',str(row['DELETE_rows']).rjust(10)+' , '+(str(int(row['DELETE_rows']*100/row['ALL_rows']))+'%').ljust(5),
281 )
282 print('\n')
283
284 logging.info('Finished to analyse the binlog file !!!')
285
286 def closeconn(self):
287 self.cur.close()
288 logging.info('release db connections\n')
289
def main():
    """Run the analysis: parse options, load the binlog, then print the report.

    Exits with status 1 when rowrecord() reports an error, instead of printing a
    report built from empty or partially filled tables. The database connection is
    released in every case.
    """
    p = queryanalyse()
    try:
        error = p.rowrecord()
        if error:
            logging.error(error)
            sys.exit(1)
        p.binlogdesc()
    finally:
        p.closeconn()


if __name__ == "__main__":
    main()
页:
[1]