Celery: limit the number of tasks running per user
I have a task in Celery that looks like this:
@app.task(name='task_one')
def task_one(user_id, *args, **kwargs):
    # Long running task
This task is created in a view every time a user submits a form; it requires a lot of resources and takes around 10 minutes on average to complete.
(views.py)
...
if request.method == 'POST':
    task_one.delay(user.id)
...
I want to limit the number of task_one tasks created per user to one (either active or reserved).
What I'm doing so far is checking whether there is a task active or reserved for that user before creating the task:
def user_created_task(active_tasks, reserved_tasks, user_id):
    for task in list(active_tasks.values())[0] + list(reserved_tasks.values())[0]:
        if task['name'] == 'task_one' and task['args'][0] == user_id:
            # Check if there is a `task_one` task created for the user
            return True
    return False
def user_tasks_already_running_or_reserved(user_id):
    inspect = app.control.inspect()
    active_tasks = inspect.active()
    reserved_tasks = inspect.reserved()
    if active_tasks is None and reserved_tasks is None:
        # Celery workers are disconnected
        return False
    return user_created_task(active_tasks, reserved_tasks, user_id)
(views.py)
...
if request.method == 'POST':
    if not user_tasks_already_running_or_reserved(user.id):
        task_one.delay(user.id)
...
I was wondering whether there is a more efficient way of doing this than inspecting all the workers on every user request. Maybe there is a way of adding this condition in Celery before the task runs, but so far I haven't found anything in the documentation.
2 Answers
The situation you are describing calls for the use of a distributed lock (because n = 1), though it can be described more generally as a distributed semaphore. Roughly speaking, these locks and mechanisms fall outside of what is built into Celery.
As mentioned by the commenters (hat tip: @bernhard vallant), a straightforward implementation of a distributed lock would normally utilize something like a table in a database or a Redis RLock / Redlock.
In order to utilize one common implementation, you can do the following:
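A minimal sketch of that idea with redlock-py, gating task creation in the view, might look like the following (the Redis connection details, lock name, and TTL are illustrative assumptions, not part of the original answer):

(views.py)
from redlock import Redlock

# Assumed Redis connection details; adjust to your setup.
dlm = Redlock([{"host": "localhost", "port": 6379, "db": 0}])

...
if request.method == 'POST':
    # Try to take a per-user lock; the TTL is given in milliseconds and
    # should cover the task's expected runtime (~10 minutes, plus margin).
    lock = dlm.lock('task_one:%s' % user.id, 15 * 60 * 1000)
    if lock:
        # Lock acquired: no task_one is currently running or reserved
        # for this user, so it is safe to enqueue one.
        task_one.delay(user.id)
...

In this simplified form the lock is only released when its TTL expires; a fuller version would release the lock from the task once it finishes (see the next answer for how .unlock has to be called).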
2ps's answer goes in the right direction, but it needs a couple of corrections:
- The .lock directive will return False, not cause an exception.
- The .unlock directive needs to receive the lock object, not the lock name or id.

Source: https://github.com/SPSCommerce/redlock-py?tab=readme-ov-file
I'm using Celery and Django.
This is what worked for me:
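In outline, applying those corrections inside the task itself could look something like the sketch below (redlock-py with a local Redis instance is assumed; the lock name and TTL are illustrative):

from redlock import Redlock

# Assumed Redis connection details; adjust to your setup.
dlm = Redlock([{"host": "localhost", "port": 6379, "db": 0}])

# `app` is the same Celery application instance as in the question.
@app.task(name='task_one')
def task_one(user_id, *args, **kwargs):
    # .lock() returns a Lock object on success and False on failure;
    # it does not raise, so the return value has to be checked.
    lock = dlm.lock('task_one:%s' % user_id, 15 * 60 * 1000)
    if not lock:
        # A task_one for this user is already active or reserved.
        return
    try:
        ...  # Long running task
    finally:
        # .unlock() expects the Lock object returned by .lock(),
        # not the lock name or id.
        dlm.unlock(lock)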