Thread safety with asynchronous tasks and Redis in Django

Posted 2024-10-29 05:46:50

I have a Django application that calls an asynchronous task on a queryset (using Celery). The task takes the queryset and performs a whole bunch of operations that could potentially take a very long time based on the objects therein. Objects could be shared across querysets, so a user could submit a task on a queryset that contains objects that are already running. That new task should only execute on the objects that aren't yet running, but wait for all objects to complete before it returns.

My explanation is a bit confusing, so imagine the following code:

from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while

class LongRunningTask(Task):
    def run(self, process_id, *args, **kwargs):
        _queryset = InterestingModel.objects.filter(process__id=process_id)

        r = redis.Redis()
        p = r.pipeline()
        run_check_sets = ('run_check', 'objects_already_running')

        # There must be a better way to do this:
        for o in _queryset.values_list('pk', flat=True):
            p.sadd('run_check', o)  # SADD needs the member as well as the key
        p.sdiff(run_check_sets) # Objects that need to be run
        p.sunion(run_check_sets) # Objects that we need to wait for
        p.sunionstore('objects_already_running',run_check_sets)
        p.delete('run_check')
        redis_result = p.execute()

        # Four commands follow the SADDs, so SDIFF is at -4 and SUNION at -3
        # (SUNIONSTORE at -2 returns a cardinality, not a set)
        objects_to_run = redis_result[-4]
        objects_to_wait_for = redis_result[-3]

        if objects_to_run:
            i_take_a_while(objects_to_run)
            p = r.pipeline()
            for o in objects_to_run:
                p.srem('objects_already_running', o)
            p.execute()

        while objects_to_wait_for:
            p = r.pipeline()
            for o in objects_to_wait_for:
                p.sismember('objects_already_running',o)
            redis_result = p.execute()
            objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
            # Probably need to add some sort of timeout here or in redis
            sleep(30) 

I am extremely new to Redis, so my main question is whether there is a more efficient way to manipulate Redis to achieve the same result. More broadly, I wonder if Redis is necessary, or the right approach, for dealing with this problem. It seems like there should be a better way to have Django models interact with Redis. Finally, I wonder if this code is, in fact, thread safe. Can anyone punch any holes in my logic?

Any commentary is appreciated.
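One pattern worth noting here (a sketch with illustrative names, not code from the post): `SADD` returns how many members it actually added, so a single pipelined round trip can both register each object as running and report whether another task claimed it first. This avoids the separate `SDIFF`/`SUNION`/`SUNIONSTORE`/`DELETE` dance and the shared scratch key `run_check`, which is itself a race between concurrent tasks.

```python
def claim_objects(r, pks, key="objects_already_running"):
    """Atomically claim each pk via SADD; return (claimed, already_running).

    `r` is a redis.Redis client; `key` is an illustrative set name.
    """
    p = r.pipeline()  # redis-py pipelines are transactional by default
    for pk in pks:
        p.sadd(key, pk)  # returns 1 if we added it, 0 if already present
    results = p.execute()
    claimed = [pk for pk, added in zip(pks, results) if added]
    already_running = [pk for pk, added in zip(pks, results) if not added]
    return claimed, already_running
```

The task would then run `i_take_a_while(claimed)`, `SREM` its own pks, and poll only `already_running`.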


Comments (1)

新一帅帅 2024-11-05 05:46:50

Is it possible for you to architect this slightly differently? Specifically, I would kick off the tasks for each object and then store information about your long running jobs somewhere (e.g., database, cache, etc). When each individual object was finished, it would update the long running job info and check to see if all of the jobs had returned. If so, then you can run whatever code needs to be run when the long running task is complete.

This has the advantage of not tying up a thread on your server while you wait for other things to happen. On the client side, you could check the status of the long running job periodically and even use the number of objects complete to update a progress meter if you want.
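The idea above can be sketched with Redis as the shared store (key and function names are illustrative, not from the answer): one key counts outstanding per-object tasks, each worker decrements it when done, and because `DECR` is atomic exactly one worker sees the count reach zero and runs the completion step.

```python
def start_job(r, job_id, pks):
    """Record how many per-object tasks the job is waiting on."""
    r.set("job:%s:remaining" % job_id, len(pks))
    # ...enqueue one Celery task per pk here...

def object_finished(r, job_id, on_complete):
    """Called by each per-object task as it finishes."""
    remaining = r.decr("job:%s:remaining" % job_id)  # atomic decrement
    if remaining == 0:
        on_complete(job_id)  # only the last finisher gets here
    return remaining
```

Celery's own `chord` primitive implements essentially this pattern (a callback fired when a group of tasks completes), so it may be worth considering before hand-rolling the counter.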
