Celery periodic_task runs multiple times in parallel

Published 2024-12-09 15:47:16


I have some very simple periodic code using Celery's threading; it simply prints "Pre" and "Post" and sleeps in between. It is adapted from this StackOverflow question and this linked website.

from celery.task import task, periodic_task
from django.core.cache import cache
from time import sleep
from datetime import timedelta
from threading import Lock
import socket

import main
import cutout_score

def single_instance_task(timeout):
    def task_exc(func):
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every = timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

This code never prints 'already in use...'; the same phenomenon occurs when I use the @single_instance_task decorator.

Do you know what's wrong?

Edit: I've simplified the question so that it doesn't write to memory (using a global or the django cache); I still never see 'already in use...'


Edit: When I add the following code to my Django settings.py file (adapted from https://docs.djangoproject.com/en/dev/topics/cache/), everything works as hoped, but only when I use port 11211 (oddly enough, my server is on port 8000):

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    }
}
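The add()-based lock in the task above only works when every worker talks to one shared cache. The pattern can be sketched independently of Django; the FakeSharedCache class below is hypothetical, a stand-in for a shared memcached instance, and run_exclusive mirrors the shape of the task body:

```python
import threading

class FakeSharedCache:
    """Stand-in for a shared cache backend such as memcached.

    add() stores the key only if it is absent and reports whether it
    did, which is what makes it usable as an atomic lock primitive.
    """
    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()

    def add(self, key, value, timeout=None):
        with self._mutex:
            if key in self._data:
                return False
            self._data[key] = value
            return True

    def delete(self, key):
        with self._mutex:
            self._data.pop(key, None)

cache = FakeSharedCache()

def run_exclusive(lock_id, body):
    # Same shape as the periodic task: acquire via add(), always release.
    if cache.add(lock_id, "true", 300):
        try:
            return body()
        finally:
            cache.delete(lock_id)
    return "already in use..."
```

With a genuinely shared backend, a second caller that arrives while the lock key exists sees add() return False and gets the fallback message; with a per-process cache, each worker has its own store and every call succeeds.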


Comments (1)

Posted 2024-12-16 15:47:16


How are you running celeryd? I'm not familiar with a threaded option.

If it's running multi-process, then there are no "global" variables shared in memory between workers.

If you want a counter shared between all workers, then I'd suggest you use cache.incr.

E.g.:

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2
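The reason cache.incr is safe across workers is that the backend applies the increment in a single step. A minimal sketch of that guarantee (TinyCache is hypothetical, modelling a shared store with an atomic incr like memcached's): a read-then-set counter is two separate operations and can lose updates under interleaving, while incr holds the store's internal lock for the whole update:

```python
import threading

class TinyCache:
    """Hypothetical shared store with an atomic incr(), like memcached."""
    def __init__(self):
        self._data = {}
        self._mutex = threading.Lock()

    def set(self, key, value):
        with self._mutex:
            self._data[key] = value

    def get(self, key):
        with self._mutex:
            return self._data.get(key)

    def incr(self, key, delta=1):
        # One critical section: no other worker can interleave in here.
        with self._mutex:
            self._data[key] += delta
            return self._data[key]

cache = TinyCache()
cache.set("counter", 0)

# Four "workers" each increment 1000 times; atomic incr loses nothing.
threads = [threading.Thread(target=lambda: [cache.incr("counter") for _ in range(1000)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(cache.get("counter"))  # 4000
```

A naive `cache.set("counter", cache.get("counter") + 1)` in the same test could end below 4000, because another worker may write between the get and the set.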

Update

What happens if you force your tasks to overlap by sleeping, e.g.:

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

If you don't get "already in use..." when this task runs more frequently than once every 20 seconds, then you know the cache isn't behaving as expected.


Another Update

Have you set up a cache backend in your django settings? E.g. memcached

If not, you may be using the Dummy Cache, which doesn't actually do any caching and just implements the interface... which sounds like a convincing cause of your problem.
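That diagnosis is easy to confirm in miniature: Django's dummy backend validates keys but stores nothing, and its add() always reports success, so an add()-based lock never blocks anyone. A minimal sketch (the DummyCache class is reimplemented here for self-containment, mirroring the documented behaviour of django.core.cache.backends.dummy):

```python
class DummyCache:
    """Mimics Django's dummy cache backend: stores nothing,
    and add() always claims it inserted the key."""
    def add(self, key, value, timeout=None):
        return True

    def delete(self, key):
        pass

cache = DummyCache()

# Every "worker" acquires the lock, so overlapping runs are never detected.
results = [cache.add("celery-single-instance-test", "true", 300) for _ in range(3)]
print(results)  # [True, True, True]: the lock is meaningless
```

Against a real memcached backend the second and third add() calls would return False until the key expires or is deleted, which is exactly what restores the "already in use..." branch.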
