分布式Python:芹菜send_task获取command_invalid
- 上下文
我开发了一个将任务发送到我的计算环境的烧瓶API。 为了使用此功能,您应该向API提出发布请求。 然后,API通过RabbitMQ Broker收到您的请求,对其进行处理并发送必要的数据,这是计算环境将保留的一条消息。 最后,它应该将结果发送回API,
- 这里有些代码
是我的API和我的芹菜应用程序的示例:
#main.py
# Package
import time
from flask import Flask
from flask import request, jsonify, make_response
# Own module
from celery_app import celery_app
# Environment
app = Flask()
# Endpoint
@app.route("/test", methods=["POST"])
def test():
"""
Test route
Returns
-------
Json formatted output
"""
# Do some preprocessing in here
result = celery_app.send_task(f"tasks.Client", args=[1, 2])
while result.state == "PENDING":
time.sleep(0.01)
result = result.get()
if result["sucess"]:
result_code = 200
else:
result_code = 500
output = str(result)
return make_response(
jsonify(
text=output,
code_status=result_code, ),
result_code,
)
# Main thread
if __name__ == "__main__":
app.run()
在另一个文件中,我已经将连接到RabbitMQ队列的芹菜应用程序设置
#celery_app.py
from celery import Celery, Task
celery_app = Celery("my_celery",
broker=f"amqp://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{COLLECTION}",
backend="rpc://"
)
celery_app.conf.task_serializer = "pickle"
celery_app.conf.result_serializer = "pickle"
celery_app.conf.accept_content = ["pickle"]
celery_app.conf.broker_connection_max_retries = 5
celery_app.conf.broker_pool_limit = 1
class MyTask(Task):
def run(self, a, b):
return a + b
celery_app.register_task(MyTask())
为运行它,您应该启动:
python3 main.py
不要忘记运行芹菜工人(在其中注册任务后),
然后您可以在其上提出一个帖子请求:
curl -X POST http://localhost:8000/test
- 解决的问题
在此简单的API运行时 ,我在端点上发送请求。 不幸的是,它在4 上失败了1次。
我有2条消息:
- 第一个消息是:
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
- 然后,由于超时,我的服务器已经丢失了消息:
File "main.py", line x, in test
result = celery_app.send_task("tasks.Client", args=[1, 2])
amqp.exceptions.InvalidCommand: Channel.close_ok: (503) COMMAND_INVALID - unimplemented method
- 解决此错误
有2个解决方案可以解决此问题
重试直到失败5连续的时间(try / the amqp.exceptions.invalidcommand)< / p>
更改超时值。< / p>
不幸的是,这似乎不是解决它的最佳方法。
你能帮助我吗 ?
问候
PS:
- my_packages:
烧瓶== 2.0.2
python == 3.6
芹菜== 4.4.5
兔子==最新
- Context
I developed a Flask API that sends tasks to my computing environment.
To use this, you should make a post request to the API.
Then, the API received your request, process it and send necessary data, through the RABBITMQ broker, a message to be held by the computing environment.
At the end, it should send the result back to the API
- Some code
Here is an example of my API and my Celery application:
#main.py
# Package
import time
from flask import Flask
from flask import request, jsonify, make_response
# Own module
from celery_app import celery_app
# Environment
app = Flask()
# Endpoint
@app.route("/test", methods=["POST"])
def test():
"""
Test route
Returns
-------
Json formatted output
"""
# Do some preprocessing in here
result = celery_app.send_task(f"tasks.Client", args=[1, 2])
while result.state == "PENDING":
time.sleep(0.01)
result = result.get()
if result["sucess"]:
result_code = 200
else:
result_code = 500
output = str(result)
return make_response(
jsonify(
text=output,
code_status=result_code, ),
result_code,
)
# Main thread
if __name__ == "__main__":
app.run()
In a different file, I have setup my celery application connected to RABBITMQ Queue
#celery_app.py
from celery import Celery, Task
celery_app = Celery("my_celery",
broker=f"amqp://{USER}:{PASSWORD}@{HOSTNAME}:{PORT}/{COLLECTION}",
backend="rpc://"
)
celery_app.conf.task_serializer = "pickle"
celery_app.conf.result_serializer = "pickle"
celery_app.conf.accept_content = ["pickle"]
celery_app.conf.broker_connection_max_retries = 5
celery_app.conf.broker_pool_limit = 1
class MyTask(Task):
def run(self, a, b):
return a + b
celery_app.register_task(MyTask())
To run it, you should launch:
python3 main.py
Do not forget to run the celery worker (after registering tasks in it)
Then you can make a post request on it:
curl -X POST http://localhost:8000/test
- The problem to resolve
When this simple API is running, I am sending request on my endpoint.
Unfortunatly, it fails 1 time on 4.
I have 2 messages:
- The first message is:
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
- Then, because of the time out, my server has lost the message so:
File "main.py", line x, in test
result = celery_app.send_task("tasks.Client", args=[1, 2])
amqp.exceptions.InvalidCommand: Channel.close_ok: (503) COMMAND_INVALID - unimplemented method
- Resolve this error
There are 2 solutions to get around this problem
retry to send a tasks until it fails 5 times in a row (try / except amqp.exceptions.InvalidCommand)
change the timeout value.
Unfortunatly, it doesn't seems to be the best ways to solve it.
Can you help me ?
Regards
PS:
- my_packages:
Flask==2.0.2
python==3.6
celery==4.4.5
rabbitmq==latest
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
1。预处理
我将 RabbitMQ版本从最新版本更改为3.8.14 。
然后,我使用time_limit和soft_time_limit设置了芹菜任务超时。
它起作用:)
2。无效
解决此问题,我使用此重新函数。
我设置:
1. PreconditionFailed
I change my RabbitMQ version from latest to 3.8.14.
Then, I set up a celery task timeout using time_limit and soft_time_limit.
And it works :)
2. InvalidCommand
To resolve this problem, I use this retryfunctionnaluity.
I setup: