Reading data in Vertex AI Pipelines
This is my first time using Google's Vertex AI Pipelines. I checked this codelab as well as this post and this post, on top of some links derived from the official documentation. I decided to put all that knowledge to work on a toy example: a pipeline consisting of two components, "get-data" (which reads a .csv file stored in Cloud Storage) and "report-data" (which basically returns the shape of the .csv data read by the previous component). Furthermore, I was careful to include some suggestions provided in this forum. The code I currently have goes as follows:
from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

# Components section
@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage

    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')

    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv", index=False, encoding='utf-8-sig')

@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape

# Pipeline section
@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)
    dimensions = report_data(
        dataset_task.output
    )

# Compilation section
compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

# Running and submitting job
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)
run1.submit()
I was happy to see that the pipeline compiled with no errors and managed to submit the job. However, my happiness was short-lived: when I went to Vertex AI Pipelines, I stumbled upon an error that goes like:
The DAG failed because some tasks failed. The failed tasks are: [get-data].; Job (project_id = my-project, job_id = 4290278978419163136) is failed due to the above error.; Failed to handle the job: {project_number = xxxxxxxx, job_id = 4290278978419163136}
I did not find any related info on the web, nor could I find any log or anything similar, and I feel a bit overwhelmed that the solution to this (seemingly) easy example is still eluding me.
Quite obviously, I don't know what I am getting wrong, or where. Any suggestions?
Comments (2)
With some suggestions provided in the comments, I think I managed to make my demo pipeline work. I will first include the updated code:
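(A sketch reconstructed from the question's code plus the fixes described below; the PIPELINE_ROOT value is an assumed placeholder, and report_data is annotated here to return a str, since an unannotated return is not captured as a component output.)

from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

PIPELINE_ROOT = "gs://my-bucket/test_vertex/pipeline_root"  # assumed placeholder

@component(
    packages_to_install=["google-cloud-storage", "pandas"],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import ast  # this import was missing in the original component
    import pandas as pd
    from google.cloud import storage

    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')

    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    # Write the artifact with an explicit file extension.
    df.to_csv(dataset.path + ".csv", index=False, encoding='utf-8-sig')

@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
) -> str:
    import pandas as pd
    # Read back with the same ".csv" extension that get_data appended.
    df = pd.read_csv(inputd.path + ".csv")
    return str(df.shape)

@pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket=bucket, url=url)
    dimensions = report_data(inputd=dataset_task.output)

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={
        "url": "test_vertex/pipeline_root/program_grouping_data.zip",
        "bucket": "my-bucket",
    },
    enable_caching=True,
)
run1.submit()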
Now, I will add some complementary comments which, in sum, solved my problem: when using Input[Dataset] or Output[Dataset], adding the file extension is needed (and quite easy to forget). Take for instance the output of the get_data component, and notice how the data is recorded by specifically adding the file extension, i.e. dataset.path + ".csv" (see the excerpt below).
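A minimal excerpt of the pattern, as used in the updated components above; the write side and the read side have to agree on the appended extension:

# Inside get_data: write the artifact with an explicit ".csv" extension.
df.to_csv(dataset.path + ".csv", index=False, encoding='utf-8-sig')

# Inside report_data: read it back with the same extension appended.
df = pd.read_csv(inputd.path + ".csv")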
Of course, this is a very tiny example, and real projects can easily scale to something huge; however, as some sort of "Hello, Vertex AI Pipelines!" it works well.
Thank you.
Thanks for your writeup. Very helpful! I had the same error, but it turned out to be for a different reason, so I am noting it here...
In my pipeline definition step I have the following parameters...
'''
'''
My error was that when I ran the pipeline, I did not have these same parameters entered. Below is the fixed version...
'''
'''
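As a hypothetical sketch of that kind of mismatch (the parameter names and values below are made up, since the original snippets came through empty): the keys in parameter_values must match the parameters declared by the pipeline function.

from kfp.v2.dsl import pipeline
from google.cloud import aiplatform

PIPELINE_ROOT = "gs://my-bucket/pipeline_root"  # hypothetical value

# Pipeline definition declaring two required parameters (hypothetical names).
@pipeline(name="my-pipeline", pipeline_root=PIPELINE_ROOT)
def my_pipeline(
    data_path: str,
    model_name: str,
):
    ...

# Broken submission: no values are supplied for the declared parameters.
run = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
)

# Fixed submission: parameter_values keys match the pipeline signature exactly.
run = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    parameter_values={
        "data_path": "gs://my-bucket/data.csv",
        "model_name": "my-model",
    },
)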