设为首页 收藏本站
查看: 1010|回复: 0

[经验分享] openstack中运行定时任务的两种方法及源代码分析

[复制链接]

尚未签到

发表于 2015-4-12 10:56:11 | 显示全部楼层 |阅读模式
启动一个进程,如要想要这个进程的某个方法定时得进行执行的话,在openstack有两种方式: 一种是通过继承 periodic_task.PeriodicTasks,另一种是使用loopingcall.py,针对两种方式分别说一下实现原理。

(1) 继承periodic_task.PeriodicTasks
         这种方式比较复杂,用到了python中的一些比较高级的特性,装饰器和元类;首先看一下periodic_task.py,在nova/openstack/common中,其他组件也有。
         看一下PeriodicTasks 这个类。
144 @six.add_metaclass(_PeriodicTasksMeta)
145 class PeriodicTasks(object):
146     def __init__(self):
147         super(PeriodicTasks, self).__init__()
148         self._periodic_last_run = {}
149         for name, task in self._periodic_tasks:
150             self._periodic_last_run[name] = task._periodic_last_run
151
152     def run_periodic_tasks(self, context, raise_on_error=False):
153         """Tasks to be run at a periodic interval."""
154         idle_for = DEFAULT_INTERVAL
155         for task_name, task in self._periodic_tasks:
156             full_task_name = '.'.join([self.__class__.__name__, task_name])
157
158             spacing = self._periodic_spacing[task_name]
159             last_run = self._periodic_last_run[task_name]
160
161             # If a periodic task is _nearly_ due, then we'll run it early
162             if spacing is not None:
163                 idle_for = min(idle_for, spacing)
164                 if last_run is not None:
165                     delta = last_run + spacing - time.time()
166                     if delta > 0.2:
167                         idle_for = min(idle_for, delta)
168                         continue
169
170             LOG.debug("Running periodic task %(full_task_name)s",
171                       {"full_task_name": full_task_name})
172             self._periodic_last_run[task_name] = time.time()
173
174             try:
175                 task(self, context)
176             except Exception as e:
177                 if raise_on_error:
178                     raise
179                 LOG.exception(_LE("Error during %(full_task_name)s: %(e)s"),
180                               {"full_task_name": full_task_name, "e": e})
181             time.sleep(0)
182
183         return idle_for

run_periodic_tasks 函数是用户启动各个定时任务的,其中里面有几个数据结构比较重要,self._periodic_tasks:记录来每个task和每个task的函数句柄;self._periodic_spacing: 记录每一个task的运行间隔时间。在__init__函数中,还有构造一个self._periodic_last_run 结构用来记录每一个task上一次运行的时间;具体运行的时候会根据上次运行时间和间隔时间来确定是否运行,函数第162~168行;那具体的self._periodic_tasks和self._periodic_spacing是怎么得来的,是通过元类的方式来实现的;

元类可以干预一个类的实现形式,比方说在为一个类添加一个方法,或者为了一个类统一添加某种行为;上文的@six.add_metaclass(_PeriodicTasksMeta)语法就是添加了一个元类,我们看一下_PeriodicTasksMeta的实现;

100 class _PeriodicTasksMeta(type):
101     def __init__(cls, names, bases, dict_):
102         """Metaclass that allows us to collect decorated periodic tasks."""
103         super(_PeriodicTasksMeta, cls).__init__(names, bases, dict_)
104
105         # NOTE(sirp): if the attribute is not present then we must be the base
106         # class, so, go ahead an initialize it. If the attribute is present,
107         # then we're a subclass so make a copy of it so we don't step on our
108         # parent's toes.
109         try:
110             cls._periodic_tasks = cls._periodic_tasks[:]
111         except AttributeError:
112             cls._periodic_tasks = []
113
114         try:
115             cls._periodic_spacing = cls._periodic_spacing.copy()
116         except AttributeError:
117             cls._periodic_spacing = {}
118
119         for value in cls.__dict__.values():
120             if getattr(value, '_periodic_task', False):
121                 task = value
122                 name = task.__name__
123
124                 if task._periodic_spacing < 0:
125                     LOG.info(_LI('Skipping periodic task %(task)s because '
126                                  'its interval is negative'),
127                              {'task': name})
128                     continue
129                 if not task._periodic_enabled:
130                     LOG.info(_LI('Skipping periodic task %(task)s because '
131                                  'it is disabled'),
132                              {'task': name})
133                     continue
134
135                 # A periodic spacing of zero indicates that this task should
136                 # be run every pass
137                 if task._periodic_spacing == 0:
138                     task._periodic_spacing = None
139
140                 cls._periodic_tasks.append((name, task))
141                 cls._periodic_spacing[name] = task._periodic_spacing
其中109~117为类添加_periodic_tasks与_periodic_spacing两个类变量, for value in cls.__dict__.values() 语句访问类的各个成员,主要是函数成员;如果发现成员中有_periodic_task属性,并且等于True,则构造_periodic_tasks与_periodic_spacing两个数据结构;那么剩下就要弄清楚task的结构了,task就是类中的一个函数,它为什么具有_periodic_task属性和_periodic_spacing的呢?这个活就是装饰器做的事情了。

