Running "unique" celery tasks
I use celery to update RSS feeds in my news aggregation site. I use one @task for each feed, and things seem to work nicely.
There's a detail I'm not sure I handle well, though: all feeds are updated once every minute with a @periodic_task, but what if a feed is still updating from the last periodic task when a new one is started? (For example, if the feed is really slow, or offline and the task is held in a retry loop.)
Currently I store task results and check their status like this:
import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed

_results = {}

@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)

@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error as exc:
        update_feed.retry(args=[feed], exc=exc)
Maybe there is a more sophisticated/robust way of achieving the same result using some celery mechanism that I missed?
Comments (7)
Based on MattH's answer, you could use a decorator like this:
Then, use it like so...
From the official documentation: Ensuring a task is only executed one at a time.
Using https://pypi.python.org/pypi/celery_once seems to do the job really nicely, including reporting errors and testing against some parameters for uniqueness.
You can do things like:
which just needs the following settings in your project:
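The snippets this answer pointed at were lost in this copy of the page; going from the celery_once README, the usage and settings look roughly like the following (broker URL and timeout values are illustrative, not prescriptive):

```python
from celery import Celery
from celery_once import QueueOnce

app = Celery('tasks', broker='redis://localhost:6379/0')

# celery_once settings: where to keep the locks and how long before a
# stale lock auto-expires.
app.conf.ONCE = {
    'backend': 'celery_once.backends.Redis',
    'settings': {
        'url': 'redis://localhost:6379/0',
        'default_timeout': 60 * 60,
    },
}

@app.task(base=QueueOnce)
def update_feed(feed_id):
    ...
```

Scheduling the task again while a matching one is still pending raises an AlreadyQueued error; passing once={'graceful': True} to the task decorator makes it skip silently instead.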
If you're looking for an example that doesn't use Django, then try this example (caveat: uses Redis instead, which I was already using).
The decorator code is as follows (full credit to the author of the article, go read it).
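The article's decorator is not reproduced above, so here is a sketch of the same idea, parameterised over the client so it can be exercised without a server. With redis-py you would pass a redis.Redis instance, whose set(..., nx=True, ex=...) call gives an atomic acquire-with-expiry; the function and key names here are made up for illustration.

```python
import functools

def with_redis_lock(client, timeout=60 * 10):
    """Decorator factory: skip the call if another worker holds the lock.

    `client` is expected to support redis-py style
    set(key, value, nx=True, ex=seconds) and delete(key)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = 'lock:' + func.__name__
            # SET ... NX EX is atomic: acquire only if free, with an expiry
            # so a crashed worker cannot hold the lock forever.
            if not client.set(key, '1', nx=True, ex=timeout):
                return None  # another worker is running this task; skip
            try:
                return func(*args, **kwargs)
            finally:
                client.delete(key)
        return wrapper
    return decorator
```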
I was wondering why nobody mentioned using celery.app.control.inspect().active() to get the list of the currently running tasks. Is it not real time? Because otherwise it would be very easy to implement, for instance:
And then just applying the decorator to the corresponding tasks:
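A sketch of such a decorator, with the inspect factory injected so the example is self-contained; in a real project you would pass your app's app.control.inspect (active() returns a mapping of worker name to a list of task dicts, or None when no workers reply). One caveat worth noting: when this guard runs inside the task itself, the task appears in its own active() listing, so there you would also need to exclude your own request id rather than match on the name alone.

```python
import functools

def skip_if_already_active(inspect_factory):
    """Decorator factory: run the function only if no task with the same
    name is currently listed as active by any worker."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            active = inspect_factory().active() or {}
            # Flatten {worker: [task dicts]} into a list of task names.
            names = [t['name'] for tasks in active.values() for t in tasks]
            if func.__name__ in names:
                return None  # an instance is already running somewhere
            return func(*args, **kwargs)
        return wrapper
    return decorator
```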
This solution is for celery running on a single host with concurrency greater than 1. Other kinds of locks without dependencies like redis, such as file-based ones, don't work with concurrency greater than 1.
I solved this problem in a slightly different way, by overriding the before_start method of the Task class and checking whether a task with that name is already running; if so, the new task is revoked.
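The before_start override itself needs a live Celery app (before_start is available on Task in newer Celery releases, 5.2 onwards if I recall), but the check it performs can be isolated into a plain helper; the helper name below is invented, and the commented class shows roughly where it would plug in.

```python
def another_instance_running(active, task_name, own_id):
    """Given the mapping returned by app.control.inspect().active()
    (worker name -> list of task dicts), report whether a *different*
    instance of `task_name` is currently executing."""
    for tasks in (active or {}).values():
        for t in tasks:
            if t.get('name') == task_name and t.get('id') != own_id:
                return True
    return False

# Plugged into a Task subclass it would look something like:
#
# class UniqueTask(celery.Task):
#     def before_start(self, task_id, args, kwargs):
#         active = self.app.control.inspect().active()
#         if another_instance_running(active, self.name, task_id):
#             self.app.control.revoke(task_id)
```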