Notify celery task of worker shutdown

Posted 2024-12-15 09:07:24


I am using celery 2.4.1 with python 2.6, the rabbitmq backend, and django. I would like my task to be able to clean up properly if the worker shuts down. As far as I am aware, you cannot supply a task destructor, so I tried hooking into the worker_shutdown signal.

Note: AbortableTask only works with the database backend, so I can't use that.

from celery.task import task
from celery.signals import worker_shutdown

@task
def mytask(*args):
    obj = DoStuff()

    def shutdown_hook(*args):
        print "Worker shutting down"
        # clean up nicely
        obj.stop()

    worker_shutdown.connect(shutdown_hook)

    # blocking call that monitors a network connection
    obj.stuff()

However, the shutdown hook never gets called. Ctrl-C'ing the worker doesn't kill the task and I have to manually kill it from the shell.

So if this is not the proper way to go about it, how do I allow tasks to shutdown gracefully?


苏别ゝ 2024-12-22 09:07:24


worker_shutdown is only sent by the MainProcess, not the child pool workers.
All worker_* signals, except for worker_process_init, refer to the MainProcess.

However, the shutdown hook never gets called. Ctrl-C'ing the worker
doesn't kill the task and I have to manually kill it from the shell.

The worker never terminates a task under a normal (warm) shutdown.
Even if a task takes days to complete, the worker won't finish shutting down
until the task has completed. You can set --soft-time-limit or --time-limit
to tell the instance when it's OK to terminate the task.
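The soft time limit works by delivering an exception inside the task so it gets a chance to clean up before the hard limit kills it. Here is a minimal stdlib sketch of that idea, not celery's actual machinery: it uses SIGALRM where celery uses its own signal, and SoftTimeLimitExceeded is a local stand-in for celery.exceptions.SoftTimeLimitExceeded, which is what you would catch around the blocking call in a real task:

```python
import signal

class SoftTimeLimitExceeded(Exception):
    """Local stand-in for celery.exceptions.SoftTimeLimitExceeded."""

def _raise_soft_limit(signum, frame):
    # Raised inside whatever frame is currently executing.
    raise SoftTimeLimitExceeded()

def run_with_soft_limit(func, seconds):
    # Arrange for SIGALRM after `seconds`, mimicking --soft-time-limit.
    old_handler = signal.signal(signal.SIGALRM, _raise_soft_limit)
    signal.alarm(seconds)
    try:
        return func()
    finally:
        signal.alarm(0)  # cancel the alarm
        signal.signal(signal.SIGALRM, old_handler)

def long_running_task():
    try:
        while True:
            pass  # stands in for obj.stuff(), the blocking network call
    except SoftTimeLimitExceeded:
        return "cleaned up"  # stands in for obj.stop()

print(run_with_soft_limit(long_running_task, 1))
```

In a real celery task the worker installs the handler for you; the task only needs the try/except around the blocking work.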

So to add any kind of cleanup step, you first need to
make sure the tasks can actually complete, as the cleanup won't
be called before that happens.

To add a cleanup step to the pool worker processes you can use
something like:

from celery import platforms
from celery.signals import worker_process_init

def cleanup_after_tasks(signum, frame):
    # reentrant code only here (see http://docs.python.org/library/signal.html)
    pass

def install_pool_process_sighandlers(**kwargs):
    platforms.signals["TERM"] = cleanup_after_tasks
    platforms.signals["INT"] = cleanup_after_tasks

worker_process_init.connect(install_pool_process_sighandlers)
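Outside a worker you can see the same pattern with the stdlib signal module; platforms.signals is essentially a convenience wrapper over signal.signal. A small standalone sketch (received and install_sighandlers are illustrative names, not celery API):

```python
import os
import signal

received = []  # records which signals the handler saw

def cleanup_after_tasks(signum, frame):
    # Keep handlers reentrant: just record the fact here and do
    # the real cleanup back in normal code.
    received.append(signum)

def install_sighandlers():
    # Rough stdlib equivalent of platforms.signals["TERM"] = handler
    signal.signal(signal.SIGTERM, cleanup_after_tasks)
    signal.signal(signal.SIGINT, cleanup_after_tasks)

install_sighandlers()
os.kill(os.getpid(), signal.SIGTERM)  # simulate the worker receiving TERM
```

Note that because these handlers are installed per pool process via worker_process_init, each child gets its own copy, independent of the MainProcess.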