芹菜和路由

发布于 2024-12-08 18:51:14 字数 777 浏览 3 评论 0原文

我需要在特定的 celeryd 实例上运行一些任务。所以我配置了队列:

celeryconfig.py:

CELERY_QUEUES = {
    'celery': {
        'exchange': 'celery',
        'binding_key': 'celery',
    },
    'import': {
        'exchange': 'import',
        'binding_key': 'import.products',
    },
}

CELERY_ROUTES = {
    'celery_tasks.import_tasks.test': {
        'queue': 'import',
        'routing_key': 'import.products',
    },
}

import_tasks.py:

@task
def test():
    print 'test'

@task(exchange='import', routing_key='import.products')
def test2
    print 'test2'

然后我启动 celeryd:

celeryd -c 2 -l INFO -Q import

并尝试执行该任务。 'test' 执行,但 'test2' 不执行。但我不想在 CELERY_ROUTES 中指定每个导入任务。如何在任务定义中指定哪个队列应执行任务?

I need to run some tasks on the specific celeryd instance. So I configured queues:

celeryconfig.py:

CELERY_QUEUES = {
    'celery': {
        'exchange': 'celery',
        'binding_key': 'celery',
    },
    'import': {
        'exchange': 'import',
        'binding_key': 'import.products',
    },
}

CELERY_ROUTES = {
    'celery_tasks.import_tasks.test': {
        'queue': 'import',
        'routing_key': 'import.products',
    },
}

import_tasks.py:

@task
def test():
    print 'test'

@task(exchange='import', routing_key='import.products')
def test2
    print 'test2'

then I start celeryd:

celeryd -c 2 -l INFO -Q import

And try to execute that tasks. 'test' executes but 'test2' do not. But I don't want to specify every importing task in the CELERY_ROUTES. How can I specify which queue should execute task in the task definition?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

寄居人 2024-12-15 18:51:14

哦,忘了说我使用了 send_task 函数来执行任务。并且该函数不导入任务。它只是将任务名称发送到队列。

不是这样写的:

from celery.execute import send_task

result = send_task(args.task, task_args, task_kwargs)

所以我

from celery import current_app as celery_app, registry as celery_registry

celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
    for module in celery_imports:
        __import__(module)

task = celery_registry.tasks.get(args.task)
if task:
    result = task.apply_async(task_args, task_kwargs)

Oh, forgot to say that I've used send_task function to execute tasks. And this function doesn't import tasks. It just sends the name of the task to the queue.

So instead of this:

from celery.execute import send_task

result = send_task(args.task, task_args, task_kwargs)

I wrote:

from celery import current_app as celery_app, registry as celery_registry

celery_imports = celery_app.conf.get('CELERY_IMPORTS')
if celery_imports:
    for module in celery_imports:
        __import__(module)

task = celery_registry.tasks.get(args.task)
if task:
    result = task.apply_async(task_args, task_kwargs)
好多鱼好多余 2024-12-15 18:51:14

请参阅 Roman 的解决方案 - http://www.imankulov.name/posts/ celery-for-internal-api.html——按名称访问任务,而且还能够指定队列等,就像导入任务模块一样。

See Roman's solution -- http://www.imankulov.name/posts/celery-for-internal-api.html -- to access tasks by name, but also with ability to specify queues and whatnot as if you imported the task module.

楠木可依 2024-12-15 18:51:14

我找到了几乎让我满意的解决方案:

class CustomRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('celery_tasks.import_tasks'):
            return {'exchange': 'import',
                    'routing_key': 'import.products'}

CELERY_ROUTES = (
    CustomRouter(),
)

问题是现在我不能使用任务名称。

I found solution that almost satisfied me:

class CustomRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.startswith('celery_tasks.import_tasks'):
            return {'exchange': 'import',
                    'routing_key': 'import.products'}

CELERY_ROUTES = (
    CustomRouter(),
)

Problem is that now I can't use names for tasks.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文