celery beat 调度如何运行期间动态添加任务?

发布于 2022-09-04 21:17:33 字数 86 浏览 22 评论 0

我尝试过django-celery-beat,在admin后台添加任务,可以实现动态添加任务
但要重启celery beat才生效,请问,有其他方试吗?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

掌心的温暖 2022-09-11 21:17:33

有个思路,你可以考虑,我目前也在尝试这个方法,处于摸石过河阶段。celery是支持定时任务,但是不符合我的需求,我需要像linux下的crontab这样动态添加定时任务,我也看了django-celery-beat,因为用的是Flask,发现不值得参考实现,所以一直在看文档搜资料,终于被我找到一种方式,celery的apply_async这个函数非常有用,它有个eta参数,它的简化使用是countdown,但是eta的威力是很巨大的,因为它只接受datetime对象,比如你给定一个任务在2017-05-02 20:0:0执行,你可以这样使用:

job.apply_async(args=args, kwarg=kwargs, eta=datetime(2017,5,2,20,0,0))

是不是很少用,假如我有一个任务需要每天晚上八点执行,我可以利用这个eta参数实现。伪代码如下:

时间规则 = '每天晚上八点执行'
第一次调用任务,先计算最近的执行时间,作为eta的参数,调用apply_async函数,
然后第一次任务执行成功,得到上次任务的eta参数值,在天的值上加一,然后把新的执行时间作为eta的参数再次调用apply_async函数,这里省略了很多判断,自行脑补。
循环往复,是不是一直按每天晚上八点执行。

这里有个非常重要的点是如何在任务执行成功的时候计算下一次的执行时间,做法如下

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print 'task done: {0}'.format(retval)
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)
        
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print 'task fail, reason: {0}'.format(exc)
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)
        
@app.task(base=MyTask)
def add(x, y):
    return x + y

它提供了任务执行成功和失败的函数,我们只要在此基础上重写就可以了,我说的只是最核心的部分,具体怎么做有很多方法,

如此安好 2022-09-11 21:17:33

无法动态添加,必须重启 beat。

ask 回答过原因了 #3493

苍暮颜 2022-09-11 21:17:33

你可以参照这个redisbeat

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文