|
后台cron_table管理
PHP安装扩展也可以真正的多线程,fork新的进程,php ,python 2种方式性能上差别多少不知道.
基于python 强大,简洁选用python 做核心功能.
使用mysql表存储cron任务,python守护进程每分钟读取MYSQL, 多线程分发fork子进程方式执行PHP代码.
具体流程:
1.cron 每分钟执行cron_table.py .
2.cron_table.py 读取cron 任务表,把符合当前时间执行的cron 记录状态更新为runing
3.cron_table_log 插入一条记录,cron_id,start_time
4.采用多线程方式fork守护子进程,等待子进程执行完,
5.子进程执行完子线程修改cron 任务的状态为no runing,更新log记录表的完成时间和子进程的输出信息.
cron_table功能:
1.web后台能添加修改cron 任务,可视化的操作.脱离需要操作服务器才能控制crontab
2.及时的记录每一条cron当前的执行状态,时间,以及历史执行状态记录..对cron任务
何时执行,执行时长,执行返回信息,等全面信息监控.对CRON 脚本优化,排查异常CRON等有直接的帮助.
快速找到死循环CRON,性能低下的CRON程序.
3.允许CRON 任务独占,许多任务是不能同时跑多次,做的独占机制能有效的避免上个周期跑的CRON 还没结束,
这个周期到了,继续跑..
4.可以设置cron最大运行时间
5.完全脱离cron管理和cron任务业务代码的联系.
有需要还允许后台控制kill cron 任务,可以不用shell 命令终止,发邮件,短信通知
相对比现在项目里的cron记录方式,在构造函数,析构函数做file_put_contents低效操作.
这种方式重大缺陷:CRON死循环不做析构函数写日志,CRON业务代码出错,整个CRON 不执行无法记录等..
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
| #!/usr/bin/python
#coding=utf-8
#引入 MySQL 模組
import MySQLdb, re, time, sys, subprocess, threading
def get_struct_time(time_stamp_int):
"""
按整型时间戳获取格式化时间 分 时 日 月 周
Args:
time_stamp_int 为传入的值为时间戳(整形),如:1332888820
经过localtime转换后变成
time.struct_time(tm_year=2012, tm_mon=3, tm_mday=28, tm_hour=6, tm_min=53, tm_sec=40, tm_wday=2, tm_yday=88, tm_isdst=0)
Return:
list____返回 分 时 日 月 周
"""
st_time = time.localtime(time_stamp_int)
return [st_time.tm_min, st_time.tm_hour, st_time.tm_mday, st_time.tm_mon, st_time.tm_wday]
def get_strptime(time_str, str_format):
"""从字符串获取 整型时间戳
Args:
time_str 字符串类型的时间戳 如 '31/Jul/2013:17:46:01'
str_format 指定 time_str 的格式 如 '%d/%b/%Y:%H:%M:%S'
Return:
返回10位整型(int)时间戳,如 1375146861
"""
return int(time.mktime(time.strptime(time_str, str_format)))
def get_str_time(time_stamp, str_format='%Y%m%d%H%M'):
"""
获取时间戳,
Args:
time_stamp 10位整型(int)时间戳,如 1375146861
str_format 指定返回格式,值类型为 字符串 str
Rturn:
返回格式 默认为 年月日时分,如2013年7月9日1时3分 :201207090103
"""
return time.strftime("%s" % str_format, time.localtime(time_stamp))
def match_cont(patten, cont):
"""
正则匹配(精确符合的匹配)
Args:
patten 正则表达式
cont____ 匹配内容
Return:
True or False
"""
res = re.match(patten, cont)
if res:
return True
else:
return False
def handle_num(val, ranges=(0, 100), res=list()):
"""处理纯数字"""
val = int(val)
if val >= ranges[0] and val <= ranges[1]:
res.append(val)
return res
def handle_nlist(val, ranges=(0, 100), res=list()):
"""处理数字列表 如 1,2,3,6"""
val_list = val.split(',')
for tmp_val in val_list:
tmp_val = int(tmp_val)
if tmp_val >= ranges[0] and tmp_val <= ranges[1]:
res.append(tmp_val)
return res
def handle_star(val, ranges=(0, 100), res=list()):
"""处理星号"""
if val == '*':
tmp_val = ranges[0]
while tmp_val <= ranges[1]:
res.append(tmp_val)
tmp_val = tmp_val + 1
return res
def handle_starnum(val, ranges=(0, 100), res=list()):
"""星号/数字 组合 如 */3"""
tmp = val.split('/')
val_step = int(tmp[1])
if val_step < 1:
return res
val_tmp = int(tmp[1])
while val_tmp <= ranges[1]:
res.append(val_tmp)
val_tmp = val_tmp + val_step
return res
def handle_range(val, ranges=(0, 100), res=list()):
"""处理区间 如 8-20"""
tmp = val.split('-')
range1 = int(tmp[0])
range2 = int(tmp[1])
tmp_val = range1
if range1 < 0:
return res
while tmp_val <= range2 and tmp_val <= ranges[1]:
res.append(tmp_val)
tmp_val = tmp_val + 1
return res
def handle_rangedv(val, ranges=(0, 100), res=list()):
"""处理区间/步长 组合 如 8-20/3 """
tmp = val.split('/')
range2 = tmp[0].split('-')
val_start = int(range2[0])
val_end = int(range2[1])
val_step = int(tmp[1])
if (val_step < 1) or (val_start < 0):
return res
val_tmp = val_start
while val_tmp <= val_end and val_tmp <= ranges[1]:
res.append(val_tmp)
val_tmp = val_tmp + val_step
return res
def parse_conf(conf, ranges=(0, 100), res=list()):
"""解析crontab 五个时间参数中的任意一个"""
#去除空格,再拆分
conf = conf.strip(' ').strip(' ')
conf_list = conf.split(',')
other_conf = []
number_conf = []
for conf_val in conf_list:
if match_cont(PATTEN['number'], conf_val):
#记录拆分后的纯数字参数
number_conf.append(conf_val)
else:
#记录拆分后纯数字以外的参数,如通配符 * , 区间 0-8, 及 0-8/3 之类
other_conf.append(conf_val)
if other_conf:
#处理纯数字外各种参数
for conf_val in other_conf:
for key, ptn in PATTEN.items():
if match_cont(ptn, conf_val):
res = PATTEN_HANDLER[key](val=conf_val, ranges=ranges, res=res)
if number_conf:
if len(number_conf) > 1 or other_conf:
#纯数字多于1,或纯数字与其它参数共存,则数字作为时间列表
res = handle_nlist(val=','.join(number_conf), ranges=ranges, res=res)
else:
#只有一个纯数字存在,则数字为时间 间隔
res = handle_num(val=number_conf[0], ranges=ranges, res=res)
return res
def parse_crontab_time(conf_string):
"""
解析crontab时间配置参数
Args:
conf_string 配置内容(共五个值:分 时 日 月 周)
取值范围 分钟:0-59 小时:1-23 日期:1-31 月份:1-12 星期:0-6(0表示周日)
Return:
crontab_range list格式,分 时 日 月 周 五个传入参数分别对应的取值范围
"""
time_limit = ((0, 59), (1, 23), (1, 31), (1, 12), (0, 6))
crontab_range = []
clist = []
conf_length = 5
tmp_list = conf_string.split(' ')
for val in tmp_list:
if len(clist) == conf_length:
break
if val:
clist.append(val)
if len(clist) != conf_length:
return -1, 'config error whith [%s]' % conf_string
cindex = 0
for conf in clist:
res_conf = []
res_conf = parse_conf(conf, ranges=time_limit[cindex], res=res_conf)
if not res_conf:
return -1, 'config error whith [%s]' % conf_string
crontab_range.append(res_conf)
cindex = cindex + 1
return 0, crontab_range
def time_match_crontab(crontab_time, time_struct):
"""
将时间戳与crontab配置中一行时间参数对比,判断该时间戳是否在配置设定的时间范围内
Args:
crontab_time____crontab配置中的五个时间(分 时 日 月 周)参数对应时间取值范围
time_struct____ 某个整型时间戳,如:1375027200 对应的 分 时 日 月 周
Return:
tuple 状态码, 状态描述
"""
cindex = 0
for val in time_struct:
if val not in crontab_time[cindex]:
return 0, False
cindex = cindex + 1
return 0, True
#crontab时间参数各种写法 的 正则匹配
PATTEN = {
#纯数字
'number':'^[0-9]+$',
#数字列表,如 1,2,3,6
'num_list':'^[0-9]+([,][0-9]+)+$',
#星号 *
'star':'^\*$',
#星号/数字 组合,如 */3
'star_num':'^\*\/[0-9]+$',
#区间 如 8-20
'range':'^[0-9]+[\-][0-9]+$',
#区间/步长 组合 如 8-20/3
'range_div':'^[0-9]+[\-][0-9]+[\/][0-9]+$'
#区间/步长 列表 组合,如 8-20/3,21,22,34
#'range_div_list':'^([0-9]+[\-][0-9]+[\/][0-9]+)([,][0-9]+)+$'
}
#各正则对应的处理方法
PATTEN_HANDLER = {
'number':handle_num,
'num_list':handle_nlist,
'star':handle_star,
'star_num':handle_starnum,
'range':handle_range,
'range_div':handle_rangedv
}
def mysqlConnect():
return MySQLdb.connect(host="192.168.51.129", user="root", passwd="123456", db="model")
def runCronTable(ns,control,method,id,logid):
child = subprocess.Popen('/usr/local/php-5.4.20/bin/php -c /usr/local/php-5.4.20/php.ini index.php --ns=%s --control=%s --method=%s --id=%s' % (ns,control,method,id),shell=True,stdout = subprocess.PIPE)
info,returnCode = child.communicate()
db = mysqlConnect()
cursor = db.cursor(MySQLdb.cursors.DictCursor)
cursor.execute('update cron_table set done_time=%s,run_status=0 where id=%s' , (time.time(),id))
cursor.execute('update cron_table_log set done_time=%s,info=%s where id=%s' , (time.time(),info,logid))
cursor.close()
db.close()
return True
if __name__ == '__main__':
#連接到 MySQL
db = mysqlConnect()
cursor = db.cursor(MySQLdb.cursors.DictCursor)
#執行 SQL 語句
cursor.execute("SELECT * FROM cron_table where status=0 and run_status=0")
result = cursor.fetchall()
threads = []
now = time.strftime('%M %H %d %m %w',time.localtime(time.time()))
for r in result:
res, desc = parse_crontab_time(r["cycle"])
if res == 0:
cron_time = desc
else:
cron_time = 0
time_stamp = int(time.time())
#解析 时间戳对应的 分 时 日 月 周
time_struct = get_struct_time(time_stamp)
match_res,isRun = time_match_crontab(cron_time, time_struct)
if isRun:
now = time.time()
print "%s.." % r["id"]
# 锁住cron 任务
cursor.execute("update cron_table set run_status=%s where id=%s" , (now,r["id"]))
# 添加日志
cursor.execute("insert cron_table_log (cron_id,start_time,done_time,info) VALUES (%s,%s,0,'')" , (r["id"],now))
thd = threading.Thread(target=runCronTable,args=(r["ns"],r["control"],r["method"],r["id"],int(db.insert_id())))
thd.setDaemon(True)
thd.start()
threads.append(thd)
db.close()
cursor.close()
for t in threads:
t.join()
print "\nExiting Main Thread\n"
|
MYSQL 表结构
CREATE TABLE `cron_table` ( `id` int(10) unsigned NOT NULL auto_increment, `ns` varchar(100) NOT NULL default '' COMMENT '命名空间', `control` varchar(50) NOT NULL default '' COMMENT '控制器', `method` varchar(50) NOT NULL default '' COMMENT '方法', `cycle` varchar(50) NOT NULL COMMENT '运行周期', `run_status` int(10) unsigned NOT NULL default '0' COMMENT '运行状态', `done_time` int(10) unsigned NOT NULL default '0' COMMENT '完成时间', `status` tinyint(4) unsigned NOT NULL default '1' COMMENT '开启关闭', `desc` varchar(500) NOT NULL default '' COMMENT 'CRON描述', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `cron_table_log` ( `id` int(10) unsigned NOT NULL auto_increment, `cron_id` int(10) unsigned NOT NULL default '0' COMMENT 'cron_id', `start_time` int(10) unsigned NOT NULL default '0' COMMENT '开始执行时间', `done_time` int(10) unsigned NOT NULL default '0' COMMENT '脚本执行完成时间', `info` varchar(3000) NOT NULL default '' COMMENT '脚本输出', PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;
|
|