当你给一个函数设置定时任务的装饰器时,一般会这样写:
@periodic_task.periodic_task(spacing=…, run_immediately=...)
def f(args,kwargs):
…….

奥妙就在这个装饰器里面了。
42 def periodic_task(*args, **kwargs):
43     """Decorator to indicate that a method is a periodic task.
44
45     This decorator can be used in two ways:
46
47         1. Without arguments '@periodic_task', this will be run on every cycle
48            of the periodic scheduler.
49
50         2. With arguments:
51            @periodic_task(spacing=N [, run_immediately=[True|False]])
52            this will be run on approximately every N seconds. If this number is
53            negative the periodic task will be disabled. If the run_immediately
54            argument is provided and has a value of 'True', the first run of the
55            task will be shortly after task scheduler starts.  If
56            run_immediately is omitted or set to 'False', the first time the
57            task runs will be approximately N seconds after the task scheduler
58            starts.
59     """
60     def decorator(f):
61         # Test for old style invocation
62         if 'ticks_between_runs' in kwargs:
63             raise InvalidPeriodicTaskArg(arg='ticks_between_runs')
64
65         # Control if run at all
66         f._periodic_task = True
67         f._periodic_external_ok = kwargs.pop('external_process_ok', False)
68         if f._periodic_external_ok and not CONF.run_external_periodic_tasks:
69             f._periodic_enabled = False
70         else:
71             f._periodic_enabled = kwargs.pop('enabled', True)
72
73         # Control frequency
74         f._periodic_spacing = kwargs.pop('spacing', 0)
75         f._periodic_immediate = kwargs.pop('run_immediately', False)
76         if f._periodic_immediate:
77             f._periodic_last_run = None
78         else:
79             f._periodic_last_run = time.time()
80         return f
81
82     # NOTE(sirp): The `if` is necessary to allow the decorator to be used with
83     # and without parents.
84     #
85     # In the 'with-parents' case (with kwargs present), this function needs to
86     # return a decorator function since the interpreter will invoke it like:
87     #
88     #   periodic_task(*args, **kwargs)(f)
89     #
90     # In the 'without-parents' case, the original function will be passed
91     # in as the first argument, like:
92     #
93     #   periodic_task(f)
94     if kwargs:
95         return decorator
96     else:
97         return decorator(args[0])

在装饰器中,66~80行就是为一个函数设置_periodic_task属性,并从装饰器的kwargs中取得spacing参数,如果有run_immediately(启动之后先立刻执行一次,然后再定时执行)会设置_periodic_last_run属性。这样定时函数运行需要的信息就都齐全了;

(2)另一个种方法是使用loopingcall.py ,也在nova/openstack/common中。
使用方法是:obj = loopingcall.FixedIntervalLoopingCall(f, args,kwargs),
                      obj.start(interval,initial_delay)
  代码:
62 class FixedIntervalLoopingCall(LoopingCallBase):
63     """A fixed interval looping call."""
64
65     def start(self, interval, initial_delay=None):
66         self._running = True
67         done = event.Event()
68
69         def _inner():
70             if initial_delay:
71                 greenthread.sleep(initial_delay)
72
73             try:
74                 while self._running:
75                     start = timeutils.utcnow()
76                     self.f(*self.args, **self.kw)
77                     end = timeutils.utcnow()
78                     if not self._running:
79                         break
80                     delay = interval - timeutils.delta_seconds(start, end)
81                     if delay  0 else 0)
85             except LoopingCallDone as e:
86                 self.stop()
87                 done.send(e.retvalue)
88             except Exception:
89                 LOG.exception(_LE('in fixed duration looping call'))
90                 done.send_exception(*sys.exc_info())
91                 return
92             else:
93                 done.send(True)
94
95         self.done = done
96
97         greenthread.spawn_n(_inner)
98         return self.done

start()方法运行定时任务,initial_delay表示是否有延时,75~84每次运行任务要记录开始时间和结束时间,如果开始时间减去结束时间比interval还大的话,那么就不等待了,立刻运行:greenthread.sleep(delay if delay > 0 else 0)  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-56228-1-1.html 上篇帖子: Openstack中当物理机故障时的灾难恢复 下篇帖子: 如何构建高可靠的Openstack
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表