设为首页 收藏本站
查看: 679|回复: 0

[经验分享] Python下定时任务框架APScheduler的使用

[复制链接]

尚未签到

发表于 2018-8-4 10:47:10 | 显示全部楼层 |阅读模式
  今天准备实现一个功能需要用到定时执行任务,所以就看到了Python的一个定时任务框架APScheduler,试了一下感觉还不错。
  1.APScheduler简介:
  APScheduler是Python的一个定时任务框架,可以很方便的满足用户定时执行或者周期执行任务的需求,它提供了基于日期date、固定时间间隔interval 、以及类似于Linux上的定时任务crontab类型的定时任务。并且该框架不仅可以添加、删除定时任务,还可以将任务存储到数据库中,实现任务的持久化,所以使用起来非常方便。
  2.APScheduler安装:
  APScheduler的安装相对来说也非常简单,可以直接利用pip安装,如果没有pip可以下载源码,利用源码安装。
  1).利用pip安装:(推荐)
# pip install apscheduler  2).基于源码安装:https://pypi.python.org/pypi/APScheduler/
# python setup.py install  3.基本概念
  APScheduler有四种组件及相关说明:
  1) triggers(触发器):触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行,除了他们自己初始化配置外,触发器完全是无状态的。
  2)job stores(作业存储):用来存储被调度的作业,默认的作业存储器是简单地把作业任务保存在内存中,其它作业存储器可以将任务作业保存到各种数据库中,支持MongoDB、Redis、SQLAlchemy存储方式。当对作业任务进行持久化存储的时候,作业的数据将被序列化,重新读取作业时在反序列化。
  3) executors(执行器):执行器用来执行定时任务,只是将需要执行的任务放在新的线程或者线程池中运行。当作业任务完成时,执行器将会通知调度器。对于执行器,默认情况下选择ThreadPoolExecutor就可以了,但是如果涉及到一下特殊任务如比较消耗CPU的任务则可以选择ProcessPoolExecutor,当然根据根据实际需求可以同时使用两种执行器。
  4) schedulers(调度器):调度器是将其它部分联系在一起,一般在应用程序中只有一个调度器,应用开发者不会直接操作触发器、任务存储以及执行器,相反调度器提供了处理的接口。通过调度器完成任务的存储以及执行器的配置操作,如可以添加。修改、移除任务作业。
  APScheduler提供了多种调度器,可以根据具体需求来选择合适的调度器,常用的调度器有:
  BlockingScheduler:适合于只在进程中运行单个任务的情况,通常在调度器是你唯一要运行的东西时使用。
  BackgroundScheduler: 适合于要求任何在程序后台运行的情况,当希望调度器在应用后台执行时使用。
  AsyncIOScheduler:适合于使用asyncio框架的情况
  GeventScheduler: 适合于使用gevent框架的情况
  TornadoScheduler: 适合于使用Tornado框架的应用
  TwistedScheduler: 适合使用Twisted框架的应用
  QtScheduler: 适合使用QT的情况
  4.配置调度器
  APScheduler提供了许多不同的方式来配置调度器,你可以使用一个配置字典或者作为参数关键字的方式传入。你也可以先创建调度器,再配置和添加作业,这样你可以在不同的环境中得到更大的灵活性。
  1)下面一个简单的示例:
import time  
from apscheduler.schedulers.blocking import BlockingScheduler
  

  
def test_job():
  
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
  

  
scheduler = BlockingScheduler()
  
'''
  
#该示例代码生成了一个BlockingScheduler调度器,使用了默认的默认的任务存储MemoryJobStore,以及默认的执行器ThreadPoolExecutor,并且最大线程数为10。
  
'''
  
scheduler.add_job(test_job, 'interval', seconds=5, id='test_job')
  
'''
  
#该示例中的定时任务采用固定时间间隔(interval)的方式,每隔5秒钟执行一次。
  
#并且还为该任务设置了一个任务id
  
'''
  
