Reading the output of a Kubeflow pipeline programmatically

Posted on 2025-01-15 02:26:52

I run pipelines on Kubeflow from Python with a call like:

client.create_run_from_pipeline_func(pipeline_function, arguments=params_dict[name], run_name=name)

This creates a job on Kubeflow Pipelines, and I would like to access information about the individual steps of the pipeline through a Python API, ideally with something like:

job.get({step_name}).get_custom_properties({property_name})

I can already do this by opening the run in Kubeflow and looking at the custom properties of the pipeline step I am interested in, but I would like to automate the process. Is it possible to do this with a Python API?


Comments (1)

人生戏 2025-01-22 02:26:52

I use this class to extract the parameters from a Kubeflow run:

import json
from typing import Dict
from typing import List

import kfp
import pandas as pd
from kfp_server_api.models.api_run_detail import ApiRunDetail


class PipelineResult:

    DATASET_PATH_NAME = "data-load-features-DATA_SET_PATH"

    def __init__(self, run_description: ApiRunDetail):
        self._run_description = run_description

    @property
    def workflow_manifest(self) -> dict:
        return json.loads(self._run_description.pipeline_runtime.workflow_manifest)

    @property
    def status(self) -> str:
        return self.workflow_manifest["status"]["phase"]

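    @property
    def params(self) -> List[Dict]:
        # Flatten the input parameters of every workflow node into one list,
        # tagging each entry with the id of the node it came from.
        params_list = []
        for k, v in self.workflow_manifest["status"]["nodes"].items():
            for params in v.get("inputs", {}).get("parameters", []):
                params_list.append({"node_name": k, **params})
        return params_list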

    def get_param(self, param_name: str):
        # Return the value of the first parameter with this name, or None.
        for el in self.params:
            if el["name"] == param_name:
                return el.get("value")
        return None

    @property
    def training_set_path(self):
        return self.get_param(self.DATASET_PATH_NAME)

    @property
    def run_name(self):
        return self.workflow_manifest["metadata"]["annotations"]["pipelines.kubeflow.org/run_name"]

    def as_dict(self):
        return {
            "status": self.status,
            "training_set_path": self.training_set_path,
            "run_name": self.run_name,
        }


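# List the 30 most recent runs, fetch the details of each one,
# and collect the extracted fields into a DataFrame (one row per run).
client = kfp.Client()
api_response = client.list_runs(namespace='...', sort_by='created_at desc', page_size=30)
runs_descriptions = [client.get_run(run.id) for run in api_response.runs]
runs = pd.DataFrame([PipelineResult(el).as_dict() for el in runs_descriptions])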
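
The class above reads the input parameters of each node. Since the question is about the outputs of individual steps, here is a rough sketch of the same idea applied to output parameters, grouped by step name. It assumes the run detail exposes an Argo workflow manifest as in the code above, and that each executed step shows up as a node of type "Pod" with a "displayName". The helper names step_outputs and outputs_for_run, the fake manifest, and the /tmp/train.csv value are made up for illustration; the stub client only stands in for kfp.Client so the example runs without a cluster.

```python
import json
from types import SimpleNamespace
from typing import Any, Dict


def step_outputs(workflow_manifest: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Map each step's display name to a dict of its output parameters."""
    results: Dict[str, Dict[str, Any]] = {}
    nodes = workflow_manifest.get("status", {}).get("nodes") or {}
    for node_id, node in nodes.items():
        # Assumption: only "Pod" nodes are executed steps; DAG and other
        # grouping nodes are skipped.
        if node.get("type") != "Pod":
            continue
        step_name = node.get("displayName", node_id)
        params = (node.get("outputs") or {}).get("parameters") or []
        results[step_name] = {p["name"]: p.get("value") for p in params}
    return results


def outputs_for_run(client, run_id: str) -> Dict[str, Dict[str, Any]]:
    """Fetch one run and extract its step outputs.

    `client` is assumed to behave like kfp.Client: get_run(run_id) returns
    an object with pipeline_runtime.workflow_manifest as a JSON string.
    """
    detail = client.get_run(run_id)
    manifest = json.loads(detail.pipeline_runtime.workflow_manifest)
    return step_outputs(manifest)


# Hypothetical manifest, shaped like the "status" section used above.
_fake_manifest = {
    "status": {
        "nodes": {
            "wf-1": {"type": "DAG", "displayName": "wf"},
            "wf-1-100": {
                "type": "Pod",
                "displayName": "data-load-features",
                "outputs": {
                    "parameters": [
                        {
                            "name": "data-load-features-DATA_SET_PATH",
                            "value": "/tmp/train.csv",
                        }
                    ]
                },
            },
            "wf-1-200": {"type": "Pod", "displayName": "train"},
        }
    }
}


class _FakeClient:
    """Stand-in for kfp.Client, used only to make the sketch runnable."""

    def get_run(self, run_id: str):
        runtime = SimpleNamespace(workflow_manifest=json.dumps(_fake_manifest))
        return SimpleNamespace(pipeline_runtime=runtime)


demo = outputs_for_run(_FakeClient(), "run-123")
print(demo["data-load-features"])
```

With a real cluster you would pass your own kfp.Client() and a real run id instead of the stub. Note that steps sharing the same display name (for example inside a loop) would overwrite each other in this sketch, so keying by node id may be safer in that case.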