芹菜限制每个用户运行的任务数量

发布于 2025-02-02 12:21:46 字数 1325 浏览 1 评论 0原文

我的芹菜任务看起来像这样:

@app.task(name='task_one')
def task_one(user_id, *args, **kwargs):
    # Long running task

此任务是在用户提交表单时在view中创建的,该任务需要大量资源,并且平均需要大约10分钟的时间才能完成。

(views.py)
...
if request.method == 'POST':
    task_one.delay(user.id)
...

我想限制task_one每个用户创建的任务的数量(活动或保留)

到目前为止正在做的事情,是在创建之前对该用户的任务或保留的任务是否有效或保留任务:

def user_created_task(active_tasks, reserved_tasks, user_id):
  for task in list(active_tasks.values())[0] + list(reserved_tasks.values())[0]:
    if task['name'] == 'task_one' and task['args'][0] == user_id:
      # Check if there is a `task_one` task created for the user
      return True
  
  return False

def user_tasks_already_running_or_reserved(user_id):
  inspect = app.control.inspect()

  active_tasks = inspect.active()
  reserved_tasks = inspect.reserved()

  if active_tasks is None and reserved_tasks is None:
    # Celery workers are disconnected 
    return False

  return user_created_task(active_tasks, reserved_tasks, user_id)


(views.py)
...
if request.method == 'POST':
    if not user_tasks_already_running_or_reserved(user.id):
        task_one.delay(user.id)
...

我想知道是否有一种更有效的方法来执行此操作,而不是在每个用户请求上检查所有工人,也许有一种方法可以在任务运行之前在芹菜上添加此条件,到目前为止我还没有找到文档中的任何内容。

I have a task in Celery that looks like this:

@app.task(name='task_one')
def task_one(user_id, *args, **kwargs):
    # Long running task

This task is created in views every time a user submits a form, the task requires a lot of resources and takes around 10 minutes on average to complete.

(views.py)
...
if request.method == 'POST':
    task_one.delay(user.id)
...

I want to limit the number of task_one tasks created per user to one (either active or reserved)

What I'm doing so far, is checking if there is a task active or reserved for that user before creating the task:

def user_created_task(active_tasks, reserved_tasks, user_id):
  for task in list(active_tasks.values())[0] + list(reserved_tasks.values())[0]:
    if task['name'] == 'task_one' and task['args'][0] == user_id:
      # Check if there is a `task_one` task created for the user
      return True
  
  return False

def user_tasks_already_running_or_reserved(user_id):
  inspect = app.control.inspect()

  active_tasks = inspect.active()
  reserved_tasks = inspect.reserved()

  if active_tasks is None and reserved_tasks is None:
    # Celery workers are disconnected 
    return False

  return user_created_task(active_tasks, reserved_tasks, user_id)


(views.py)
...
if request.method == 'POST':
    if not user_tasks_already_running_or_reserved(user.id):
        task_one.delay(user.id)
...

I was wondering if there is a more efficient way of doing this, instead of inspecting all the workers on every user request, maybe there's a way of adding this condition on Celery before the task runs, so far I haven't found anything in the documentation.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

信仰 2025-02-09 12:21:46

您正在描述使用分布式锁定的情况(因为n = 1),但可以更普遍地描述为分布式信号量。粗略地说,这些锁和机制属于芹菜内置的东西。

如评论者所述(HAT提示:@bernhard Vallant),分布式锁的直接实现通常会在数据库中使用类似表的表格或redis rlock/redlocks

为了利用一个常见的实现,您可以执行以下操作:

from redlock import MultipleRedlockException, Redlock
from django.conf import settings

@app.task(name='task_one', autoretry_for=(MultipleRedlockException, ),  retry_kwargs={'max_retries': 5})
def task_one(user_id, *args, **kwargs):
    # assumes you are using redis for django cache with location
    # set to the redis url
    lock_manager = Redlock([ settings.CACHES['default']['LOCATION'] ])
    lock_name = f'task_one:{user_id}'
    # if the lock fails, we'll get the MultipleRedlockException and trigger
    # celery auto retry
    lock_manager.lock(lock_name, 60 * 60 * 2)  # lock for 2 hours
    try:
        # the main body of what you want to do goes here
        pass
    finally:
        lock_manager.unlock(lock_name)

