在flask应用中使用celery任务队列,celery队列无法正常启动

发布于 2022-09-01 06:28:04 字数 1323 浏览 16 评论 0

最近在写一个flask应用,想使用celery做任务队列,就去flask官网上找了样例程序,然后复制到本机上执行了一下,结果celery没有正常启动.
只有一个源文件test.py,rabbitmq已配置并正常启动.
代码如下:

from flask import Flask

from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app = Flask(__name__)
app.config.update(
        CELERY_BROKER_URL='amqp://guest@localhost//',
        CELERY_RESULT_BACKEND='amqp://guest@localhost//'
)

celery = make_celery(app)


@celery.task()
def add_together(a, b):
    return a + b

result = add_together.delay(23, 42)
result.wait()   

差不多就是样例代码直接拷贝下来用了.
然后我在当前目录下执行celery -A test worker --loglevel=infocelery -A test.celery worker --loglevel=info之后阻塞,同时没有任何提示.
我另外打开一个终端,在这个目录下执行python test.py之后发生阻塞.用pdb调试发现阻塞在最后一行result.wait().
请问我的配置过程出现什么问题了?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

蝶舞 2022-09-08 06:28:04
# coding: utf-8

from celery import Celery
from flask import Flask

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app = Flask(__name__)
app.config.update(
        CELERY_BROKER_URL='amqp://guest@localhost//',
        CELERY_RESULT_BACKEND='amqp://guest@localhost//'
)

celery = make_celery(app)


@celery.task(name='add_together')
def add_together(a, b):
    return a + b

if __name__ == '__main__':
  result = add_together.delay(23, 42)
  result.wait()   

由于你没有提供错误信息,只能改对之后告诉你了
调用celery -A test.celery worker --loglevel=info启动celery,然后用 python test.py调用脚本

灯下孤影 2022-09-08 06:28:04

最近刚好也在看flask和celery,不过用的是redis。
先启动redis
然后启动celery celery worker -A test.celery --loglevel=info
再启动flask。

你问题里没有描述实际报错,也不好回答你了。可以参考下链接描述

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文