如何为芹菜和普罗米修斯生成自定义应用指标/监控

发布于 2025-02-05 08:04:56 字数 1991 浏览 4 评论 0原文

我正在尝试在芹菜中生成自定义应用程序指标,并将其拉入普罗米修斯。我正在使用 danihodovic/celery-exporter 导出芹菜任务指标,这些是开箱即用。但是,我找不到一种生成自定义应用程序指标的方法(例如,内部方法调用计数,方法的延迟等)。我已经使用了标准 python prometheus客户端在芹菜工作。

我可以看到芹菜监测与 celery events 但是,发布自定义事件并未显示普罗米修斯出口商中的任何新指标。

代码:

#!/usr/bin/env python3
import os

from celery import Celery
from celery.events import EventDispatcher
from celery.utils.log import get_task_logger
from kombu import Queue
from prometheus_client import Counter

QUEUE_INPUT = 'test_queue'
TASK_NAME = 'test_task'
QUEUE_ARGS = {'x-queue-mode': 'lazy'}
QUEUE_CONN_STR = os.environ['RMQ_MASTER_CONN_STR']

__LOG = get_task_logger(__name__)

# Define our consumer object
app = Celery(TASK_NAME)

# Define routes
app.conf.update({
    'broker_url': "{0}".format(QUEUE_CONN_STR),
    'task_routes': {"test_celery.test_task": {"queue": QUEUE_INPUT}},
    'task_serializer': 'json',
    'task_send_sent_event': True,
    'worker_send_task_events': True,
    'result_serializer': 'json',
    "broker_pool_limit": 20,
    "task_always_eager": False,
    "result_expires": 2,
    'worker_enable_remote_control': False,
    'worker_prefetch_multiplier': 1
})

app.conf.task_queues = [
    Queue(QUEUE_INPUT, queue_arguments=QUEUE_ARGS),
]

c = Counter('test_counter', 'Number of hits')
d = Counter('test_counter_1', 'Number of msgs')
dispatcher = EventDispatcher(app.connection())
print("Hi")
dispatcher.send("C_INIT")
c.inc()


@app.task(name="test_task", bind=True, max_retries=2)
def test_task(self, payload: dict):
    d.inc()
    dispatcher.send("START_PROCESSING")
    print(payload)
    dispatcher.send("END_PROCESSING")
    

I'm trying to generate custom application metrics in celery, and pull them into prometheus. I'm using danihodovic/celery-exporter to export the celery task metrics, and those are working out of the box. However I cannot find a way to generate custom application metrics (e.g. count of internal method calls, latencies of methods, etc.). I've used the standard python prometheus client successfully in a flask application, but I can't make it work in celery.

I can see that celery monitoring is tied with celery events, however publishing custom events does not show any new metrics in the prometheus exporter.

Code:

#!/usr/bin/env python3
import os

from celery import Celery
from celery.events import EventDispatcher
from celery.utils.log import get_task_logger
from kombu import Queue
from prometheus_client import Counter

QUEUE_INPUT = 'test_queue'
TASK_NAME = 'test_task'
QUEUE_ARGS = {'x-queue-mode': 'lazy'}
QUEUE_CONN_STR = os.environ['RMQ_MASTER_CONN_STR']

__LOG = get_task_logger(__name__)

# Define our consumer object
app = Celery(TASK_NAME)

# Define routes
app.conf.update({
    'broker_url': "{0}".format(QUEUE_CONN_STR),
    'task_routes': {"test_celery.test_task": {"queue": QUEUE_INPUT}},
    'task_serializer': 'json',
    'task_send_sent_event': True,
    'worker_send_task_events': True,
    'result_serializer': 'json',
    "broker_pool_limit": 20,
    "task_always_eager": False,
    "result_expires": 2,
    'worker_enable_remote_control': False,
    'worker_prefetch_multiplier': 1
})

app.conf.task_queues = [
    Queue(QUEUE_INPUT, queue_arguments=QUEUE_ARGS),
]

c = Counter('test_counter', 'Number of hits')
d = Counter('test_counter_1', 'Number of msgs')
dispatcher = EventDispatcher(app.connection())
print("Hi")
dispatcher.send("C_INIT")
c.inc()


@app.task(name="test_task", bind=True, max_retries=2)
def test_task(self, payload: dict):
    d.inc()
    dispatcher.send("START_PROCESSING")
    print(payload)
    dispatcher.send("END_PROCESSING")
    

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

桃酥萝莉 2025-02-12 08:04:56

我求助于使用 prometheus push push Gateway 。仍在寻找芹菜更好的指标框架。

I resorted to using the prometheus push gateway. Still searching for a better metrics framework for celery.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文