Running "unique" celery tasks

Posted 2024-10-01 02:45:42


I use celery to update RSS feeds in my news aggregation site. I use one @task for each feed, and things seem to work nicely.

There's a detail that I'm not sure I handle well, though: all feeds are updated once every minute with a @periodic_task, but what if a feed is still updating from the last periodic task when a new one is started? (For example, if the feed is really slow, or offline, and the task is held in a retry loop.)

Currently I store task results and check their status like this:

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed


_results = {}


@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)


@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error as exc:
        update_feed.retry(args=[feed], exc=exc)

Maybe there is a more sophisticated/robust way of achieving the same result using some celery mechanism that I missed?


7 Answers

嘿咻 2024-10-08 02:45:42

Based on MattH's answer, you could use a decorator like this:

from django.core.cache import cache
import functools

def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            # cache.add succeeds only if the key doesn't already exist,
            # so it works as an atomic "acquire"; the timeout keeps a
            # crashed worker from holding the lock forever.
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

then, use it like so...

@periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60 * 10)
def fetch_articles():
    ...  # yada yada
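
(A caveat worth adding: this pattern is only safe if the cache backend's add is atomic across all workers. Memcached qualifies, and so does a shared Redis cache; Django's local-memory backend is per-process, so it won't serialize tasks running in separate worker processes.)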
镜花水月 2024-10-08 02:45:42

From the official documentation: Ensuring a task is only executed one at a time.
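
For convenience, here is a condensed sketch of what that recipe does; the names, the lock timeout, and the task body are illustrative, not the exact code from the docs. It hashes the feed URL into a cache key and relies on the atomicity of cache.add as the lock:

from hashlib import md5

from celery import shared_task
from django.core.cache import cache

LOCK_EXPIRE = 60 * 5  # lock expires on its own in case the worker dies


@shared_task
def import_feed(feed_url):
    # One lock per feed: hash the URL into a cache key.
    feed_hash = md5(feed_url.encode("utf-8")).hexdigest()
    lock_id = "feed-lock-{}".format(feed_hash)
    # cache.add is atomic (on memcached): it succeeds only if the key
    # does not already exist, which makes it usable as "acquire".
    if cache.add(lock_id, "true", LOCK_EXPIRE):
        try:
            pass  # fetch and store the articles here
        finally:
            cache.delete(lock_id)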

最偏执的依靠 2024-10-08 02:45:42

Using https://pypi.python.org/pypi/celery_once seems to do the job really nicely, including reporting errors and testing some parameters for uniqueness.

You can do things like:

from celery_once import QueueOnce
from myapp.celery import app
from time import sleep

@app.task(base=QueueOnce, once=dict(keys=('customer_id',)))
def start_billing(customer_id, year, month):
    sleep(30)
    return "Done!"

which just needs the following settings in your project:

ONCE_REDIS_URL = 'redis://localhost:6379/0'
ONCE_DEFAULT_TIMEOUT = 60 * 60  # remove lock after 1 hour in case it was stale
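
One detail from the package's README worth knowing: by default, enqueueing a duplicate raises an AlreadyQueued exception, so the caller decides what to do; passing graceful=True in the once options swallows the duplicate instead. A small sketch:

from celery_once import AlreadyQueued

try:
    start_billing.delay(42, 2017, 1)
except AlreadyQueued:
    pass  # a billing run for this customer is already queued or running

# Or silence duplicates at the task level:
# @app.task(base=QueueOnce, once=dict(keys=('customer_id',), graceful=True))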
迷途知返 2024-10-08 02:45:42

If you're looking for an example that doesn't use Django, then try this example (caveat: uses Redis instead, which I was already using).

The decorator code is as follows (full credit to the author of the article; go read it):

import redis

REDIS_CLIENT = redis.Redis()

def only_one(function=None, key="", timeout=None):
    """Enforce only one celery task at a time."""

    def _dec(run_func):
        """Decorator."""

        def _caller(*args, **kwargs):
            """Caller."""
            ret_value = None
            have_lock = False
            lock = REDIS_CLIENT.lock(key, timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = run_func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()

            return ret_value

        return _caller

    return _dec(function) if function is not None else _dec
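
Applied to a task, it might look like the following (my sketch, not from the article; the key and timeout are illustrative, and since the key is fixed per decorated function, one key serializes all invocations of that task):

# Assuming `app` is your Celery application instance.
@app.task(bind=True)
@only_one(key="update_feeds", timeout=60 * 5)
def update_feeds(self):
    pass  # only one worker at a time gets past the Redis lock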
人生百味 2024-10-08 02:45:42

I was wondering why nobody mentioned using celery.app.control.inspect().active() to get the list of currently running tasks. Is it not real-time? Otherwise it would be very easy to implement, for instance:

from functools import wraps


def unique_task(callback, *decorator_args, **decorator_kwargs):
    """
    Decorator to ensure only one instance of the task is running at once.
    """
    @wraps(callback)
    def _wrapper(celery_task, *args, **kwargs):
        # active() maps each worker to the list of tasks it is executing.
        active_queues = celery_task.app.control.inspect().active()
        if active_queues:
            for queue in active_queues:
                for running_task in active_queues[queue]:
                    # Skip execution if another instance of this task
                    # (a different request id) is already running.
                    if (celery_task.name == running_task['name']
                            and celery_task.request.id != running_task['id']):
                        return f'Task "{callback.__name__}()" cancelled! already running...'

        return callback(celery_task, *args, **kwargs)

    return _wrapper

And then just applying the decorator to the corresponding tasks:

@celery.task(bind=True)
@unique_task
def my_task(self):
    # Only one instance of this task runs at a time.
    pass
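
One caveat to temper this: inspect().active() is a broadcast to the workers, so it adds latency on every call and only reflects a snapshot; two instances of the task starting at nearly the same moment can both pass the check before either shows up as active. It is best-effort deduplication, not a real lock.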

若有似无的小暗淡 2024-10-08 02:45:42

This solution is for Celery running on a single host with a concurrency greater than 1. Other kinds of dependency-free locks (i.e. without something like Redis), as opposed to file-based ones, don't work with a concurrency greater than 1.

from datetime import datetime, timedelta
from fcntl import flock, LOCK_EX, LOCK_NB
from hashlib import md5
from os.path import join
from time import sleep

# Old-style Celery API, matching the original snippet.
from celery.task import PeriodicTask


class Lock(object):
    def __init__(self, filename):
        self.f = open(filename, 'w')

    def __enter__(self):
        try:
            # Non-blocking exclusive lock; fails if another process holds it.
            flock(self.f.fileno(), LOCK_EX | LOCK_NB)
            return True
        except IOError:
            pass
        return False

    def __exit__(self, *args):
        # Closing the file releases the flock automatically.
        self.f.close()


class SinglePeriodicTask(PeriodicTask):
    abstract = True
    run_every = timedelta(seconds=1)

    def __call__(self, *args, **kwargs):
        lock_filename = join('/tmp',
                             md5(self.name.encode()).hexdigest())
        with Lock(lock_filename) as is_locked:
            if is_locked:
                super(SinglePeriodicTask, self).__call__(*args, **kwargs)
            else:
                print('already working')


class SearchTask(SinglePeriodicTask):
    restart_delay = timedelta(seconds=60)

    def run(self, *args, **kwargs):
        print(self.name, 'start', datetime.now())
        sleep(5)
        print(self.name, 'end', datetime.now())
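
A nice property of flock here: the lock is tied to the file descriptor, so it is released automatically when the file is closed in __exit__, including when the worker process dies, which means no stale locks to clean up. The flip side, as stated above, is that it only works when all workers share the same host (and the same filesystem).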
嘦怹 2024-10-08 02:45:42

I solved this problem in a slightly different way, by overriding the before_start method of the Task class and checking whether a task with that name is already running; if so, the new task is revoked.

from celery import Celery, Task


def is_task_running(task_name: str | None) -> bool:
    from django_celery_results.models import TaskResult
    # The current task is already recorded, so "> 1" means another
    # instance with the same name is pending or running.
    return (
        TaskResult.objects.filter(
            task_name=task_name, status__in=["PENDING", "STARTED", "RETRY"]
        ).count()
        > 1
    )


def revoke_task(task_id: str, celery_app):
    celery_app.control.revoke(task_id, terminate=True)


class ExtendedTask(Task):
    def before_start(self, task_id, args, kwargs):
        # Runs on the worker just before the task body executes.
        if is_task_running(self.name):
            revoke_task(task_id, self.app)


app = Celery("app", task_cls=ExtendedTask)
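
With that in place, any task defined against this app gets the check automatically; a minimal sketch (the task name is made up):

@app.task(bind=True)
def update_feeds(self):
    pass  # revoked in before_start if another update_feeds is PENDING/STARTED/RETRY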