如何将Exithandler与KubeFlow Pipelines SDK V2一起使用

发布于 2025-01-23 13:06:48 字数 5745 浏览 5 评论 0 原文

我正在尝试将所有kubeflow管道从使用以前的SDK V1( kfp ),转移到新的 pipelines sdk v2 kfp.v2 )。我正在使用版本 1.8.12 。此重构已证明几乎所有代码都成功了,除了仍然存在的 exithandler 来自KFP.V2.DSL导入Exithandler 。似乎是将管道对象编译为 tar.gz -file的一种方法,使用 kfp.compiler.compiler.compiler()。compile(pipeline,'basic_pipeline.tar.gz')< /代码>文件保留某种类型的Argo占位符,而新的 .json 使用 compiler.compiler()。compile(pipeline_func = pipeline,package_path,package_path =“ basic-pipeline.json”) 无法以相同的方式工作。下面,我将详细介绍管道SDK V1中的功能以及如何在V2中实现它。

以前,使用KubeFlow Pipelines V1,我可以使用Exithandler,如当其中一个管道组件失败时,将消息发送给Slack。我将管道定义为

import kfp.dsl as dsl

@dsl.pipeline(
    name='Basic-pipeline'
)
def pipeline(...):
    exit_task = dsl.ContainerOp(
        name='Exit handler that catches errors and post them in Slack',
        image='eu.gcr.io/.../send-error-msg-to-slack',
        arguments=[
                    'python3', 'main.py',
                    '--message', 'Basic-pipeline failed'
                    '--status', "{{workflow.status}}"
                  ]
    )
    with dsl.ExitHandler(exit_task):
        step_1 = dsl.ContainerOp(...)
        step_2 = dsl.ContainerOp(...) \
            .after(step_1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')

exit_task 如果管道的任何步骤失败,则将 Message 发送给我们的懈怠。 exit_task 图像的代码看起来像是

import argparse

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def main(FLAGS):
    def post_to_slack(msg):
        ...

    if FLAGS.status == "Failed":
        post_to_slack(FLAGS.message)
    else:
        pass

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)

有效的,因为基础的Argo Workflow可以某种方式理解“ {{workflow.status}}” notion。

但是,,我现在试图使用顶点AI运行管道,利用KubeFlow Pipelines SDK V2, kfp.v2 。使用与以前相同的退出手机映像,'eu.gcr.io /.../ send-error-msg-to-slack',我现在定义一个yaml组件文件(> exit_handler.yaml )相反,

name: Exit handler
description: Prints to Slack if any step of the pipeline fails

inputs:
  - {name: message, type: String}
  - {name: status, type: String}

implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]

管道代码现在看起来像这样,

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler
from kfp.components import load_component_from_file

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    exit_handler_spec = load_component_from_file('./exit_handler.yaml')
    exit_handler = exit_handler_spec(
        message="Basic pipeline failed.",
        status="{{workflow.status}}"
    )
    with ExitHandler(exit_handler):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step0 = step_0_spec(...)

        step_1_spec = load_component_from_file('./comp_1.yaml')
        step1 = step_1_spec(...) \
            .after(step0)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )
    from google.oauth2 import service_account
    credentials = service_account.Credentials.from_service_account_file("./my-key.json")
    aiplatform.init(project='bsg-personalization',
                    location='europe-west4',
                    credentials=credentials)

    job = pipeline_jobs.PipelineJob(
        display_name="basic-pipeline",
        template_path="basic-pipeline.json",
        parameter_values={...}
    )
    job.run()

此“工作”(不例外)要编译和运行,但是Exithandler代码将 status status 将其解释为带有的字符串值{{workflow.status}},这也由从上面的代码生成的编译管道JSON表示( basic-pipeline.json ),您可以在下面看到('String>“ stringvalue” :“ {{workflow.status}}” ):

...
         "exit-handler": {
            "componentRef": {
              "name": "comp-exit-handler"
            },
            "dependentTasks": [
              "exit-handler-1"
            ],
            "inputs": {
              "parameters": {
                "message": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "Basic pipeline failed."
                    }
                  }
                },
                "status": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "{{workflow.status}}"
                    }
                  }
                }
              }
            },
            "taskInfo": {
              "name": "exit-handler"
            },
            "triggerPolicy": {
              "strategy": "ALL_UPSTREAM_TASKS_COMPLETED"
            }
          }
...

任何想法我如何使用v1使用v1对新的SDK V2重构我的旧 exithandler 代码>如果我的管道状态失败了,请让出口处理程序理解

I'm trying to move all my Kubeflow Pipelines from using the previous SDK v1 (kfp), to the newer Pipelines SDK v2 (kfp.v2). I'm using version 1.8.12.This refactoring have proved successful for almost all code, except for the ExitHandler, which still exists; from kfp.v2.dsl import ExitHandler. It seems like the previous way of compiling the pipeline object into a tar.gz-file using kfp.compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz') file preserved some type of Argo placeholders, while the new .json pipelines using compiler.Compiler().compile(pipeline_func=pipeline, package_path="basic-pipeline.json") doesn't work the same way. Below, I will go into detail what works in Pipelines SDK v1 and how I've tried to implement it in v2.

Previously, using Kubeflow Pipelines v1, I could use an ExitHandler as shown in this StackOverflow question to eg. send a message to Slack when one of the pipeline components failed. I would define the pipeline as