scheduler.start()
  2)如果想执行一些复杂任务,如上边所说的同时使用两种执行器,或者使用多种任务存储方式,并且需要根据具体情况对任务的一些默认参数进行调整。可以参考下面的方式。(http://apscheduler.readthedocs.io/en/latest/userguide.html)
  第一种方式:
from pytz import utc  

  
from apscheduler.schedulers.background import BackgroundScheduler
  
from apscheduler.jobstores.mongodb import MongoDBJobStore
  
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
  

  

  
jobstores = {
  
    'mongo': MongoDBJobStore(),
  
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
  
}
  
executors = {
  
    'default': ThreadPoolExecutor(20),
  
    'processpool': ProcessPoolExecutor(5)
  
}
  
job_defaults = {
  
    'coalesce': False,
  
    'max_instances': 3
  
}
  
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
  第二种方式:
from apscheduler.schedulers.background import BackgroundScheduler  

  

  
# The "apscheduler." prefix is hard coded
  
scheduler = BackgroundScheduler({
  
    'apscheduler.jobstores.mongo': {
  
         'type': 'mongodb'
  
    },
  
    'apscheduler.jobstores.default': {
  
        'type': 'sqlalchemy',
  
        'url': 'sqlite:///jobs.sqlite'
  
    },
  
    'apscheduler.executors.default': {
  
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
  
        'max_workers': '20'
  
    },
  
    'apscheduler.executors.processpool': {
  
        'type': 'processpool',
  
        'max_workers': '5'
  
    },
  
    'apscheduler.job_defaults.coalesce': 'false',
  
    'apscheduler.job_defaults.max_instances': '3',
  
    'apscheduler.timezone': 'UTC',
  
})
  第三种方式:
from pytz import utc  

  
from apscheduler.schedulers.background import BackgroundScheduler
  
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  
from apscheduler.executors.pool import ProcessPoolExecutor
  

  

  
jobstores = {
  
    'mongo': {'type': 'mongodb'},
  
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
  
}
  
executors = {
  
    'default': {'type': 'threadpool', 'max_workers': 20},
  
    'processpool': ProcessPoolExecutor(max_workers=5)
  
}
  
job_defaults = {
  
    'coalesce': False,
  
    'max_instances': 3
  
}
  
scheduler = BackgroundScheduler()
  

  
# .. do something else here, maybe add jobs etc.
  

  
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
  5.对任务作业的基本操作:
  1).添加作业有两种方式:第一种可以直接调用add_job(),第二种使用scheduled_job()修饰器。
  而add_job()是使用最多的,它可以返回一个apscheduler.job.Job实例,因而可以对它进行修改或者删除,而使用修饰器添加的任务添加之后就不能进行修改。
  例如使用add_job()添加作业:
#!/usr/bin/env python  
#-*- coding:UTF-8
  
import time
  
import datetime
  
from apscheduler.schedulers.blocking import BlockingScheduler
  

  
def job1(f):
  
    print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), f
  

  
def job2(arg1, args2, f):
  
    print f, args1, args2
  

  
def job3(**args):
  
    print args
  

  
'''
  
APScheduler支持以下三种定时任务:
  
cron: crontab类型任务
  
interval: 固定时间间隔任务
  
date: 基于日期时间的一次性任务
  
'''
  
scheduler = BlockingScheduler()
  
#循环任务示例
  
scheduler.add_job(job1, 'interval', seconds=5, args=('循环',), id='test_job1')
  
#定时任务示例
  
scheduler.add_job(job1, 'cron', second='*/5', args=('定时',), id='test_job2')
  
#一次性任务示例
  
scheduler.add_job(job1, next_run_time=(datetime.datetime.now() + datetime.timedelta(seconds=10)), args=('一次',), id='test_job3')
  
'''
  
传递参数的方式有元组(tuple)、列表(list)、字典(dict)
  
注意:不过需要注意采用元组传递参数时后边需要多加一个逗号
  
'''
  