The situation you are describing calls for the use of a distributed lock (because n = 1), but can be more generally described as a distributed semaphore. Roughly speaking, these locks and mechanisms fall outside of what is built into celery.

As mentioned by the commenters (hat tip: @bernhard vallant), a straightforward implementation of a distributed lock would normally utilize something like a table in a database or a redis rlock / redlocks.

In order to utilize one common implementation, you can do the following:

from redlock import MultipleRedlockException, Redlock
from django.conf import settings

@app.task(name='task_one', autoretry_for=(MultipleRedlockException, ),  retry_kwargs={'max_retries': 5})
def task_one(user_id, *args, **kwargs):
    # assumes you are using redis for django cache with location
    # set to the redis url
    lock_manager = Redlock([ settings.CACHES['default']['LOCATION'] ])
    lock_name = f'task_one:{user_id}'
    # if the lock fails, we'll get the MultipleRedlockException and trigger
    # celery auto retry
    lock_manager.lock(lock_name, 60 * 60 * 2)  # lock for 2 hours
    try:
        # the main body of what you want to do goes here
        pass
    finally:
        lock_manager.unlock(lock_name)

岁月静好 2025-02-09 12:21:46

2ps 的答案朝着正确的方向发展,但需要进行几个更正:

  1. 如果锁定已经存在,则.lock指令将返回false,不会导致异常。
  2. 锁定超时为MS,而不是秒
  3. .unlock指令需要接收锁定对象,而不是锁定对象或ID

源: https://github.com/spscommerce/redlock-py

?芹菜和 django
这对我有用:

from django.conf import settings
from celery import shared_task
from redlock import Redlock

# Set bind=True to access the task from within
@shared_task(bind=True)
def sample_task(self, user_id):
    # 1. Create the log manager
    lock_manager = Redlock(
        [
            {
                "host": settings.REDIS_HOST,
                "port": settings.REDIS_PORT,
                "db": settings.REDIS_DB,
            },
        ]
    )

    # 2. Set the lock id
    lock_id = f"{user_id}"

    # 3. Try to lock the task with the user_id
    lock = lock_manager.lock(lock_id, 1000 * 60 * 10)  # Lock for 10 minutes

    # 4. Handle lock in use
    if not lock:
        # The lock is in use
        self.retry(countdown=30)  # Retry the task after 30 seconds
    
    try:
        # ... your long task ...
        pass
    finally:
        # Use finally to make sure the unlock happens even if
        # task fails.

        if lock:
            lock_manager.unlock(lock)

2ps's answer goes in the right direction, but it needs a couple of corrections:

  1. If the lock is already in place, the .lock directive will return False, not cause an exception.
  2. Lock timeout is in ms, not seconds
  3. The .unlock directive needs to receive the lock object, not the lock name or id

Source: https://github.com/SPSCommerce/redlock-py?tab=readme-ov-file

I'm using Celery and django.
This is what worked for me:

from django.conf import settings
from celery import shared_task
from redlock import Redlock

# Set bind=True to access the task from within
@shared_task(bind=True)
def sample_task(self, user_id):
    # 1. Create the log manager
    lock_manager = Redlock(
        [
            {
                "host": settings.REDIS_HOST,
                "port": settings.REDIS_PORT,
                "db": settings.REDIS_DB,
            },
        ]
    )

    # 2. Set the lock id
    lock_id = f"{user_id}"

    # 3. Try to lock the task with the user_id
    lock = lock_manager.lock(lock_id, 1000 * 60 * 10)  # Lock for 10 minutes

    # 4. Handle lock in use
    if not lock:
        # The lock is in use
        self.retry(countdown=30)  # Retry the task after 30 seconds
    
    try:
        # ... your long task ...
        pass
    finally:
        # Use finally to make sure the unlock happens even if
        # task fails.

        if lock:
            lock_manager.unlock(lock)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文