运行更多任务的 Celery 任务
我正在使用 celerybeat 启动一项主要任务,该任务启动许多次要任务。我已经写好了这两个任务。
有没有办法轻松做到这一点? Celery 是否允许从任务内部运行任务?
我的示例:
@task
def compute(users=None):
if users is None:
users = User.objects.all()
tasks = []
for user in users:
tasks.append(compute_for_user.subtask((user.id,)))
job = TaskSet(tasks)
job.apply_async() # raises a IOError: Socket closed
@task
def compute_for_user(user_id):
#do some stuff
compute
从 celerybeat 调用,但在尝试运行 apply_async
时导致 IOError。有什么想法吗?
I am using celerybeat to kick off a primary task that kicks of a number of secondary tasks. I have both tasks written already.
Is there a way to easily do this? Does Celery allow for tasks to be run from within tasks?
My example:
@task
def compute(users=None):
if users is None:
users = User.objects.all()
tasks = []
for user in users:
tasks.append(compute_for_user.subtask((user.id,)))
job = TaskSet(tasks)
job.apply_async() # raises a IOError: Socket closed
@task
def compute_for_user(user_id):
#do some stuff
compute
gets called from celerybeat, but causes an IOError when it tries to run apply_async
. Any ideas?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(4)
回答您的开场问题:从版本 2.0 开始,Celery 提供了一种从其他任务启动任务的简单方法。您所说的“次要任务”就是所谓的“子任务”。请参阅任务、子任务和回调集的文档,其中@ Paperino 很友善地提供了链接。
对于 3.0 版,Celery 更改为使用 组 来实现此行为和其他类型的行为。
您的代码表明您已经熟悉此界面。您的实际问题似乎是,“当我尝试运行一组子任务时,为什么会出现‘Socket Closed’
IOError
?”我认为没有人可以回答这个问题,因为您没有提供有关您的程序的足够信息。您的摘录无法按原样运行,因此我们无法亲自检查您遇到的问题。请发布随IOError
提供的堆栈跟踪,如果幸运的话,将会有人帮助您解决崩溃问题。To answer your opening questions: As of version 2.0, Celery provides an easy way to start tasks from other tasks. What you are calling "secondary tasks" are what it calls "subtasks". See the documentation for Sets of tasks, Subtasks and Callbacks, which @Paperino was kind enough to link to.
For version 3.0, Celery changed to using groups for this, and other, types of behavior.
Your code shows that you are already familiar with this interface. Your actual question seems to be, "Why am I getting a 'Socket Closed'
IOError
when I try to run my set of subtasks?" I don't think anyone can answer that, because you have not provided enough information about your program. Your excerpt cannot be run as-is, so we cannot examine the problem you're having for ourselves. Please post the stacktrace provided with theIOError
, and with any luck, someone that can help you with your crasher will come along.你可以使用这样的东西(3.0中支持)
You can use something like this (Support in 3.0 )
由于 3.0 版“任务集”不再是这个术语...组、链和和弦作为一种特殊类型的子任务是新事物,请参阅 http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks
And since version 3.0 'TaskSet' isn't the term anymore... Groups, Chains and Chords as a special type of subtask is the new thing, see http://docs.celeryproject.org/en/3.1/whatsnew-3.0.html#group-chord-chain-are-now-subtasks
对于提到的 IOError,虽然这里的信息不足以说明导致它的原因,但我的猜测是您尝试在任务函数内部建立连接,因此每当调用任务时,就会建立一个新连接。如果任务被调用一千次,就会有一千个连接。这将淹没系统套接字管理器,并且 IOError 是它的抱怨。
For the IOError mentioned, although the information here is not sufficient to tell what caused it, my guess is that you tried to establish a connection inside the task function, so whenever a task is called, a new connection is built. If the task is to be called thousand times, there will be thousand connections. This will flood the system socket manager and the IOError is its complaint.