Monitoring/alerting for a pipeline orchestrated in Dagster/Python: how do I record spans?
I'm writing a data processing pipeline orchestrated in Dagster, and I'd like to add monitoring/alerting.
To simplify the use case: we process a few thousand small pieces of data, and each one can be handled by one of 4-5 different major workflows. I'd like to track the time it takes each piece of data to get fully processed and alert if any one takes more than 1 hour, and to track how many pieces of data each workflow processes and alert if any count drifts too far from its normal value.
The challenge I'm running into is that OpenTelemetry expects spans to be identified with context managers:
with tracer.start_as_current_span("span-name") as span:
    # do some work
However, my pipeline work is broken up into multiple Python functions, and the Dagster orchestration framework ties them together. In production the @ops will be run on separate Kubernetes nodes. Here's an example:
from typing import List

from dagster import DynamicOut, DynamicOutput, job, op


@op(
    out=DynamicOut(str),
)
def find_small_data_sets(context):
    """Starts each dataset going through the pipeline."""
    datasets = db.list_some_things()
    for index, dataset in enumerate(datasets):
        yield DynamicOutput(value=dataset, mapping_key=str(index))


@op
def process_data_part_one(data: str) -> str:
    # Do some work on one of the data sets.
    return data


@op
def process_data_part_two(data: str) -> int:
    # Do more work on a data set.
    # Conceptually this would be part of the same span
    # as process_data_part_one.
    return len(data)


@op
def workflow_done(sizes: List[int]) -> int:
    # Finish up the workflow. Here is where a workflow-level
    # span might end.
    return sum(sizes)


@job
def do_full_orchestrated_job():
    """This function defines the DAG structure.

    It does not perform the actual runtime execution of my job
    when it gets called.
    """
    datasets = find_small_data_sets()
    processed_datasets = (
        datasets
        .map(process_data_part_one)
        .map(process_data_part_two)
    )
    workflow_done(processed_datasets.collect())
Given that I don't have access to instrument the Dagster orchestration framework itself, is there a way I can use OpenTelemetry to monitor my pipeline? Is it possible to start a span in one function and end it in another (without a context manager), especially if the start and end actually run on different CPUs? Or is there a better tool for this kind of monitoring/alerting?
Thanks for reading!
2 Answers
I ended up using custom trace propagation so that the Dagster job had one shared trace, and then each op had its own span. So I didn't (de)serialize spans, just the trace. See conceptual propagation docs and Python propagation docs.
A simple example of Python trace context propagation:
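A minimal sketch of this kind of propagation using OpenTelemetry's TraceContextTextMapPropagator; the carrier is just a dict you can serialize and store, and the helper names here are illustrative rather than exact production code:

from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

tracer = trace.get_tracer(__name__)
propagator = TraceContextTextMapPropagator()

def save_trace_context() -> dict:
    # Called once, inside the root span for the whole job:
    # inject() writes the W3C traceparent of the currently
    # active span into a plain dict (the "carrier").
    carrier = {}
    propagator.inject(carrier)
    return carrier  # small enough to store as JSON, keyed by run ID

def op_body_with_span(carrier: dict, op_name: str):
    # Called later, possibly in another process or on another node:
    # rebuild the context from the carrier and open a child span
    # that belongs to the same shared trace.
    ctx = propagator.extract(carrier)
    with tracer.start_as_current_span(op_name, context=ctx):
        pass  # the op's actual work goes here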
For Dagster in particular, there's no built-in facility that works well for storing trace contexts. Resources are re-initialized for each op execution context (e.g. for each process under the default multiprocess executor), so a resource can't create a trace context shared by all ops; you would end up with many different traces. So I built something custom that stores and fetches the trace context keyed by the run ID (or parent run ID). Here's a snippet for walking the run ID ancestry:
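A sketch of the ancestry walk, assuming an op context that exposes instance.get_run_by_id and runs that carry a parent_run_id attribute; the helper name is illustrative:

def iter_run_ancestry(context, run_id=None):
    # Yield this run's ID followed by its parent, grandparent, etc.,
    # so the trace context can be looked up under whichever ancestor
    # run ID it was stored against.
    run_id = run_id or context.run_id
    while run_id is not None:
        yield run_id
        run = context.instance.get_run_by_id(run_id)
        run_id = run.parent_run_id if run else None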
You don't have to use context managers when working with spans. You can start a span in one function and end it in another. Below is a sample snippet showing how you could do that.
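A minimal sketch, assuming both functions run in the same process so they can share the span object (for spans that start and end on different machines you would combine this with trace propagation, as in the other answer):

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

def start_processing(name: str):
    # start_span() neither activates the span nor ends it for you,
    # so there is no context manager to keep open.
    return tracer.start_span(name)

def finish_processing(span):
    # Explicitly close the span that was started elsewhere.
    span.end()

span = start_processing("piece-of-data")
# ... other functions do the actual work in between ...
finish_processing(span)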