Is there an implemented way to use a Kubeflow pipeline's outputs outside the pipeline?
I'm using local Kubeflow Pipelines to build a continuous machine learning test project. I have one pipeline that preprocesses the data using TFX, and it automatically saves its outputs to MinIO. Outside of this pipeline, I want to train the model using TFX's Trainer, but I need the artifacts generated in the preprocessing pipeline. Is there an implemented way to import these outputs? I've looked through the documentation and some issues, but I can't find an answer. And because I'm trying to do this continuously, I can't rely on doing it manually.
Example of my preprocessing pipeline:
@kfp.dsl.pipeline(
    name='TFX',
    description='TFX pipeline'
)
def tfx_pipeline():
    # DL with wget, can use gcs instead as well
    fetch = kfp.dsl.ContainerOp(
        name='download',
        image='busybox',
        command=['sh', '-c'],
        arguments=[
            'sleep 1;'
            'mkdir -p /tmp/data;'
            'wget <gcp link> -O /tmp/data/results.csv'],
        file_outputs={'downloaded': '/tmp/data'})
    records_example = tfx_csv_gen(input_base=fetch.output)
    stats = tfx_statistic_gen(input_data=records_example.output)
    schema_op = tfx_schema_gen(stats.output)
    tfx_example_validator(stats=stats.outputs['output'],
                          schema=schema_op.outputs['output'])
    #tag::tft[]
    transformed_output = tfx_transform(
        input_data=records_example.output,
        schema=schema_op.outputs['output'],
        module_file=module_file)  # Path to your TFT code on GCS/S3
    #end::tft[]
and then executing with
kfp.compiler.Compiler().compile(tfx_pipeline, 'tfx_pipeline.zip')
client = kfp.Client()
client.list_experiments()
#exp = client.create_experiment(name='mdupdate')
my_experiment = client.create_experiment(name='tfx_pipeline')
my_run = client.run_pipeline(my_experiment.id, 'tfx', 'tfx_pipeline.zip')
I'm working in a .ipynb in Visual Studio Code.
You can get that information as described here: https://github.com/kubeflow/pipelines/issues/4327#issuecomment-687255001

- component_name: this can be checked in the pipeline's YAML definition, under templates.name (search for the component containing the output you want).
- artifact_name: this can also be checked in the pipeline's YAML definition, under that same component's outputs attribute.

Once you have these two parameters, you can use the functions described in the URL above:
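As a rough sketch of that approach (based on the linked issue, not an official KFP API): fetch the finished run's Argo workflow manifest via kfp.Client().get_run(run_id).pipeline_runtime.workflow_manifest, locate the artifact's MinIO object key in it, then download that object with a MinIO client. The helper below only parses the manifest with the standard library; the templateName/outputs layout it assumes matches Argo Workflows as used by KFP 1.x and may differ in other versions, and the component/artifact names in the example are placeholders for your pipeline.

```python
import json

def find_artifact_key(workflow_manifest: str, component_name: str,
                      artifact_name: str) -> str:
    """Return the S3/MinIO object key of a component's output artifact.

    workflow_manifest: the JSON string from
        kfp.Client().get_run(run_id).pipeline_runtime.workflow_manifest
        (an Argo Workflow status object) -- assumed layout, KFP 1.x/Argo.
    component_name: the value under templates.name in the pipeline YAML.
    artifact_name: the artifact listed under that component's outputs.
    """
    workflow = json.loads(workflow_manifest)
    for node in workflow["status"]["nodes"].values():
        if node.get("templateName") == component_name:
            for artifact in node.get("outputs", {}).get("artifacts", []):
                # Argo prefixes artifact names with the node/template name.
                if artifact["name"].endswith(artifact_name):
                    return artifact["s3"]["key"]
    raise KeyError(f"{artifact_name!r} not found under {component_name!r}")
```

With the key in hand you can download the object from the MinIO instance backing your Pipelines install, e.g. `Minio("minio-service.kubeflow:9000", ...).fget_object("mlpipeline", key, "/tmp/transformed_examples.tgz")` from the `minio` package; the endpoint, credentials, and bucket name ("mlpipeline" by default) depend on your deployment.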