Celery periodic_task running multiple times in parallel
I have some very simple periodic code using Celery's threading; it simply prints "Pre" and "Post" and sleeps in between. It is adapted from this StackOverflow question and this linked website:
from celery.task import task
from celery.task import periodic_task
from django.core.cache import cache
from time import sleep
import main
import cutout_score
from threading import Lock
import socket
from datetime import timedelta
from celery.decorators import task, periodic_task

def single_instance_task(timeout):
    def task_exc(func):
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

LOCK_EXPIRE = 60 * 5  # Lock expires in 5 minutes

@periodic_task(run_every=timedelta(seconds=2))
def test():
    lock_id = "lock"
    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)
    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'
This code never prints 'already in use...'; the same phenomenon occurs when I use the @single_instance_task decorator.
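For reference, this is roughly how the decorator variant was applied (decorated_test is a placeholder name; the decorator order matters, since @single_instance_task has to wrap the task body before @periodic_task registers it):

@periodic_task(run_every=timedelta(seconds=2))
@single_instance_task(LOCK_EXPIRE)
def decorated_test():
    print 'pre'
    sleep(20)
    print 'post'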
Do you know what's wrong?
Edit: I've simplified the question so that it doesn't write to memory (using a global or the Django cache); I still never see 'already in use...'
Edit: When I add the following code to my Django settings.py file (adapted from https://docs.djangoproject.com/en/dev/topics/cache/), everything works as hoped, but only when I use port 11211 (oddly enough, my server is on port 8000):
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211',
        ],
    },
}
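For what it's worth, 11211 is memcached's own default port, independent of the Django dev server on 8000. A throwaway probe like the following (not part of the app) can confirm that the daemon is actually listening there:

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 11211))
s.sendall('version\r\n')   # memcached's plain-text protocol
print s.recv(4096)          # e.g. 'VERSION 1.4.15'
s.close()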
How are you running celeryd? I'm not familiar with a threaded option.
If it's running multi-process, then there are no "global" variables shared in memory between workers.
If you want a counter shared between all workers, then I'd suggest you use cache.incr. E.g.:
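A minimal sketch of the idea, assuming Django's cache API; the key and function names are arbitrary:

from django.core.cache import cache

def bump_task_counter():
    # add() is atomic and is a no-op if the key already exists, so
    # this initialises the counter exactly once across all workers.
    cache.add('test-task-counter', 0)
    # incr() is atomic on memcached, so concurrent workers can't
    # clobber each other's updates the way a global variable would.
    return cache.incr('test-task-counter')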
Update
What happens if you force your tasks to overlap by sleeping, e.g.:
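Something along these lines (a sketch, not the original snippet; overlap_test and the lock key are placeholder names):

from datetime import timedelta
from time import sleep

from celery.decorators import periodic_task
from django.core.cache import cache

@periodic_task(run_every=timedelta(seconds=2))
def overlap_test():
    # Scheduled every 2 seconds but takes 20, so invocations must
    # overlap; with a working cache, all but one should hit the
    # 'already in use' branch.
    if cache.add('overlap-test-lock', 'true', 60):
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            cache.delete('overlap-test-lock')
    else:
        print 'already in use...'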
If you don't get "already in use..." from running this more often than once every 20 seconds, then you know that the cache isn't behaving as expected.
Another Update
Have you set up a cache backend in your Django settings, e.g. memcached?
If not, you may be using the Dummy Cache, which doesn't actually do any caching and just implements the interface... which sounds like a convincing cause of your problem.
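One quick way to check, assuming a standard Django setup (run from manage.py shell):

from django.core.cache import cache

print cache                        # a DummyCache instance means no real caching
print cache.add('probe', 1, 10)    # True on the first call
print cache.add('probe', 1, 10)    # False on a real backend; the dummy
                                   # backend returns True every time, which
                                   # is exactly why the lock never blocks

If both add() calls return True, acquire_lock() in your task can never fail, which matches the symptoms you're seeing.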