Tornado celery integration hacks

Published 2024-12-17 10:58:40

Since nobody has provided a solution to this post, and I desperately need a workaround, here is my situation along with some abstract solutions/ideas for debate.

My stack:

  1. Tornado
  2. Celery
  3. MongoDB
  4. Redis
  5. RabbitMQ

My problem: Find a way for Tornado to dispatch a celery task (solved) and then asynchronously gather the result (any ideas?).

Scenario 1: (request/response hack plus webhook)

  • Tornado receives a (user) request, then saves a { jobID : (user) request } mapping in local memory (or in Redis) to remember where to propagate the response, and fires a celery task with the jobID
  • When celery completes the task, it performs a webhook at some URL and tells Tornado that this jobID has finished (plus the results)
  • Tornado retrieves the (user) request and forwards the response to the (user)

Can this happen? Does the logic hold up?
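The bookkeeping Scenario 1 needs can be sketched without the web layer. This is a minimal sketch under assumptions: `JobRegistry` and the idea of treating the stored value as an opaque "request context" are illustrative names, not from Tornado or Celery.

```python
import uuid


class JobRegistry:
    """Maps a jobID to the pending (user) request context until the
    celery webhook reports completion."""

    def __init__(self):
        self._pending = {}

    def register(self, request_ctx):
        # Called when Tornado receives the (user) request and fires the task
        job_id = uuid.uuid4().hex
        self._pending[job_id] = request_ctx
        return job_id

    def complete(self, job_id):
        # Called from the webhook handler; returns the stored context,
        # or None if the jobID is unknown or was already answered
        return self._pending.pop(job_id, None)
```

In Tornado the stored context would be the `RequestHandler` itself, kept unfinished until the webhook fires. Keeping the mapping in Redis instead of a local dict would survive a Tornado restart, but then you can only store serializable state there, not a live handler reference.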

Scenario 2: (tornado plus long-polling)

  • Tornado dispatches the celery task and returns some primary JSON data to the client (jQuery)
  • jQuery does some long-polling upon receipt of the primary JSON, say, every x milliseconds, and Tornado replies according to some database flag. When the celery task completes, that database flag is set to True and the jQuery "loop" finishes.

Is this efficient?
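The flag-polling contract of Scenario 2 can be sketched framework-free. The names `FLAGS`, `task_finished`, and `poll` are made up for illustration, and the dict stands in for the MongoDB/Redis flag:

```python
# jobID -> result; written once, by the celery task on completion
FLAGS = {}


def task_finished(job_id, result):
    # Last step of the celery task: set the "database flag"
    FLAGS[job_id] = result


def poll(job_id):
    # What the Tornado status endpoint returns on each jQuery poll
    if job_id in FLAGS:
        return {'done': True, 'result': FLAGS.pop(job_id)}
    return {'done': False}
```

The efficiency question then reduces to how often the client calls `poll` and how cheap a single flag lookup is in the chosen backend; every poll before completion is a wasted round trip.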

Any other ideas/schemas?

吐个泡泡 2024-12-24 10:58:40

My solution involves polling from tornado to celery:

import datetime

import tornado.ioloop
import tornado.web


class CeleryHandler(tornado.web.RequestHandler):

    @tornado.web.asynchronous
    def get(self):

        task = yourCeleryTask.delay(**kwargs)

        def check_celery_task():
            if task.ready():
                self.write({'success': True})
                self.set_header("Content-Type", "application/json")
                self.finish()
            else:
                # Note: a bare timedelta(0.00001) is 0.00001 *days* (~0.86 s);
                # timedelta(seconds=...) would be less surprising.
                tornado.ioloop.IOLoop.instance().add_timeout(
                    datetime.timedelta(0.00001), check_celery_task)

        tornado.ioloop.IOLoop.instance().add_timeout(
            datetime.timedelta(0.00001), check_celery_task)

Here is a post about it.

伴随着你 2024-12-24 10:58:40

Here is our solution to the problem. Since we look up results in several handlers in our application, we made the celery lookup a mixin class.

This also makes the code more readable with the tornado.gen pattern.

from functools import partial

import tornado.gen
import tornado.ioloop
import tornado.web


class CeleryResultMixin(object):
    """
    Adds a callback function which can wait for the result asynchronously.
    """
    def wait_for_result(self, task, callback):
        if task.ready():
            callback(task.result)
        else:
            # TODO: Is this going to be too demanding on the result backend?
            # There should probably be a timeout before each add_callback.
            tornado.ioloop.IOLoop.instance().add_callback(
                partial(self.wait_for_result, task, callback)
            )


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
    """Execute a task asynchronously on a celery worker.
    Wait for the result without blocking, then send it back
    once it is available.
    """
    @tornado.web.asynchronous
    @tornado.web.authenticated
    @tornado.gen.engine
    def post(self):
        """Test the provided Magento connection."""
        task = expensive_task.delay(
            self.get_argument('somearg'),
        )

        result = yield tornado.gen.Task(self.wait_for_result, task)

        self.write({
            'success': True,
            'result': result.some_value
        })
        self.finish()

青朷 2024-12-24 10:58:40

I stumbled upon this question and hitting the results backend repeatedly did not look optimal to me. So I implemented a Mixin similar to your Scenario 1 using Unix Sockets.

It notifies Tornado as soon as the task finishes (to be accurate, as soon as the next task in the chain runs) and only hits the results backend once. Here is the link.
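The notification idea can be sketched with a `socketpair` standing in for the named Unix socket; the real mixin is in the linked code, and the names here (`worker_done`, `on_notification`) are illustrative only:

```python
import socket

# One end lives in the celery worker, the other in the Tornado process;
# with a real named Unix socket these would be separate processes.
notify_rx, notify_tx = socket.socketpair()


def worker_done(job_id):
    # Appended as the next task in the chain: a single write, so the
    # result backend only needs to be hit once, after this fires
    notify_tx.sendall(job_id.encode() + b'\n')


def on_notification():
    # In Tornado this fd would be registered with IOLoop.add_handler,
    # so the read only happens when data is actually ready
    return notify_rx.recv(1024).decode().strip()
```

Compared with the polling answers above, Tornado does no work at all until the worker writes to the socket, which is what makes the single backend lookup possible.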

吾家有女初长成 2024-12-24 10:58:40

Now, https://github.com/mher/tornado-celery comes to the rescue...

from tornado import gen, web

# `tasks` is the application's celery task module; tornado-celery makes
# apply_async callback-friendly so it can be driven through gen.Task.

class GenAsyncHandler(web.RequestHandler):
    @web.asynchronous
    @gen.coroutine
    def get(self):
        response = yield gen.Task(tasks.sleep.apply_async, args=[3])
        self.write(str(response.result))
        self.finish()