#基于list
  
scheduler.add_job(job2, 'interval', seconds=5, args=['a','b','list'], id='test_job4')
  
#基于tuple
  
scheduler.add_job(job2, 'interval', seconds=5, args=('a','b','tuple',), id='test_job5')
  
#基于dict
  
scheduler.add_job(job3, 'interval', seconds=5, kwargs={'f':'dict', 'a':1,'b':2}, id='test_job7')
  
print scheduler.get_jobs()
  
scheduler.start()
  

  
#带有参数的示例
  
scheduler.add_job(job2, 'interval', seconds=5, args=['a','b'], id='test_job4')
  
scheduler.add_job(job2, 'interval', seconds=5, args=('a','b',), id='test_job5')
  
scheduler.add_job(job3, 'interval', seconds=5, kwargs={'a':1,'b':2}, id='test_job6')
  
print scheduler.get_jobs()
  
scheduler.start()
  或者使用scheduled_job()修饰器来添加作业:
@sched.scheduled_job('cron', second='*/5' ,id='my_job_id',)  
def test_task():
  
    print("Hello world!")
  2).获得任务列表:
  可以通过get_jobs方法来获取当前的任务列表,也可以通过get_job()来根据job_id来获得某个任务的信息。并且apscheduler还提供了一个print_jobs()方法来打印格式化的任务列表。
  例如:
scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')  
print scheduler.get_job('my_job_id')
  
print scheduler.get_jobs()
  3).修改任务:
  修改任务任务的属性可以使用apscheduler.job.Job.modify()或者modify_job()方法,可以修改除了id的其它任何属性。
  例如:
job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job' name='test_job')  
job.modify(max_instances=5, name='my_job')
  4).删除任务:

  删除调度器中的任务有可以用remove_job()根据job>  注意:通过scheduled_job()添加的任务只能使用remove_job()进行删除。
  例如:
job = scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')  
job.remove()
  或者
scheduler.add_job(my_job, 'interval', seconds=5, id='my_job_id' name='test_job')  
scheduler.remove_job('my_job')
  5).暂停与恢复任务:
  暂停与恢复任务可以直接操作任务实例或者调度器来实现。当任务暂停时,它的运行时间会被重置,暂停期间不会计算时间。
  暂停任务:
apscheduler.job.Job.pause()  
apscheduler.schedulers.base.BaseScheduler.pause_job()
  恢复任务
apscheduler.job.Job.resume()  
apscheduler.schedulers.BaseScheduler.resume_job()
  6).启动调度器
  可以使用start()方法启动调度器,BlockingScheduler需要在初始化之后才能执行start(),对于其他的Scheduler,调用start()方法都会直接返回,然后可以继续执行后面的初始化操作。
  例如:
from apscheduler.schedulers.blocking import BlockingScheduler  

  
def my_job():
  
    print "Hello world!"
  

  
scheduler = BlockingScheduler()
  
scheduler.add_job(my_job, 'interval', seconds=5)
  
scheduler.start()
  7).关闭调度器:
  使用下边方法关闭调度器:
scheduler.shutdown()  默认情况下调度器会关闭它的任务存储和执行器,并等待所有正在执行的任务完成,如果不想等待,可以进行如下操作:
scheduler.shutdown(wait=False)  注意:
  当出现No handlers could be found for logger “apscheduler.scheduler”次错误信息时,说明没有
  logging模块的logger存在,所以需要添加上,对应新增内容如下所示(仅供参考):
import logging  
logging.basicConfig(level=logging.DEBUG,
  
            format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
  
           datafmt='%a, %d %b %Y %H:%M:%S',
  
            filename='/var/log/aaa.txt',
  
            filemode='a')

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-546364-1-1.html 上篇帖子: Python--基础练习 下篇帖子: python变量及浅复制与深复制
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表