CELERY_ROUTES - 如何根据任务名称进行路由

发布于 2024-12-11 05:29:17 字数 634 浏览 3 评论 0原文

我试图让 celery 根据任务名称路由任务...基本上,我有名为“worker.some_name”和“web.some_name”的任务,并且我使用两个不同的队列,称为worker和分别是网络。我希望所有工作任务都进入工作队列,反之亦然。目前我有一个像这样的大 CELERY_ROUTES 字典:

CELERY_ROUTES = {
    "web.some_name": {
        "queue": "web"
    },
    "web.some_other_name": {
        "queue": "web"
    },
    etc.... }

但我想要更通用的东西,例如:

CELERY_ROUTES = (MyRouter(), ) 
class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.split('.')[0] == "worker":
            return {"queue": "worker"}
        return {"queue": "web"}

但这似乎不起作用。有什么想法吗?谢谢。

I'm trying to get celery to route tasks based on the name of the task... basically, I have tasks that are name 'worker.some_name' and 'web.some_name', and I use two different queues, called worker and web respectively. I would like all worker tasks to go to the worker queue and vice-versa. Currently I have a big CELERY_ROUTES dictionary like this:

CELERY_ROUTES = {
    "web.some_name": {
        "queue": "web"
    },
    "web.some_other_name": {
        "queue": "web"
    },
    etc.... }

But I would like something more generic like:

CELERY_ROUTES = (MyRouter(), ) 
class MyRouter(object):
    def route_for_task(self, task, args=None, kwargs=None):
        if task.split('.')[0] == "worker":
            return {"queue": "worker"}
        return {"queue": "web"}

But this doesn't seem to work. Any ideas? Thanks.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

迷鸟归林 2024-12-18 05:29:17

您必须对 py 文件中定义的任务使用装饰器“@app.task”。

您可以使用 @app.task(queue='queue_name') 路由您的任务

You must have used the decorator "@app.task" for the task you've defined in py file.

You can route your task using @app.task(queue='queue_name')

昨迟人 2024-12-18 05:29:17

您应该能够通过将交换类型从直接更改为主题来完成您想要的操作。通过这种方式,您可以将任务指定为 web.* 或 worker.*

您可以在此处阅读:http://ask.github.com/celery/userguide/routing.html#topic-exchanges

You should be able to do what you want by changing the exchange type from direct to topic. This way you can specify the tasks as web.* or worker.*

You can read up on it here: http://ask.github.com/celery/userguide/routing.html#topic-exchanges

一个人的旅程 2024-12-18 05:29:17

Celery 3.x 默认不支持通配符路由,但你可以自己实现。

这是一个复制粘贴解决方案:

class TaskRouter:
    def __init__(self, routes):
        self.routes = {}
        self.glob_routes = {}

        for glob, queue in routes.items():
            if '*' in glob:
                self.glob_routes[glob] = queue
            else:
                self.routes[glob] = queue

    def route_for_task(self, task, args=None, kwargs=None):
        if task in self.routes:
            return self.routes[task]

        for route in self.glob_routes:
            prefix = route.split('*')[0]
            if task.startswith(prefix):
                return self.glob_routes[route]

        return None # for unknown tasks will be used default queue

用法:

# celery.py
CELERY_ROUTES = {
    'web.*':            'web',
    'web.slow_task':    'slow',
    'worker.*':         'worker',
}

app = Celery('config')
app.config_from_object('django.conf:settings') # Django or your app config

app.conf.update(
    CELERY_ROUTES=(TaskRouter(CELERY_ROUTES),),
)

TaskRouter 的工作原理:

In [2]: CELERY_ROUTES = { 
   ...:     'web.*':            'web', 
   ...:     'web.slow_task':    'slow', 
   ...:     'worker.*':         'worker', 
   ...: }                                                                       

In [3]: router = TaskRouter(CELERY_ROUTES)                                      

In [4]: router.route_for_task('web.blabla')                                     
Out[4]: 'web'

In [5]: router.route_for_task('web.slow_task')                                  
Out[5]: 'slow'

In [6]: router.route_for_task('unknown_task')  # None = default queue                                 

In [7]: router.route_for_task('worker.foo')                                     
Out[7]: 'worker'

In [8]: router.route_for_task('worker.bar')                                     
Out[8]: 'worker'

Wildcard routing is not supported in Celery 3.x by default, but you can implement it yourself.

Here is a copy-paste solution:

class TaskRouter:
    def __init__(self, routes):
        self.routes = {}
        self.glob_routes = {}

        for glob, queue in routes.items():
            if '*' in glob:
                self.glob_routes[glob] = queue
            else:
                self.routes[glob] = queue

    def route_for_task(self, task, args=None, kwargs=None):
        if task in self.routes:
            return self.routes[task]

        for route in self.glob_routes:
            prefix = route.split('*')[0]
            if task.startswith(prefix):
                return self.glob_routes[route]

        return None # for unknown tasks will be used default queue

Usage:

# celery.py
CELERY_ROUTES = {
    'web.*':            'web',
    'web.slow_task':    'slow',
    'worker.*':         'worker',
}

app = Celery('config')
app.config_from_object('django.conf:settings') # Django or your app config

app.conf.update(
    CELERY_ROUTES=(TaskRouter(CELERY_ROUTES),),
)

How the TaskRouter works:

In [2]: CELERY_ROUTES = { 
   ...:     'web.*':            'web', 
   ...:     'web.slow_task':    'slow', 
   ...:     'worker.*':         'worker', 
   ...: }                                                                       

In [3]: router = TaskRouter(CELERY_ROUTES)                                      

In [4]: router.route_for_task('web.blabla')                                     
Out[4]: 'web'

In [5]: router.route_for_task('web.slow_task')                                  
Out[5]: 'slow'

In [6]: router.route_for_task('unknown_task')  # None = default queue                                 

In [7]: router.route_for_task('worker.foo')                                     
Out[7]: 'worker'

In [8]: router.route_for_task('worker.bar')                                     
Out[8]: 'worker'
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文