How do I check task status in Celery?

Posted 2024-12-29 11:17:31 · 263 words · 4 views · 0 comments

How does one check whether a task is running in celery (specifically, I'm using celery-django)?

I've read the documentation, and I've googled, but I can't see a call like:

my_example_task.state() == RUNNING

My use-case is that I have an external (java) service for transcoding. When I send a document to be transcoded, I want to check if the task that runs that service is running, and if not, to (re)start it.

I'm using the current stable versions - 2.4, I believe.
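
One way to frame this use-case is as a small decision: given the id of the last task launched for the service (if any), should a new one be started? The sketch below is only an illustration under assumptions. `should_start` and `get_state` are hypothetical names, `get_state` stands in for something like `lambda tid: AsyncResult(tid).state`, and the state strings are Celery's built-in state names. It treats PENDING as "still in flight", which is not always safe.

```python
# Built-in Celery states in which a task is finished, one way or another.
READY_STATES = frozenset({"SUCCESS", "FAILURE", "REVOKED"})


def should_start(task_id, get_state):
    """Decide whether a new task should be launched.

    task_id   -- id of the previously launched task, or None if there is none
    get_state -- hypothetical callable mapping a task id to a state string
    """
    if task_id is None:
        return True  # nothing was ever launched
    return get_state(task_id) in READY_STATES


# Stand-in for a real result backend lookup, for illustration only.
fake_states = {"abc": "STARTED", "def": "FAILURE"}
print(should_start(None, fake_states.get))   # True
print(should_start("abc", fake_states.get))  # False
print(should_start("def", fake_states.get))  # True
```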

Comments (13)

浴红衣 2025-01-05 11:17:32

Every Task object has a .request property, which holds the context of the current request, including its id. Accordingly, the following line gives the state of a Task task:

task.AsyncResult(task.request.id).state
少跟Wǒ拽 2025-01-05 11:17:32

Old question but I recently ran into this problem.

If you're trying to get the task_id you can do it like this:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Now you know exactly what the task_id is and can now use it to get the AsyncResult:

# grab the AsyncResult
result = celery.result.AsyncResult(task_id)

# print the task id
print(result.task_id)
# 09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print(result.status)
# SUCCESS

# print the result returned
print(result.result)
# 4
没︽人懂的悲伤 2025-01-05 11:17:32

You can also create custom states and update their values during task execution.
This example is from the docs:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
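
On the reading side, the meta dict passed to update_state() comes back as the result's info. A minimal sketch of turning that into a progress string, assuming the 'current' and 'total' keys from the example above; `describe_progress` is a hypothetical helper, not part of Celery:

```python
def describe_progress(state, info):
    """Turn a (state, info) pair into a short human-readable string.

    For the custom PROGRESS state, info is the meta dict passed to
    update_state(); for other states it is ignored here.
    """
    if state == "PROGRESS" and isinstance(info, dict):
        current = info.get("current", 0)
        total = info.get("total", 0)
        if total:
            return f"{current}/{total} ({100 * current // total}%)"
        return f"{current}/?"
    return state


print(describe_progress("PROGRESS", {"current": 3, "total": 12}))  # 3/12 (25%)
print(describe_progress("SUCCESS", None))                          # SUCCESS
```

With a real AsyncResult this would be called as describe_progress(result.state, result.info).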

人疚 2025-01-05 11:17:32

Just use this API from the Celery FAQ:

result = app.AsyncResult(task_id)

This works fine.

筱武穆 2025-01-05 11:17:32

An answer as of 2020:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
对岸观火 2025-01-05 11:17:32

Try:

task.AsyncResult(task.request.id).state

This will provide the Celery task status. If the Celery task is already in the FAILURE state, it will throw an exception:

raised unexpected: KeyError('exc_type',)
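
If the state lookup itself can raise, as described here, one defensive option is to wrap it. This is a minimal sketch, not a Celery API: `safe_state` is a hypothetical helper, "UNKNOWN" is a made-up placeholder rather than a Celery state, and the two small classes only simulate result objects.

```python
def safe_state(result, default="UNKNOWN"):
    """Read result.state, returning default if the lookup itself raises."""
    try:
        return result.state
    except Exception:  # e.g. the KeyError('exc_type') mentioned above
        return default


class BrokenResult:
    """Stand-in for a result whose state lookup fails (illustration only)."""

    @property
    def state(self):
        raise KeyError("exc_type")


class GoodResult:
    """Stand-in for a result whose state lookup works (illustration only)."""

    state = "SUCCESS"


print(safe_state(BrokenResult()))  # UNKNOWN
print(safe_state(GoodResult()))    # SUCCESS
```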

九八野马 2025-01-05 11:17:32
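  • First, in your Celery app module:

vi my_celery_apps/app1.py

app = Celery(worker_name)
  • Next, switch to the task file and import app from your Celery app module:

vi tasks/task1.py

from my_celery_apps.app1 import app

# wrapped in a function (name is illustrative) so that `return` is valid
def check_task(taskid):
    # look up the task by id through the app
    task = app.AsyncResult(taskid)
    try:
        if task.state.lower() != "success":
            return
    except Exception:
        pass  # do something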
默嘫て 2025-01-05 11:17:32

I found helpful information in the Celery Project Workers Guide, under inspecting-workers.

For my case, I am checking to see if Celery is running.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

You can experiment with inspect to fit your needs.
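
Besides registered(), the inspect object also offers active(), which lists the tasks that workers are currently executing. The sketch below filters such a reply by task name. It assumes the reply is a dict mapping worker names to lists of task descriptions with 'id' and 'name' keys, or None when no worker answers. `find_active`, the worker names and the task names are all made up for illustration.

```python
def find_active(active_reply, task_name):
    """Return the ids of active tasks called task_name.

    active_reply -- the value returned by inspect().active(): a dict mapping
                    worker names to lists of task descriptions, or None when
                    no worker answered.
    """
    if not active_reply:
        return []
    return [
        task["id"]
        for tasks in active_reply.values()
        for task in tasks
        if task.get("name") == task_name
    ]


# Hand-written sample in the assumed shape, for illustration only.
sample = {
    "celery@host1": [{"id": "a1", "name": "proj.tasks.transcode"}],
    "celery@host2": [{"id": "b2", "name": "proj.tasks.other"}],
}
print(find_active(sample, "proj.tasks.transcode"))  # ['a1']
print(find_active(None, "proj.tasks.transcode"))    # []
```

In real use the first argument would come from something like task.app.control.inspect().active().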

清醇 2025-01-05 11:17:32
res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())
凉宸 2025-01-05 11:17:32

For simple tasks, we can use http://flower.readthedocs.io/en/latest/screenshots.html and http://policystat.github.io/jobtastic/ to do the monitoring.

For complicated tasks, say a task that deals with many other modules, we recommend manually recording the progress and messages for the specific task unit.

泛滥成性 2025-01-05 11:17:32

Apart from the programmatic approaches above, task status can easily be seen using Flower.

Flower is a web-based tool for monitoring and administering Celery clusters. It provides real-time monitoring using Celery Events, including:

  1. Task progress and history
  2. Ability to show task details (arguments, start time, runtime, and more)
  3. Graphs and statistics

Official Document:
Flower - Celery monitoring tool

Installation:

$ pip install flower

Usage:

http://localhost:5555

Update:
There is a versioning issue: flower (version=0.9.7) works only with celery (version=4.4.7). Moreover, when you install flower, it replaces any higher version of celery with 4.4.7, and this does not work for registered tasks.

So要识趣 2025-01-05 11:17:31

Return the task_id (which .delay() gives you) and ask the Celery instance about the state afterwards:

x = method.delay(1,2)
print(x.task_id)

When asking, get a new AsyncResult using this task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
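
Since ready() only reports the state at the moment of the call, a caller that wants to wait has to poll. The following is a minimal sketch of such a loop. `wait_until_ready` is a hypothetical helper and FakeResult only simulates an object with a ready() method. (AsyncResult.get() also accepts a timeout, but by default it re-raises the task's exception on failure.)

```python
import time


def wait_until_ready(result, timeout=10.0, interval=0.5):
    """Poll result.ready() until it returns True or timeout seconds elapse.

    Returns True if the task finished in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while True:
        if result.ready():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


class FakeResult:
    """Stand-in that becomes ready after a few polls (illustration only)."""

    def __init__(self, polls_needed):
        self.polls_left = polls_needed

    def ready(self):
        self.polls_left -= 1
        return self.polls_left <= 0


print(wait_until_ready(FakeResult(3), timeout=1.0, interval=0.01))       # True
print(wait_until_ready(FakeResult(10**9), timeout=0.05, interval=0.01))  # False
```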
冷心人i 2025-01-05 11:17:31

Creating an AsyncResult object from the task id is the way recommended in the FAQ to obtain the task status when the only thing you have is the task id.

However, as of Celery 3.x, there are significant caveats that could bite people if they do not pay attention to them. It really depends on the specific use-case scenario.

By default, Celery does not record a "running" state.

In order for Celery to record that a task is running, you must set task_track_started to True. Here is a simple task that tests this:

@app.task(bind=True)
def test(self):
    print(self.AsyncResult(self.request.id).state)

When the option task_track_started is False, which is the default, the state shown is PENDING even though the task has started. If you set task_track_started to True, then the state will be STARTED. You can either override the default on a specific task:

@app.task(bind=True, track_started=True)
def test(self):
    print(self.AsyncResult(self.request.id).state)

or change the global setting in settings.py:

CELERY_TRACK_STARTED=True

The state PENDING means "I don't know."

An AsyncResult with the state PENDING does not mean anything more than that Celery does not know the status of the task. This could be because of any number of reasons.

For one thing, AsyncResult can be constructed with invalid task ids. Such "tasks" will be deemed pending by Celery:

>>> task.AsyncResult("invalid").status
'PENDING'

Ok, so nobody is going to feed obviously invalid ids to AsyncResult. Fair enough, but this also means that AsyncResult will report a task that ran successfully, but that Celery has since forgotten, as PENDING. Again, in some use-case scenarios this can be a problem. Part of the issue hinges on how Celery is configured to keep the results of tasks, because it depends on the availability of the "tombstones" in the results backend. ("Tombstones" is the term used in the Celery documentation for the data chunks that record how the task ended.) Using AsyncResult won't work at all if task_ignore_result is True. A more vexing problem is that Celery expires the tombstones by default. The result_expires setting defaults to 24 hours. So if you launch a task, record the id in long-term storage, and create an AsyncResult with it more than 24 hours later, the status will be PENDING.

All "real tasks" start in the PENDING state. So getting PENDING on a task could mean that the task was requested but never progressed further than this (for whatever reason). Or it could mean the task ran but Celery forgot its state.
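
One way to reduce this ambiguity is to record task ids at launch time, so that a PENDING answer for an id you never issued can be told apart from one you did. A minimal sketch under that assumption follows. `classify`, `launched_ids` and the two extra labels are invented for illustration and are not Celery states.

```python
def classify(task_id, state, launched_ids):
    """Refine a PENDING state using ids recorded at launch time.

    launched_ids -- hypothetical set of task ids the application saved when
                    it called delay()/apply_async().
    """
    if state != "PENDING":
        return state
    if task_id in launched_ids:
        # We did launch it: either still queued, or its result has expired.
        return "PENDING_OR_FORGOTTEN"
    # Celery reports PENDING for ids it has never seen.
    return "UNKNOWN_ID"


launched = {"09dad9cf"}
print(classify("09dad9cf", "PENDING", launched))  # PENDING_OR_FORGOTTEN
print(classify("invalid", "PENDING", launched))   # UNKNOWN_ID
print(classify("09dad9cf", "SUCCESS", launched))  # SUCCESS
```

This still cannot separate "queued" from "forgotten"; for that, the launch time would have to be stored as well and compared with the result expiry.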

Ouch! AsyncResult won't work for me. What else can I do?

I prefer to keep track of goals rather than of the tasks themselves. I do keep some task information, but it is secondary to keeping track of the goals. The goals are stored in storage independent from Celery. When a request needs to perform a computation that depends on some goal having been achieved, it checks whether the goal has already been achieved. If so, it uses the cached goal; otherwise, it starts the task that will achieve the goal and sends the client that made the HTTP request a response indicating that it should wait for a result.
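
The goal-tracking idea can be sketched roughly as follows. This is an illustration under assumptions, not the answer author's implementation: `GoalStore` is a hypothetical in-memory stand-in for storage independent of Celery, `start_task` is a hypothetical callable such as lambda: my_task.delay(key).id, and remembering the in-progress task id to avoid duplicate launches is an addition of this sketch.

```python
class GoalStore:
    """In-memory stand-in for storage that is independent of Celery."""

    def __init__(self):
        self._values = {}       # goal key -> computed value
        self._in_progress = {}  # goal key -> id of the task working on it

    def complete(self, key, value):
        """Called by the task when it has achieved the goal."""
        self._values[key] = value
        self._in_progress.pop(key, None)

    def request(self, key, start_task):
        """Return ('done', value) or ('wait', task_id), starting work if needed.

        start_task -- hypothetical callable that launches the task and
                      returns its id
        """
        if key in self._values:
            return ("done", self._values[key])
        if key not in self._in_progress:
            self._in_progress[key] = start_task()
        return ("wait", self._in_progress[key])


store = GoalStore()
print(store.request("doc-1", lambda: "task-1"))  # ('wait', 'task-1')
print(store.request("doc-1", lambda: "task-2"))  # ('wait', 'task-1')
store.complete("doc-1", "transcoded.bin")
print(store.request("doc-1", lambda: "task-3"))  # ('done', 'transcoded.bin')
```

An HTTP view would translate ('wait', ...) into a response telling the client to check back later, and ('done', ...) into the actual result.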


The variable names and hyperlinks above are for Celery 4.x. In 3.x the corresponding variables and hyperlinks are: CELERY_TRACK_STARTED, CELERY_IGNORE_RESULT, CELERY_TASK_RESULT_EXPIRES.
