分布式Python:芹菜send_task获取command_invalid

发布于 2025-01-28 03:50:10 字数 3015 浏览 1 评论 0原文

  1. 上下文

我开发了一个将任务发送到我的计算环境的烧瓶API。 为了使用此功能,您应该向API提出发布请求。 然后,API通过RabbitMQ Broker收到您的请求,对其进行处理并发送必要的数据,这是计算环境将保留的一条消息。 最后,它应该将结果发送回API,

  1. 这里有些代码

是我的API和我的芹菜应用程序的示例:

#main.py

# Package
import time
from flask import Flask
from flask import request, jsonify, make_response

# Own module
from celery_app import celery_app

# Environment
app = Flask()

# Endpoint
@app.route("/test", methods=["POST"])
def test():
    """
    Test route

    Returns
    -------
    Json formatted output
    """

    # Do some preprocessing in here 

    result = celery_app.send_task(f"tasks.Client", args=[1, 2])
    while result.state == "PENDING":
        time.sleep(0.01)
    result = result.get()

    if result["sucess"]:
        result_code = 200
    else:
        result_code = 500
    output = str(result)
    return make_response(
        jsonify(
            text=output,
            code_status=result_code,        ),
        result_code,
    )


# Main thread
if __name__ == "__main__":
    app.run()

在另一个文件中,我已经将连接到RabbitMQ队列的芹菜应用程序设置

#celery_app.py
from celery import Celery, Task

celery_app = Celery("my_celery",
                    broker=f"amqp://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{COLLECTION}",
                    backend="rpc://"
)
celery_app.conf.task_serializer = "pickle"
celery_app.conf.result_serializer = "pickle"
celery_app.conf.accept_content = ["pickle"]
celery_app.conf.broker_connection_max_retries = 5
celery_app.conf.broker_pool_limit = 1

class MyTask(Task):
    def run(self, a, b):
        return a + b

celery_app.register_task(MyTask())

为运行它,您应该启动:

python3 main.py

不要忘记运行芹菜工人(在其中注册任务后),

然后您可以在其上提出一个帖子请求:

curl -X POST http://localhost:8000/test
  1. 解决的问题

在此简单的API运行时 ,我在端点上发送请求。 不幸的是,它在4 上失败了1次。

我有2条消息:

  • 第一个消息是:
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
  • 然后,由于超时,我的服务器已经丢失了消息:
File "main.py", line x, in test
    result = celery_app.send_task("tasks.Client", args=[1, 2])
amqp.exceptions.InvalidCommand: Channel.close_ok: (503) COMMAND_INVALID - unimplemented method
  1. 解决此错误

有2个解决方案可以解决此问题

  • 重试直到失败5连续的时间(try / the amqp.exceptions.invalidcommand)< / p>

  • 更改超时值。< / p>

不幸的是,这似乎不是解决它的最佳方法。

你能帮助我吗 ?

问候

PS:

  • my_packages:

烧瓶== 2.0.2

python == 3.6

芹菜== 4.4.5

兔子==最新

  1. Context

I developed a Flask API that sends tasks to my computing environment.
To use this, you should make a post request to the API.
Then, the API received your request, process it and send necessary data, through the RABBITMQ broker, a message to be held by the computing environment.
At the end, it should send the result back to the API

  1. Some code

Here is an example of my API and my Celery application:

#main.py

# Package
import time
from flask import Flask
from flask import request, jsonify, make_response

# Own module
from celery_app import celery_app

# Environment
app = Flask()

# Endpoint
@app.route("/test", methods=["POST"])
def test():
    """
    Test route

    Returns
    -------
    Json formatted output
    """

    # Do some preprocessing in here 

    result = celery_app.send_task(f"tasks.Client", args=[1, 2])
    while result.state == "PENDING":
        time.sleep(0.01)
    result = result.get()

    if result["sucess"]:
        result_code = 200
    else:
        result_code = 500
    output = str(result)
    return make_response(
        jsonify(
            text=output,
            code_status=result_code,        ),
        result_code,
    )


# Main thread
if __name__ == "__main__":
    app.run()

In a different file, I have setup my celery application connected to RABBITMQ Queue

#celery_app.py
from celery import Celery, Task

celery_app = Celery("my_celery",
                    broker=f"amqp://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{COLLECTION}",
                    backend="rpc://"
)
celery_app.conf.task_serializer = "pickle"
celery_app.conf.result_serializer = "pickle"
celery_app.conf.accept_content = ["pickle"]
celery_app.conf.broker_connection_max_retries = 5
celery_app.conf.broker_pool_limit = 1

class MyTask(Task):
    def run(self, a, b):
        return a + b

celery_app.register_task(MyTask())

To run it, you should launch:

python3 main.py

Do not forget to run the celery worker (after registering tasks in it)

Then you can make a post request on it:

curl -X POST http://localhost:8000/test
  1. The problem to resolve

When this simple API is running, I am sending request on my endpoint.
Unfortunatly, it fails 1 time on 4.

I have 2 messages:

  • The first message is:
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
  • Then, because of the time out, my server has lost the message so:
File "main.py", line x, in test
    result = celery_app.send_task("tasks.Client", args=[1, 2])
amqp.exceptions.InvalidCommand: Channel.close_ok: (503) COMMAND_INVALID - unimplemented method
  1. Resolve this error

There are 2 solutions to get around this problem

  • retry to send a tasks until it fails 5 times in a row (try / except amqp.exceptions.InvalidCommand)

  • change the timeout value.

Unfortunatly, it doesn't seems to be the best ways to solve it.

Can you help me ?

Regards

PS:

  • my_packages:

Flask==2.0.2

python==3.6

celery==4.4.5

rabbitmq==latest

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

治碍 2025-02-04 03:50:10

1。预处理

我将 RabbitMQ版本从最新版本更改为3.8.14

然后,我使用time_limit和soft_time_limit设置了芹菜任务超时。

它起作用:)

2。无效

解决此问题,我使用此重新函数。

我设置:

# max_retries=3
# autoretry_for=(InvalidCommand,)

1. PreconditionFailed

I change my RabbitMQ version from latest to 3.8.14.

Then, I set up a celery task timeout using time_limit and soft_time_limit.

And it works :)

2. InvalidCommand

To resolve this problem, I use this retryfunctionnaluity.

I setup:

# max_retries=3
# autoretry_for=(InvalidCommand,)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文