import kfp.dsl as dsl

@dsl.pipeline(
    name='Basic-pipeline'
)
def pipeline(...):
    exit_task = dsl.ContainerOp(
        name='Exit handler that catches errors and post them in Slack',
        image='eu.gcr.io/.../send-error-msg-to-slack',
        arguments=[
                    'python3', 'main.py',
                    '--message', 'Basic-pipeline failed'
                    '--status', "{{workflow.status}}"
                  ]
    )
    with dsl.ExitHandler(exit_task):
        step_1 = dsl.ContainerOp(...)
        step_2 = dsl.ContainerOp(...) \
            .after(step_1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')

where the exit_task would send the message to our Slack if any of the steps of the pipeline failed. The code for the exit_task image looks like

import argparse

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def main(FLAGS):
    def post_to_slack(msg):
        ...

    if FLAGS.status == "Failed":
        post_to_slack(FLAGS.message)
    else:
        pass

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)

This worked, because the underlying Argo workflow could somehow understand the "{{workflow.status}}" notion.

However, I'm now trying to use Vertex AI to run the pipeline, leveraging the Kubeflow Pipelines SDK v2, kfp.v2. Using the same exit-handler image as before, 'eu.gcr.io/.../send-error-msg-to-slack', I now define a yaml component file (exit_handler.yaml) instead,

name: Exit handler
description: Prints to Slack if any step of the pipeline fails

inputs:
  - {name: message, type: String}
  - {name: status, type: String}

implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]

The pipeline code now looks like this instead,

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler
from kfp.components import load_component_from_file

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    exit_handler_spec = load_component_from_file('./exit_handler.yaml')
    exit_handler = exit_handler_spec(
        message="Basic pipeline failed.",
        status="{{workflow.status}}"
    )
    with ExitHandler(exit_handler):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step0 = step_0_spec(...)

        step_1_spec = load_component_from_file('./comp_1.yaml')
        step1 = step_1_spec(...) \
            .after(step0)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )
    from google.oauth2 import service_account
    credentials = service_account.Credentials.from_service_account_file("./my-key.json")
    aiplatform.init(project='bsg-personalization',
                    location='europe-west4',
                    credentials=credentials)

    job = pipeline_jobs.PipelineJob(
        display_name="basic-pipeline",
        template_path="basic-pipeline.json",
        parameter_values={...}
    )
    job.run()

This "works" (no exceptions) to compile and run, but the ExitHandler code interprets the status as a string with value {{workflow.status}}, which is also indicated by the compiled pipeline json generated from the code above (basic-pipeline.json), which you can see below ("stringValue": "{{workflow.status}}"):

...
         "exit-handler": {
            "componentRef": {
              "name": "comp-exit-handler"
            },
            "dependentTasks": [
              "exit-handler-1"
            ],
            "inputs": {
              "parameters": {
                "message": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "Basic pipeline failed."
                    }
                  }
                },
                "status": {
                  "runtimeValue": {
                    "constantValue": {
                      "stringValue": "{{workflow.status}}"
                    }
                  }
                }
              }
            },
            "taskInfo": {
              "name": "exit-handler"
            },
            "triggerPolicy": {
              "strategy": "ALL_UPSTREAM_TASKS_COMPLETED"
            }
          }
...

Any idea of how I can refactor my old ExitHandler code using v1 to the new SDK v2, to make the exit handler understand if the status of my pipeline is failed or not?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

-黛色若梦 2025-01-30 13:06:48

这可能尚未完全记录,但是在V2中,我们引入了一个不同的变量 pipelinetaskFinalStatus ,可以自动填充您将其发送到您的Slack频道。

这是官方DOC htttps: //cloud.google.com/vertex-ai/docs/pipelines/email-notifications#sending_a_notification_from_a_pipeline

这是相应的电子邮件通知组件

您可以使用以下参数编写自己的组件,在Exit Handler运行时会自动填充。

inputs:
...
  - name: pipeline_task_final_status
    type: PipelineTaskFinalStatus

(请注意,此功能目前在Kubeflow Pipelines开源分布中尚不可用,并且可以在KFP V2中使用。它仅在Vertex Pipelines发行中提供)

This is probably not yet fully documented but in V2 we introduced a different variable PipelineTaskFinalStatus that can automatically populated for you to send it to your Slack channel.

Here is an example of the exit handler in the official doc https://cloud.google.com/vertex-ai/docs/pipelines/email-notifications#sending_a_notification_from_a_pipeline

And here is the corresponding email notification component
https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/v1/vertex_notification_email/component.yaml

You can write your own component with the following parameter which would be automatically populated when exit handler runs.

inputs:
...
  - name: pipeline_task_final_status
    type: PipelineTaskFinalStatus

(Note this feature is currently not available in Kubeflow Pipelines open source distribution yet and will be available in KFP V2. It's only available in Vertex Pipelines distribution)

污味仙女 2025-01-30 13:06:48

的更换“ {{WorkFlow.status}}” 在KFP SDK V2中是特殊类型注释 pipelinetaskfinalstatus 如上所述。

它的用法记录在

Replacement of "{{workflow.status}}" in KFP SDK v2 is the special type annotation PipelineTaskFinalStatus as IronPan mentioned above.

Its usage is documented in https://www.kubeflow.org/docs/components/pipelines/v2/author-a-pipeline/pipelines/#dslexithandler

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文