如何为芹菜和普罗米修斯生成自定义应用指标/监控
我正在尝试在芹菜中生成自定义应用程序指标,并将其拉入普罗米修斯。我正在使用 danihodovic/celery-exporter 导出芹菜任务指标,这些是开箱即用。但是,我找不到一种生成自定义应用程序指标的方法(例如,内部方法调用计数,方法的延迟等)。我已经使用了标准 python prometheus客户端在芹菜工作。
我可以看到芹菜监测与 celery events 但是,发布自定义事件并未显示普罗米修斯出口商中的任何新指标。
代码:
#!/usr/bin/env python3
import os
from celery import Celery
from celery.events import EventDispatcher
from celery.utils.log import get_task_logger
from kombu import Queue
from prometheus_client import Counter
QUEUE_INPUT = 'test_queue'
TASK_NAME = 'test_task'
QUEUE_ARGS = {'x-queue-mode': 'lazy'}
QUEUE_CONN_STR = os.environ['RMQ_MASTER_CONN_STR']
__LOG = get_task_logger(__name__)
# Define our consumer object
app = Celery(TASK_NAME)
# Define routes
app.conf.update({
'broker_url': "{0}".format(QUEUE_CONN_STR),
'task_routes': {"test_celery.test_task": {"queue": QUEUE_INPUT}},
'task_serializer': 'json',
'task_send_sent_event': True,
'worker_send_task_events': True,
'result_serializer': 'json',
"broker_pool_limit": 20,
"task_always_eager": False,
"result_expires": 2,
'worker_enable_remote_control': False,
'worker_prefetch_multiplier': 1
})
app.conf.task_queues = [
Queue(QUEUE_INPUT, queue_arguments=QUEUE_ARGS),
]
c = Counter('test_counter', 'Number of hits')
d = Counter('test_counter_1', 'Number of msgs')
dispatcher = EventDispatcher(app.connection())
print("Hi")
dispatcher.send("C_INIT")
c.inc()
@app.task(name="test_task", bind=True, max_retries=2)
def test_task(self, payload: dict):
d.inc()
dispatcher.send("START_PROCESSING")
print(payload)
dispatcher.send("END_PROCESSING")
I'm trying to generate custom application metrics in celery, and pull them into prometheus. I'm using danihodovic/celery-exporter to export the celery task metrics, and those are working out of the box. However I cannot find a way to generate custom application metrics (e.g. count of internal method calls, latencies of methods, etc.). I've used the standard python prometheus client successfully in a flask application, but I can't make it work in celery.
I can see that celery monitoring is tied with celery events, however publishing custom events does not show any new metrics in the prometheus exporter.
Code:
#!/usr/bin/env python3
import os
from celery import Celery
from celery.events import EventDispatcher
from celery.utils.log import get_task_logger
from kombu import Queue
from prometheus_client import Counter
QUEUE_INPUT = 'test_queue'
TASK_NAME = 'test_task'
QUEUE_ARGS = {'x-queue-mode': 'lazy'}
QUEUE_CONN_STR = os.environ['RMQ_MASTER_CONN_STR']
__LOG = get_task_logger(__name__)
# Define our consumer object
app = Celery(TASK_NAME)
# Define routes
app.conf.update({
'broker_url': "{0}".format(QUEUE_CONN_STR),
'task_routes': {"test_celery.test_task": {"queue": QUEUE_INPUT}},
'task_serializer': 'json',
'task_send_sent_event': True,
'worker_send_task_events': True,
'result_serializer': 'json',
"broker_pool_limit": 20,
"task_always_eager": False,
"result_expires": 2,
'worker_enable_remote_control': False,
'worker_prefetch_multiplier': 1
})
app.conf.task_queues = [
Queue(QUEUE_INPUT, queue_arguments=QUEUE_ARGS),
]
c = Counter('test_counter', 'Number of hits')
d = Counter('test_counter_1', 'Number of msgs')
dispatcher = EventDispatcher(app.connection())
print("Hi")
dispatcher.send("C_INIT")
c.inc()
@app.task(name="test_task", bind=True, max_retries=2)
def test_task(self, payload: dict):
d.inc()
dispatcher.send("START_PROCESSING")
print(payload)
dispatcher.send("END_PROCESSING")
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我求助于使用 prometheus push push Gateway 。仍在寻找芹菜更好的指标框架。
I resorted to using the prometheus push gateway. Still searching for a better metrics framework for celery.