How to use ExitHandler with Kubeflow Pipelines SDK v2
I am trying to move all of my Kubeflow pipelines from the previous SDK v1 (`kfp`) to the new Pipelines SDK v2 (`kfp.v2`). I am using version 1.8.12. This refactoring has been successful for almost all code, except for the ExitHandler, which still exists: `from kfp.v2.dsl import ExitHandler`. It seems that the old way of compiling the pipeline object into a tar.gz file, using `kfp.compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')`, preserved some type of Argo placeholders, while the new pipelines compiled to .json with `compiler.Compiler().compile(pipeline_func=pipeline, package_path="basic-pipeline.json")` do not work in the same way. Below, I describe what works in Pipelines SDK v1 and how I have tried to implement it in v2.
Previously, using Kubeflow Pipelines v1, I could use an ExitHandler to, for example, send a message to Slack when one of the pipeline components failed. I defined the pipeline as:
```python
import kfp.dsl as dsl

@dsl.pipeline(
    name='Basic-pipeline'
)
def pipeline(...):
    exit_task = dsl.ContainerOp(
        name='Exit handler that catches errors and posts them in Slack',
        image='eu.gcr.io/.../send-error-msg-to-slack',
        arguments=[
            'python3', 'main.py',
            '--message', 'Basic-pipeline failed',
            '--status', "{{workflow.status}}"
        ]
    )
    with dsl.ExitHandler(exit_task):
        step_1 = dsl.ContainerOp(...)
        step_2 = dsl.ContainerOp(...) \
            .after(step_1)

if __name__ == '__main__':
    import kfp.compiler as compiler
    compiler.Compiler().compile(pipeline, 'basic_pipeline.tar.gz')
```
The `exit_task` would then send the message to our Slack if any step of the pipeline failed. The code for the `exit_task` image looked like:
```python
import argparse

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--message', type=str)
    parser.add_argument('--status', type=str)
    return parser.parse_known_args()

def main(FLAGS):
    def post_to_slack(msg):
        ...

    if FLAGS.status == "Failed":
        post_to_slack(FLAGS.message)
    else:
        pass

if __name__ == '__main__':
    FLAGS, unparsed = get_args()
    main(FLAGS)
```
This worked because the underlying Argo workflow could somehow understand the "{{workflow.status}}" notion.
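As an aside, the body of `post_to_slack` is elided in the original; a minimal sketch of what such a helper might look like, using only the standard library and a Slack incoming webhook (the webhook URL below is a placeholder, not from the original post):

```python
import json
import urllib.request

# Placeholder: a real Slack incoming-webhook URL would go here.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def build_slack_payload(msg: str) -> dict:
    """Build the JSON body expected by a Slack incoming webhook."""
    return {"text": msg}

def post_to_slack(msg: str) -> None:
    """POST the message to the configured Slack incoming webhook."""
    data = json.dumps(build_slack_payload(msg)).encode("utf-8")
    req = urllib.request.Request(
        SLACK_WEBHOOK_URL,
        data=data,
        headers={"Content-Type": "application/json"},
    )
    urllib.request.urlopen(req)
```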
However, I am now trying to run the pipeline on Vertex AI using the Kubeflow Pipelines SDK v2, `kfp.v2`. Using the same exit-handler image as before, `'eu.gcr.io/.../send-error-msg-to-slack'`, I now define a YAML component file (`exit_handler.yaml`) instead:
```yaml
name: Exit handler
description: Prints to Slack if any step of the pipeline fails

inputs:
- {name: message, type: String}
- {name: status, type: String}

implementation:
  container:
    image: eu.gcr.io/.../send-error-msg-to-slack
    command: [
      python3,
      main.py,
      --message, {inputValue: message},
      --status, {inputValue: status}
    ]
```
The pipeline code now looks like this:
```python
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, ExitHandler
from kfp.components import load_component_from_file

@pipeline(name="Basic-pipeline",
          pipeline_root='gs://.../basic-pipeline')
def pipeline():
    exit_handler_spec = load_component_from_file('./exit_handler.yaml')
    exit_handler = exit_handler_spec(
        message="Basic pipeline failed.",
        status="{{workflow.status}}"
    )
    with ExitHandler(exit_handler):
        step_0_spec = load_component_from_file('./comp_0.yaml')
        step0 = step_0_spec(...)
        step_1_spec = load_component_from_file('./comp_1.yaml')
        step1 = step_1_spec(...) \
            .after(step0)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=pipeline,
        package_path="basic-pipeline.json"
    )

    from google.oauth2 import service_account
    credentials = service_account.Credentials.from_service_account_file("./my-key.json")
    aiplatform.init(project='bsg-personalization',
                    location='europe-west4',
                    credentials=credentials)

    job = pipeline_jobs.PipelineJob(
        display_name="basic-pipeline",
        template_path="basic-pipeline.json",
        parameter_values={...}
    )
    job.run()
```
This "works" (compiles and runs without exceptions), but the exit-handler code interprets the `status` input as a literal string with the value `{{workflow.status}}`. This is also indicated by the compiled pipeline JSON generated from the code above (`basic-pipeline.json`), as you can see below (`"stringValue": "{{workflow.status}}"`):
```json
...
"exit-handler": {
    "componentRef": {
        "name": "comp-exit-handler"
    },
    "dependentTasks": [
        "exit-handler-1"
    ],
    "inputs": {
        "parameters": {
            "message": {
                "runtimeValue": {
                    "constantValue": {
                        "stringValue": "Basic pipeline failed."
                    }
                }
            },
            "status": {
                "runtimeValue": {
                    "constantValue": {
                        "stringValue": "{{workflow.status}}"
                    }
                }
            }
        }
    },
    "taskInfo": {
        "name": "exit-handler"
    },
    "triggerPolicy": {
        "strategy": "ALL_UPSTREAM_TASKS_COMPLETED"
    }
}
...
```
Any ideas on how I can refactor my old ExitHandler code from v1 to the new SDK v2, so that the exit handler understands whether the status of my pipeline is failed?
2 Answers
This is probably not yet fully documented, but in v2 we introduced a different variable, PipelineTaskFinalStatus, that can be automatically populated for you to send to your Slack channel. Here is an example of the exit handler in the official doc: https://cloud.google.com/vertex-ai/docs/pipelines/email-notifications#sending_a_notification_from_a_pipeline

And here is the corresponding email notification component:
https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/v1/vertex_notification_email/component.yaml

You can write your own component with this parameter, which will be automatically populated when the exit handler runs.

(Note that this feature is currently not yet available in the Kubeflow Pipelines open-source distribution; it will be available in KFP v2. For now it is only available in the Vertex Pipelines distribution.)
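For reference, using the linked email notification component as the exit task might look like the following sketch. The recipient address and pipeline name are placeholders, and the import path follows the linked `v1` component; adjust it to match your installed google-cloud-pipeline-components version:

```python
from kfp.v2 import dsl
# Import path per the linked component.yaml; may differ across
# google-cloud-pipeline-components releases.
from google_cloud_pipeline_components.v1.vertex_notification_email import (
    VertexNotificationEmailOp,
)

@dsl.pipeline(name='notification-email-pipeline',   # illustrative name
              pipeline_root='gs://.../basic-pipeline')
def my_pipeline():
    # Emails the recipients with the pipeline's final status
    # once the exit handler fires.
    notify_task = VertexNotificationEmailOp(
        recipients=['you@example.com']  # placeholder address
    )
    with dsl.ExitHandler(notify_task):
        ...  # your pipeline steps go here
```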
The replacement for "{{workflow.status}}" in KFP SDK v2 is the special type annotation PipelineTaskFinalStatus, as IronPan mentioned above. Its usage is documented at https://www.kubeflow.org/docs/components/pipelines/v2/author-a-pipeline/pipelines/#dslexithandler