How do I set up Celery to call a custom initialization function before running tasks?

Posted on 2024-08-18 20:33:34

I have a Django project and I'm trying to use Celery to submit tasks for background processing ( http://ask.github.com/celery/introduction.html ). Celery integrates well with Django and I've been able to submit my custom tasks and get back results.

The only problem is that I can't find a sane way of performing custom initialization in the daemon process. I need to call an expensive function that loads a lot of memory before I start processing the tasks, and I can't afford to call that function every time.

Has anyone had this problem before? Any ideas how to work around it without modifying the Celery source code?

Thanks

莫言歌 2024-08-25 20:33:34

You can either write a custom loader, or use the signals.

Loaders have an on_task_init method, which is called when a task is about to be executed,
and an on_worker_init method, which is called by the celery and celerybeat main processes.
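A rough sketch of how the two hooks divide the work, assuming nothing about Celery's real loader classes: ExpensiveInitLoader and load_big_model are hypothetical names, and only the method names on_worker_init and on_task_init come from the answer. The expensive work runs once in on_worker_init, while on_task_init, which runs before every task, stays cheap.

```python
def load_big_model():
    """Hypothetical stand-in for the expensive, memory-hungry initialization."""
    return {i: i ** 2 for i in range(1024)}


class ExpensiveInitLoader:
    """Hypothetical loader-like object; NOT Celery's actual loader base class."""

    def __init__(self):
        self.model = None
        self.init_calls = 0  # counts how often the expensive setup ran

    def on_worker_init(self):
        # Runs once when the worker's main process starts.
        self.model = load_big_model()
        self.init_calls += 1

    def on_task_init(self, task_id, task):
        # Runs before every task, so it only guards against a missing init
        # instead of redoing the expensive work.
        if self.model is None:
            self.on_worker_init()


# Simulated lifecycle: one worker start, then three tasks.
loader = ExpensiveInitLoader()
loader.on_worker_init()
for n in range(3):
    loader.on_task_init(task_id=str(n), task=None)
```

After the three simulated tasks, the expensive setup has still run only once.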

Using signals is probably the easiest approach. The available signals are:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Dispatched when a task is about to be executed by the worker (or locally,
    if using apply or if CELERY_ALWAYS_EAGER has been set).

  • task_postrun(task_id, task, args, kwargs, retval)

    Dispatched after a task has been executed, under the same conditions as above.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Called when a task is applied (not suitable for long-running operations).

Additional signals available in 0.9.x (the current master branch on GitHub):

  • worker_init()

    Called when celeryd has started (before tasks are initialized, so on a
    system that supports fork, any memory changes are copied to the child
    worker processes).

  • worker_ready()

    Called when celeryd is able to receive tasks.

  • worker_shutdown()

    Called when celeryd is shutting down.
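The fork behaviour mentioned under worker_init can be seen without Celery at all. This is a stdlib-only sketch for POSIX systems (it uses os.fork, so it will not run on Windows); _table, load_table and child_lookup are illustrative names, not Celery APIs. Data loaded in the parent before forking is visible in the child without being recomputed.

```python
import os

# Illustrative module-level table, filled once in the parent process,
# the way a worker_init handler would fill it before the pool forks.
_table = {}


def load_table():
    for i in range(1024):
        _table[i] = i ** 2


def child_lookup(x):
    """Fork a child that reads _table (without reloading it) and return its answer."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:  # child process: report what it sees, then exit immediately
        try:
            os.close(read_fd)
            os.write(write_fd, str(_table.get(x, -1)).encode())
        finally:
            os._exit(0)
    # Parent process: read the child's answer and reap the child.
    os.close(write_fd)
    data = os.read(read_fd, 64)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return int(data)
```

Calling child_lookup(10) before load_table() returns -1, because the child inherits an empty table. After load_table() has run once in the parent, child_lookup(10) returns 100.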

Here's an example precalculating something the first time a task is run in the process:

```python
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

# Module-level cache, filled once per worker process.
_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        # Use the precomputed value when available, otherwise compute it.
        if x in _precalc_table:
            return _precalc_table[x]
        return x ** 2

tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table:  # the table is empty, so it hasn't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# Need to use the registered instance for the sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowersOfTwo.name])
```

If you want the function to be run for all tasks, just skip the sender argument.
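The effect of the sender argument can be illustrated with a tiny stand-in dispatcher. This is not Celery's signal implementation (the Signal class here is hypothetical), but it follows the same convention: a receiver connected without a sender fires for every sender, while one connected with a specific sender fires only when that sender dispatches.

```python
class Signal:
    """Minimal, hypothetical stand-in for a signal such as task_prerun."""

    def __init__(self):
        self._receivers = []  # list of (receiver, sender filter) pairs

    def connect(self, receiver, sender=None):
        self._receivers.append((receiver, sender))

    def send(self, sender, **kwargs):
        # Call every receiver whose filter is None or matches this sender.
        results = []
        for receiver, wanted in self._receivers:
            if wanted is None or wanted == sender:
                results.append(receiver(sender=sender, **kwargs))
        return results


task_prerun = Signal()
calls = []


def log_all(sender, **kwargs):
    calls.append(("all", sender))


def log_one(sender, **kwargs):
    calls.append(("one", sender))


task_prerun.connect(log_all)                        # no sender: runs for every task
task_prerun.connect(log_one, sender="PowersOfTwo")  # runs only for this task

task_prerun.send(sender="PowersOfTwo", task_id="1")
task_prerun.send(sender="OtherTask", task_id="2")
```

Here log_all runs for both tasks, but log_one runs only for PowersOfTwo.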
