Celery 响应显示在 Flask 应用程序中

发布于 2025-01-12 01:12:40 字数 1465 浏览 1 评论 0原文

Views.py

class UserCreate(Resource):
        def post(self):
            # try:
             celery=create_user_employee.delay(auth_header, form_data, employee_id, employee_response)
                    # from celery import current_task
                    if "exc" in celery.get:
                        # print(celery)
                        return deliver_response(False, 'Celery task failed'), 400                    
                    return deliver_response(True, message="User Creation in Progress", status=200, data=data), 200

Task.py

@celery.task(name='accounts.tasks.create_user_employee')
def create_user_employee(auth_header, form_data, employee_id, employee_response):
    try:
        # add_employee(form_data, employee_id, eid)
        if "streetLane" in form_data:
            user_id=form_data.get("personalEmail","")
            employee_address=address_post__(form_data, auth_header, user_id)
        return deliver_response(True,message="success",status=200),200
    except Exception as e:
            return deliver_response(False, str(e), status=500), 500
    

注意:我无法从task.py返回对flask应用程序的响应,这里的目标是,如果task.py有任何错误响应,我需要中断views.py函数但结果是无法在views.py中导入或打印任何帮助都会很棒。 ...................................................... ...................................................... ...................................................... ................................

Views.py

class UserCreate(Resource):
        def post(self):
            # try:
             celery=create_user_employee.delay(auth_header, form_data, employee_id, employee_response)
                    # from celery import current_task
                    if "exc" in celery.get:
                        # print(celery)
                        return deliver_response(False, 'Celery task failed'), 400                    
                    return deliver_response(True, message="User Creation in Progress", status=200, data=data), 200

Task.py

@celery.task(name='accounts.tasks.create_user_employee')
def create_user_employee(auth_header, form_data, employee_id, employee_response):
    try:
        # add_employee(form_data, employee_id, eid)
        if "streetLane" in form_data:
            user_id=form_data.get("personalEmail","")
            employee_address=address_post__(form_data, auth_header, user_id)
        return deliver_response(True,message="success",status=200),200
    except Exception as e:
            return deliver_response(False, str(e), status=500), 500
    

Note:I am not able to return the response to flask app from tasks.py the objective here is that i need to break the views.py func if there is any error response from tasks.py but the result is not bein able to import or print in views.py any help would be great.
...................................................................................................................................................................................

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

花心好男孩 2025-01-19 01:12:40

由于 celery 进程是一个异步进程,因此您不会立即得到 celery 作业的响应。
所以这段代码

celery=create_user_employee.delay(auth_header, form_data, employee_id, employee_response)

不会给你任务的结果。因此 celery 变量不会提供任务完成的任何有意义的信息。也不要使用celery作为变量名。
因此,要获取 celery 任务的结果,

from celery import result
res = result.AsyncResult(job_id)

res 变量具有 res.statusres.result 来获取任务的状态和结果 job_id 任务。您可以在调用 celery 任务时设置自己的作业 id。
何时获取这些结果取决于您。要么编写一个定期查看作业状态的递归函数,要么在需要结果时尝试查看作业状态。

As the celery process is a Async process, you will not get immediate response from celery job.
So this code

celery=create_user_employee.delay(auth_header, form_data, employee_id, employee_response)

doesn't give you result of the task. So celery variable doesn't give any meaningful info of task getting completed. Also don't use celery as a variable name.
So to fetch results of celery task,

from celery import result
res = result.AsyncResult(job_id)

So res variable has res.status and res.result to fetch status of task and result of job_id task. You can set your own job id when you call celery task.
When to fetch those results is upto you. Either write a recursive function that sees the status of job periodically or try to see job status whenever the result is required.

谜兔 2025-01-19 01:12:40

我希望在 if "exc" in celery.get: ... 的地方抛出一个错误 TypeError: argument of type 'function' is not iterable< /代码>。

您是否尝试稍微更改该行并在 celery.get(): 中有 if "exc": (注意括号)?

另外,将delay()的结果命名为“celery”是一种误导——任何阅读你的代码的人都可能认为这是Celery类的一个实例,但事实并非如此。这是 AsyncResult 类型的实例。 获取() 只是其方法之一。

I would expect an Error to be thrown at the place where you have if "exc" in celery.get: ... something like TypeError: argument of type 'function' is not iterable.

Did you try to slightly change that line and have if "exc" in celery.get(): (notice the parenthesis)?

Also, giving a result of delay() a name "celery" is misleading - whoever reads your code may think that is an instance of the Celery class, which is not. That is an instance of the AsyncResult type. get() is just one of its methods.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文