opo 发表于 2017-12-13 12:33:54

基于binlog来分析mysql的行记录修改情况(python脚本分析)

1 import pymysql  

2 from pymysql.cursors import DictCursor  

3 import re  

4 import os  

5 import sys  

6 import datetime  

7 import time  

8 import logging  

9 import importlib  

10 importlib.reload(logging)  

11 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')  

12  
13
  
14 usage=''' usage: python
  
15 ALL options need to assign:
  
16
  
17 -h   : host, the database host,which database will store the results after analysis
  
18 -u   : user, the db user
  
19 -p   : password, the db user's password
  
20 -P   : port, the db port
  
21 -f   : file path, the binlog file
  
22 -tr    : table name for record , the table name to store the row record
  
23 -tt    : table name for transaction, the table name to store transactions
  
24 Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow
  
25
  
26 '''
  
27

  
28>  
29   def __init__(self):
  
30         #初始化
  
31         self.host=''
  
32         self.user=''
  
33         self.password=''
  
34         self.port='3306'
  
35         self.fpath=''
  
36         self.tbrow=''
  
37         self.tbtran=''
  
38
  
39         self._get_db()
  
40         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran))
  
41
  
42         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
  
43         self.cur = self.mysqlconn.cursor(cursor=DictCursor)
  
44         logging.info('MySQL which userd to store binlog event connection is ok')
  
45
  
46         self.begin_time=''
  
47         self.end_time=''
  
48         self.db_name=''
  
49         self.tb_name=''
  
50
  
51   def _get_db(self):
  
52         #解析用户输入的选项参数值,这里对password的处理是明文输入,可以自行处理成是input格式,
  
53         #由于可以拷贝binlog文件到非线上环境分析,所以password这块,没有特殊处理
  
54         logging.info('begin to assign values to parameters')
  
55         if len(sys.argv) == 1:
  
56             print(usage)
  
57             sys.exit(1)
  
58         elif sys.argv == '--help':
  
59             print(usage)
  
60             sys.exit()
  
61         elif len(sys.argv) > 2:
  
62             for i in sys.argv:
  
63               _argv = i.split('=')
  
64               if _argv == '-h':
  
65                     self.host = _argv
  
66               elif _argv == '-u':
  
67                     self.user = _argv
  
68               elif _argv == '-P':
  
69                     self.port = int(_argv)
  
70               elif _argv == '-f':
  
71                     self.fpath = _argv
  
72               elif _argv == '-tr':
  
73                     self.tbrow = _argv
  
74               elif _argv == '-tt':
  
75                     self.tbtran = _argv
  
76               elif _argv == '-p':
  
77                     self.password = _argv
  
78               else:
  
79                     print(usage)
  
80
  
81   def create_tab(self):
  
82         #创建两个表格:一个用户存储事务情况,一个用户存储每一行数据修改的情况
  
83         #注意,一个事务可以存储多行数据修改的情况
  
84         logging.info('creating table ...')
  
85         create_tb_sql ='''CREATE TABLE IF NOT EXISTS{} (
  
86                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  
87                           `begin_time` datetime NOT NULL,
  
88                           `end_time` datetime NOT NULL,
  
89                           PRIMARY KEY (`auto_id`)
  
90                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  
91                         CREATE TABLE IF NOT EXISTS{} (
  
92                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  
93                           `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
  
94                           `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
  
95                           `dbname` varchar(50) NOT NULL,
  
96                           `tbname` varchar(50) NOT NULL,
  
97                           PRIMARY KEY (`auto_id`),
  
98                           KEY `sqltype` (`sqltype`),
  
99                           KEY `dbname` (`dbname`),
  
100                           KEY `tbname` (`tbname`)
  
101                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  
102                         truncate table {};
  
103                         truncate table {};
  
104                         '''.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow)
  
105
  
106         self.cur.execute(create_tb_sql)
  
107         logging.info('created table {} and {}'.format(self.tbrow,self.tbtran))
  
108
  
109   def rowrecord(self):
  
110         #处理每一行binlog
  
111         #事务的结束采用 'Xid =' 来划分
  
112         #分析结果,按照一个事务为单位存储提交一次到db
  
113         try:
  
114             tran_num=1    #事务数
  
115             record_sql='' #行记录的insert sql
  
116             tran_sql=''   #事务的insert sql
  
117
  
118             self.create_tab()
  
119
  
120             with open(self.fpath,'r') as binlog_file:
  
121               logging.info('begining to analyze the binlog file ,this may be take a long time !!!')
  
122               logging.info('analyzing...')
  
123
  
124               for bline in binlog_file:
  
125
  
126                     if bline.find('Table_map:') != -1:
  
127                         l = bline.index('server')
  
128                         n = bline.index('Table_map')
  
129                         begin_time = bline[:l:].rstrip(' ').replace('#', '20')
  
130
  
131                         if record_sql=='':
  
132                           self.begin_time = begin_time + '-' + begin_time + '-' + begin_time
  
133
  
134                         self.db_name = bline.split(' ').replace('`', '').split('.')
  
135                         self.tb_name = bline.split(' ').replace('`', '').split('.')
  
136                         bline=''
  
137
  
138                     elif bline.startswith('### INSERT INTO'):
  
139                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
  
140
  
141                     elif bline.startswith('### UPDATE'):
  
142                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
  
143
  
144                     elif bline.startswith('### DELETE FROM'):
  
145                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
  
146
  
147                     elif bline.find('Xid =') != -1:
  
148
  
149                         l = bline.index('server')
  
150                         end_time = bline[:l:].rstrip(' ').replace('#', '20')
  
151                         self.end_time = end_time + '-' + end_time + '-' + end_time
  
152                         tran_sql=record_sql+"insert into {}(begin_time,end_time) VALUES ('{}','{}')".format(self.tbtran,self.begin_time,self.end_time)
  
153
  
154                         self.cur.execute(tran_sql)
  
155                         self.mysqlconn.commit()
  
156                         record_sql = ''
  
157                         tran_num += 1
  
158
  
159         except Exception:
  
160             return 'funtion rowrecord error'
  
161
  
162   def binlogdesc(self):
  
163         sql=''
  
164         t_num=0
  
165         r_num=0
  
166         logging.info('Analysed result printing...\n')
  
167         #分析总的事务数跟行修改数量
  
168         sql="select 'tbtran' name,count(*) nums from {}union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran,self.tbrow)
  
169         self.cur.execute(sql)
  
170         rows=self.cur.fetchall()
  
171         for row in rows:
  
172             if row['name']=='tbtran':
  
173               t_num = row['nums']
  
174             else:
  
175               r_num = row['nums']
  
176         print('This binlog file has {} transactions, {} rows are changed '.format(t_num,r_num))
  
177
  
178         # 计算 最耗时 的单个事务
  
179         # 分析每个事务的耗时情况,分为5个时间段来描述
  
180         # 这里正常应该是 以毫秒来分析的,但是binlog中,只精确时间到second
  
181         sql='''select
  
182                     count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
  
183                     count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
  
184                     count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
  
185                     count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
  
186                     count(case when cost_sec >30.1 then 1 end ) cos_more,
  
187                     max(cost_sec) cos_max
  
188               from
  
189               (
  
190                         select
  
191                           auto_id,timestampdiff(second,begin_time,end_time) cost_sec
  
192                         from {}
  
193               ) a;'''.format(self.tbtran)
  
194         self.cur.execute(sql)
  
195         rows=self.cur.fetchall()
  
196
  
197         for row in rows:
  
198             print('The most cost time : {} '.format(row['cos_max']))
  
199             print('The distribution map of each transaction costed time: ')
  
200             print('Cost time between    0 and1 second : {} , {}%'.format(row['cos_1'],int(row['cos_1']*100/t_num)))
  
201             print('Cost time between1.1 and5 second : {} , {}%'.format(row['cos_5'], int(row['cos_5'] * 100 / t_num)))
  
202             print('Cost time between5.1 and 10 second : {} , {}%'.format(row['cos_10'], int(row['cos_10'] * 100 / t_num)))
  
203             print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], int(row['cos_30'] * 100 / t_num)))
  
204             print('Cost time                     > 30.1 : {} , {}%\n'.format(row['cos_more'], int(row['cos_more'] * 100 / t_num)))
  
205
  
206         # 计算 单个事务影响行数最多 的行数量
  
207         # 分析每个事务 影响行数 情况,分为5个梯度来描述
  
208         sql='''select
  
209                     count(case when nums between 0 and 10 then 1 end ) row_1,
  
210                     count(case when nums between 11 and 100 then 1 end ) row_2,
  
211                     count(case when nums between 101 and 1000 then 1 end ) row_3,
  
212                     count(case when nums between 1001 and 10000 then 1 end ) row_4,
  
213                     count(case when nums >10001 then 1 end ) row_5,
  
214                     max(nums) row_max
  
215                from
  
216                   (
  
217                     select
  
218                              count(*) nums
  
219                     from {} group by tran_num
  
220                  ) a;'''.format(self.tbrow)
  
