How can I record spans for monitoring/alerting of a pipeline orchestrated with Dagster/Python?

Posted 2025-01-16 05:07:21


I'm writing a data processing pipeline orchestrated in Dagster, and I'd like to add monitoring/alerting.

To simplify the use case: we process a few thousand small pieces of data, and each one can be processed by one of 4-5 different major workflows. I'd like to track the time it takes each piece of data to get fully processed and alert if any one takes > 1 hour. And I'd like to track how many pieces of data each workflow processes and alert if any count strays too far from its normal value.

The challenge I'm coming up against is that OpenTelemetry expects spans to be managed with context managers:

with tracer.start_as_current_span("span-name") as span:
    # do some work

However, my pipeline work is broken up into multiple Python functions, and the Dagster orchestration framework ties them together. In production the @ops will be run on separate Kubernetes nodes. Here's an example:

@op(
    out=DynamicOut(str),
)
def find_small_data_sets(context):
    """Starts each dataset going through the pipeline."""
    datasets = db.list_some_things()
    for dataset in datasets:
        # mapping_key must be unique per dynamic output
        yield DynamicOutput(value=dataset, mapping_key=dataset)


@op
def process_data_part_one(data: str) -> str:
    pass # Do some work on one of the data sets.


@op
def process_data_part_two(data: str) -> int:
    # Do more work on a data set. Conceptually this would be
    # part of the same span as process_data_part_one.
    ...


@op
def workflow_done(outputs: List[int]) -> int:
    # Finish up the workflow. Here is where a workflow-level
    # span might end.
    return sum(outputs)


@job
def do_full_orchestrated_job():
    """This function defines the DAG structure.

    It does not perform the actual runtime execution of my job
    when it gets called.
    """
    datasets = find_small_data_sets()
    processed_datasets = (
        datasets
        .map(process_data_part_one)
        .map(process_data_part_two)
    )
    workflow_done(processed_datasets.collect())

Given I don't have access to instrument the Dagster orchestration framework itself, is there a way I can use OpenTelemetry to monitor my pipeline? Would starting and ending spans in different functions (without a context manager) be possible, especially if the start and end are actually running on different CPUs? Or is there a better tool for this kind of monitoring/alerting?

Thanks for reading!

Comments (2)

醉殇 2025-01-23 05:07:22


I ended up using custom trace propagation so that the Dagster job had one shared trace, and then each op had its own span. So I didn't (de)serialize spans, just the trace. See conceptual propagation docs and Python propagation docs.

A simple example of Python trace context propagation:

from opentelemetry import context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


def get_trace_context():
    """Serializes the current trace context into a plain dict carrier."""
    carrier = {}
    TraceContextTextMapPropagator().inject(carrier)
    return carrier


def set_trace_context(carrier):
    """Sets the current trace context from the given propagated carrier."""
    ctx = TraceContextTextMapPropagator().extract(carrier)
    context.attach(ctx)

For Dagster in particular, there's no built-in mechanism that works well for storing trace contexts. Resources are re-initialized for each op execution context (for example, for each process under the default multiprocess executor), so a resource can't create one trace context shared by all ops; you would end up with many different traces. So I built something custom that stores and fetches the trace context keyed by the run ID (or a parent run ID). Here's a snippet for walking the run-ID ancestry:

def _get_run_id_and_ancestors(context):
    """Returns a list starting with the current run's ID and then any ancestor runs.
    """
    run_id = context.run_id
    run_ids = [run_id]
    while True:
        run = context.instance.get_run_by_id(run_id)
        if run.parent_run_id:
            run_id = run.parent_run_id
            run_ids.append(run_id)
        else:
            break
    return run_ids
血之狂魔 2025-01-23 05:07:22


You don't have to use context managers when working with spans. You can start a span in one function and end it in another. Below is a sample snippet showing how you could do that.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

provider = TracerProvider()
processor = BatchSpanProcessor(ConsoleSpanExporter())
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)


tracer = trace.get_tracer(__name__)

# single span start in one func and end in another

def foo(span):
    span.set_attribute("my_custom_attribute", "foo")
    span.end()

def bar():
    span = tracer.start_span("bar")
    foo(span)

# end

# demonstrate `use_span(...)`

def bob(span):
    with trace.use_span(span, end_on_exit=False):
        # Use the passed-in span as the current context so the internal
        # span is recorded as its child in the same trace.
        with tracer.start_as_current_span("internal") as internal:
            internal.set_attribute("k", "v")

    # `span` is still not ended because `end_on_exit` is set to False.
    # You may pass the span around to different functions before ending it.

    # ending manually
    span.end()

def alice():
    span = tracer.start_span("alice")
    bob(span)

# end

if __name__ == '__main__':
    bar()
    alice()