设为首页 收藏本站
查看: 2639|回复: 0

[经验分享] 44. Python Celery多实例 定时任务

[复制链接]

尚未签到

发表于 2018-8-9 08:29:26 | 显示全部楼层 |阅读模式
  celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢?
  celery可以支持多台不同的计算机执行不同的任务或者相同的任务。
  如果要说celery的分布式应用的话,就要提到celery的消息路由机制,提到AMQP协议。
  具体可以查看AMQP文档详细了解。
  简单理解:
  可以有多个"消息队列"(message Queue),不同的消息可以指定发送给不同的Message Queue,
  而这是通过Exchange来实现的,发送消息到"消息队列"中时,可以指定routiing_key,Exchange通过routing_key来吧消息路由(routes)到不同的"消息队列"中去。
  如图:
DSC0000.jpg

  exchange 对应 一个消息队列(queue),即:通过"消息路由"的机制使exchange对应queue,每个queue对应每个worker
  写个例子:
  vim demon3.py
from celery import Celery  
app = Celery()
  
app.config_from_object("celeryconfig")
  
@app.task
  
def taskA(x, y):
  
    return x * y
  
@app.task
  
def taskB(x, y, z):
  
    return x + y + z
  
@app.task
  
def add(x, y):
  
    return x + y
  vim celeryconfig.py
from kombu import Queue  
BORKER_URL = "redis://192.168.48.131:6379/1"#1库
  
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"#2库
  
CELERY_QUEUES = {
  
    Queue("default", Exchange("default"), routing_key = "default"),
  
    Queue("for_task_A", Exchange("for_task_A"), routing_key = "for_task_A"),
  
    Queue("for_task_B", Exchange("for_task_B"), routing_key = "for_task_B")
  
}
  
#路由
  
CELERY_ROUTES = {
  
    "demon3.taskA":{"queue": "for_task_A",  "routing_key": "for_task_A"},
  
    "demon3.taskB":{"queue": "for_task_B",  "routing_key": "for_task_B"}
  
}
  下面把两个脚本导入服务器:
  指定taskA启动一个worker:
# celery -A demon3 worker -l info -n workerA.%h -Q for_task_A  同理:
# celery -A demon3 worker -l info -n workerB.%h -Q for_task_B  下面远程客户端调用:新文件
  vim remote.py
from demon3 import *  
r1 = taskA.delay(10, 20)
  
print (r1.result)
  
print (r1.status)
  
r2 = taskB.delay(10, 20, 30)
  
time.sleep(1)
  
prnit (r2.result)
  
print (r2.status)
  
#print (dir(r2))
  
r3 = add.delay(100, 200)
  
print (r3.result)
  
print (r3.status)#PENDING
  看到状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。
  下面,我们来启动一个worker来执行celery队列中的任务
# celery -A tasks worker -l info -n worker.%h -Q celery ##默认的  可以看到这行的结果为success
  print(re3.status)    #SUCCESS
  定时任务:
  Celery 与 定时任务
  在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
  下面我们接着在配置文件:celeryconfig.py,添加关于 CELERYBEAT_SCHEDULE 变量到脚本中去:
CELERY_TIMEZONE = 'UTC'  
CELERYBEAT_SCHEDULE = {
  
    'taskA_schedule' : {
  
        'task':'tasks.taskA',
  
        'schedule':20,
  
        'args':(5,6)
  
    },
  
'taskB_scheduler' : {
  
    'task':"tasks.taskB",
  
    "schedule":200,
  
    "args":(10,20,30)
  
    },
  
'add_schedule': {
  
    "task":"tasks.add",
  
    "schedule":10,
  
    "args":(1,2)
  
    }
  
}
  注意格式,否则会有问题
  启动:
  celery -A demon3 worker -l info -n workerA.%h -Q for_task_A
  celery -A demon3 worker -l info -n workerB.%h -Q for_task_B
  celery -A tasks worker -l info -n worker.%h -Q celery
  celery -A demon3 beat

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-548901-1-1.html 上篇帖子: mysql + Python3.5.2 + Django + Uwsgi + Nginx实现生产环境 下篇帖子: python命令行参数模块argparse
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表