221         self.cur.execute(sql)
  
222         rows=self.cur.fetchall()
  
223
  
224         for row in rows:
  
225             print('The most changed rows for each row: {} '.format(row['row_max']))
  
226             print('The distribution map of each transaction changed rows : ')
  
227             print('Changed rows between    1 and    10 second : {} , {}%'.format(row['row_1'],int(row['row_1']*100/t_num)))
  
228             print('Changed rows between   11 and   100 second : {} , {}%'.format(row['row_2'], int(row['row_2'] * 100 / t_num)))
  
229             print('Changed rows between101 and1000 second : {} , {}%'.format(row['row_3'], int(row['row_3'] * 100 / t_num)))
  
230             print('Changed rows between 1001 and 10000 second : {} , {}%'.format(row['row_4'], int(row['row_4'] * 100 / t_num)))
  
231             print('Changed rows                     > 10001 : {} , {}%\n'.format(row['row_5'], int(row['row_5'] * 100 / t_num)))
  
232
  
233         # 分析 各个行数 DML的类型情况
  
234         # 描述 delete,insert,update的分布情况
  
235         sql='select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow)
  
236         self.cur.execute(sql)
  
237         rows=self.cur.fetchall()
  
238
  
239         print('The distribution map of the {} changed rows : '.format(r_num))
  
240         for row in rows:
  
241
  
242             if row['sqltype']==1:
  
243               print('INSERT rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
  
244             if row['sqltype']==2:
  
245               print('UPDATE rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
  
246             if row['sqltype']==3:
  
247               print('DELETE rows :{} , {}%\n '.format(row['nums'],int(row['nums']*100/r_num)))
  
248
  
249         # 描述 影响行数 最多的表格
  
250         # 可以分析是哪些表格频繁操作,这里显示前10个table name
  
251         sql = '''select
  
252                     dbname,tbname ,
  
253                     count(*) ALL_rows,
  
254                     count(*)*100/{} per,
  
255                     count(case when sqltype=1 then 1 end) INSERT_rows,
  
256                     count(case when sqltype=2 then 1 end) UPDATE_rows,
  
257                     count(case when sqltype=3 then 1 end) DELETE_rows
  
258               from {}
  
259               group by dbname,tbname
  
260               order by ALL_rows desc
  
261               limit 10;'''.format(r_num,self.tbrow)
  
262         self.cur.execute(sql)
  
263         rows = self.cur.fetchall()
  
264
  
265         print('The distribution map of the {} changed rows : '.format(r_num))
  
266         print('tablename'.ljust(50),
  
267               '|','changed_rows'.center(15),
  
268               '|','percent'.center(10),
  
269               '|','insert_rows'.center(18),
  
270               '|','update_rows'.center(18),
  
271               '|','delete_rows'.center(18)
  
272               )
  
273         print('-------------------------------------------------------------------------------------------------------------------------------------------------')
  
274         for row in rows:
  
275             print((row['dbname']+'.'+row['tbname']).ljust(50),
  
276                   '|',str(row['ALL_rows']).rjust(15),
  
277                   '|',(str(int(row['per']))+'%').rjust(10),
  
278                   '|',str(row['INSERT_rows']).rjust(10)+' , '+(str(int(row['INSERT_rows']*100/row['ALL_rows']))+'%').ljust(5),
  
279                   '|',str(row['UPDATE_rows']).rjust(10)+' , '+(str(int(row['UPDATE_rows']*100/row['ALL_rows']))+'%').ljust(5),
  
280                   '|',str(row['DELETE_rows']).rjust(10)+' , '+(str(int(row['DELETE_rows']*100/row['ALL_rows']))+'%').ljust(5),
  
281                   )
  
282         print('\n')
  
283
  
284         logging.info('Finished to analyse the binlog file !!!')
  
285
  
286   def closeconn(self):
  
287         self.cur.close()
  
288         logging.info('release db connections\n')
  
289
  
290 def main():
  
291   p = queryanalyse()
  
292   p.rowrecord()
  
293   p.binlogdesc()
  
294   p.closeconn()
  
295
  
296 if __name__ == "__main__":
  
297   main()
页: [1]
查看完整版本: 基于binlog来分析mysql的行记录修改情况(python脚本分析)