Is there an implemented way to use a Kubeflow pipeline's output outside the pipeline?

Posted on 2025-01-12 16:24:31

I'm using local Kubeflow Pipelines to build a continuous machine learning test project. I have one pipeline that preprocesses the data using TFX and automatically saves its outputs to MinIO. Outside of this pipeline, I want to train the model using TFX's Trainer, but I need the artifacts generated in the preprocessing pipeline. Is there an implemented way to import these outputs? I've looked through the documentation and some issues, but can't find an answer. And because I'm trying to do this continuously, I can't rely on doing it manually.

Example of my preprocessing pipeline:


    @kfp.dsl.pipeline(
      name='TFX',
      description='TFX pipeline'
    )
    def tfx_pipeline():
    
        # DL with wget, can use gcs instead as well
        fetch = kfp.dsl.ContainerOp(
          name='download',
          image='busybox',
          command=['sh', '-c'],
          arguments=[
              'sleep 1;'
              'mkdir -p /tmp/data;'
              'wget <gcp link> -O /tmp/data/results.csv'],
          file_outputs={'downloaded': '/tmp/data'})
        records_example = tfx_csv_gen(input_base=fetch.output)
        stats = tfx_statistic_gen(input_data=records_example.output)
        schema_op = tfx_schema_gen(stats.output)
        tfx_example_validator(stats=stats.outputs['output'], schema=schema_op.outputs['output'])
        #tag::tft[]
        transformed_output = tfx_transform(
            input_data=records_example.output,
            schema=schema_op.outputs['output'],
            module_file=module_file) # Path to your TFT code on GCS/S3
        #end::tft[]

I then compile and run it with:


    kfp.compiler.Compiler().compile(tfx_pipeline, 'tfx_pipeline.zip')


    client = kfp.Client()
    client.list_experiments()
    #exp = client.create_experiment(name='mdupdate')


    my_experiment = client.create_experiment(name='tfx_pipeline')
    my_run = client.run_pipeline(my_experiment.id, 'tfx', 
      'tfx_pipeline.zip')

I'm working in a .ipynb notebook in Visual Studio Code.

Comments (1)

Spring初心 2025-01-19 16:24:31

You can get that information like this:
https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001

component_name: This can be checked in the yaml definition of the pipeline, under templates.name (search for the component containing the output you want)

artifact_name: This can also be checked in the yaml definition of the pipeline, under that same component on the outputs attribute
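
To avoid reading the YAML by hand, the lookup described above can be scripted. The following is a minimal sketch, assuming the compiled pipeline is an Argo-style workflow that has already been parsed into a dict (for example with PyYAML's `yaml.safe_load`). The function name `list_output_artifacts` and the trimmed `example_workflow` are hypothetical and only illustrate the expected layout.

```python
from typing import Dict, List


def list_output_artifacts(workflow: dict) -> Dict[str, List[str]]:
    """Map each template (component) name to the names of its output artifacts."""
    result: Dict[str, List[str]] = {}
    for template in workflow.get("spec", {}).get("templates", []):
        artifacts = template.get("outputs", {}).get("artifacts", [])
        names = [artifact["name"] for artifact in artifacts]
        if names:  # skip templates (such as the DAG itself) with no artifacts
            result[template["name"]] = names
    return result


# Hypothetical, heavily trimmed workflow definition used only for illustration.
example_workflow = {
    "spec": {
        "templates": [
            {
                "name": "download",
                "outputs": {
                    "artifacts": [
                        {"name": "download-downloaded", "path": "/tmp/data"}
                    ]
                },
            },
            {"name": "tfx", "dag": {"tasks": []}},
        ]
    }
}

for component, artifact_names in list_output_artifacts(example_workflow).items():
    print(component, artifact_names)
```

Each key is a candidate `component_name` and each listed value a candidate `artifact_name`.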

Once you have these two parameters, you can use the functions described at the above URL:

    #!/usr/bin/env python3

    import json
    import tarfile
    from base64 import b64decode
    from io import BytesIO

    import kfp


    def get_node_id(*, run_id: str, component_name: str, client: kfp.Client):
        run = client.runs.get_run(run_id)
        workflow = json.loads(run.pipeline_runtime.workflow_manifest)
        nodes = workflow["status"]["nodes"]
        for node_id, node_info in nodes.items():
            if node_info["displayName"] == component_name:
                return node_id
        else:
            raise RuntimeError(f"Unable to find node_id for Component '{component_name}'")


    def get_artifact(*, run_id: str, node_id: str, artifact_name: str, client: kfp.Client):
        artifact = client.runs.read_artifact(run_id, node_id, artifact_name)
        # Artifacts are returned as base64-encoded .tar.gz strings
        data = b64decode(artifact.data)
        io_buffer = BytesIO()
        io_buffer.write(data)
        io_buffer.seek(0)
        data = None
        with tarfile.open(fileobj=io_buffer) as tar:
            member_names = tar.getnames()
            if len(member_names) == 1:
                data = tar.extractfile(member_names[0]).read().decode('utf-8')
            else:
                # Is it possible for KFP artifacts to have multiple members?
                data = {}
                for member_name in member_names:
                    data[member_name] = tar.extractfile(member_name).read().decode('utf-8')
        return data


    if __name__ == "__main__":
        run_id = "e498b0da-036e-4e81-84e9-6e9c6e64960b"
        component_name = "my-component"
        # For an output variable named "output_data"
        artifact_name = "my-component-output_data"

        client = kfp.Client()
        node_id = get_node_id(run_id=run_id, component_name=component_name, client=client)
        artifact = get_artifact(
            run_id=run_id, node_id=node_id, artifact_name=artifact_name, client=client,
        )
        # Do something with artifact ...
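
One caveat: `get_artifact` decodes every archive member as UTF-8, which may raise `UnicodeDecodeError` if an artifact contains binary data, as TFX outputs might. The sketch below is a possible variant, not part of the linked comment, that returns raw bytes instead. `unpack_artifact_bytes` and `make_fake_payload` are hypothetical names, and the fake payload only stands in for the base64 string in `artifact.data` so the example runs without a cluster.

```python
import tarfile
from base64 import b64decode, b64encode
from io import BytesIO
from typing import Dict


def unpack_artifact_bytes(b64_data: str) -> Dict[str, bytes]:
    """Decode a base64-encoded .tar.gz payload into {member name: raw bytes}."""
    buffer = BytesIO(b64decode(b64_data))
    contents: Dict[str, bytes] = {}
    with tarfile.open(fileobj=buffer, mode="r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile():  # skip directories and other non-file entries
                contents[member.name] = tar.extractfile(member).read()
    return contents


def make_fake_payload(files: Dict[str, bytes]) -> str:
    """Build a stand-in payload (assumption: same encoding as artifact.data)."""
    buffer = BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, BytesIO(data))
    return b64encode(buffer.getvalue()).decode("ascii")


payload = make_fake_payload({"data": b"\x00\xffbinary"})
unpacked = unpack_artifact_bytes(payload)
print(sorted(unpacked))
```

The caller can then decide per member whether to decode it as text or write the bytes to disk for the Trainer to read.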