在顶点AI管道中读取数据

发布于 2025-01-23 04:52:34 字数 3449 浏览 4 评论 0原文

这是我第一次使用Google的顶点AI管道。我检查了 this codelab =“ https://towardsdatascience.com/how-to-set-up-custom-vertex-ai-pipelines-step-by-467487487f81cad” rel =“ nofollow noreferrer”> href =“ https://medium.com/google-cloud/google-vere-vertex-ai-the-the-aseest-way-to-run-ml-pipelines-3a41c5ed153” rel =“ nofollow noreferrer”>这篇文章 ,除了一些链接外,还从官方文档< /a>。我决定将所有这些知识运行起来,在一些玩具示例中:我打算构建一个由2个组件组成的管道:“ get-data”(读取一些存储在云存储中的.csv文件)和“ revory-data” (这基本上返回了上一个组件中读取的.csv数据的形状)。此外,我很谨慎地包括提供了一些建议在这个论坛中。我目前拥有的代码如下:


from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

# Components section   

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage
    
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape


# Pipeline section

@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )

# Compilation section

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

# Running and submitting job

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)

run1.submit()

我很高兴看到管道没有错误并设法提交工作。然而,“我的幸福持续了很短”,就像我去Vertex ai管道时一样,我偶然发现了一些“错误”,就像:

DAG失败了,因为某些任务失败了。失败的任务是:[get-data]。 job(project_id = my-project,job_id = 4290278978419163136)由于上述错误而失败。无法处理工作:{project_number = xxxxxxxx,job_id = 42902789784191636}

我找不到网络上的任何相关信息简单的例子,仍然在躲避我。

很明显,我没有误入什么或在哪里。有建议吗?

This is my first time using Google's Vertex AI Pipelines. I checked this codelab as well as this post and this post, on top of some links derived from the official documentation. I decided to put all that knowledge to work, in some toy example: I was planning to build a pipeline consisting of 2 components: "get-data" (which reads some .csv file stored in Cloud Storage) and "report-data" (which basically returns the shape of the .csv data read in the previous component). Furthermore, I was cautious to include some suggestions provided in this forum. The code I currently have, goes as follows:


from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from google.cloud import aiplatform

# Components section   

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    import pandas as pd
    from google.cloud import storage
    
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # path = "gs://my-bucket/program_grouping_data.zip"
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
):
    import pandas as pd
    df = pd.read_csv(inputd.path)
    return df.shape


# Pipeline section

@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="my-pipeline",
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )

# Compilation section

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)

# Running and submitting job

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"url": "test_vertex/pipeline_root/program_grouping_data.zip", "bucket": "my-bucket"},
    enable_caching=True,
)

run1.submit()

I was happy to see that the pipeline compiled with no errors, and managed to submit the job. However "my happiness lasted short", as when I went to Vertex AI Pipelines, I stumbled upon some "error", which goes like:

The DAG failed because some tasks failed. The failed tasks are: [get-data].; Job (project_id = my-project, job_id = 4290278978419163136) is failed due to the above error.; Failed to handle the job: {project_number = xxxxxxxx, job_id = 4290278978419163136}

I did not find any related info on the web, neither could I find any log or something similar, and I feel a bit overwhelmed that the solution to this (seemingly) easy example, is still eluding me.

Quite obviously, I don't what or where I am mistaking. Any suggestion?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

浊酒尽余欢 2025-01-30 04:52:34

在评论中提供了一些建议,我认为我设法使演示管道起作用。我将首先包含更新的代码:

from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from datetime import datetime
from google.cloud import aiplatform
from typing import NamedTuple


# Importing 'COMPONENTS' of the 'PIPELINE'

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    """Reads a csv file, from some location in Cloud Storage"""
    import ast
    import pandas as pd
    from google.cloud import storage
    
    # 'Pulling' demo .csv data from a know location in GCS
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # Reading the pulled demo .csv data
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
) -> NamedTuple("output", [("rows", int), ("columns", int)]):
    """From a passed csv file existing in Cloud Storage, returns its dimensions"""
    import pandas as pd
    
    df = pd.read_csv(inputd.path+".csv")
    
    return df.shape


# Building the 'PIPELINE'

@pipeline(
    # i.e. in my case: PIPELINE_ROOT = 'gs://my-bucket/test_vertex/pipeline_root/'
    # Can be overriden when submitting the pipeline
    pipeline_root=PIPELINE_ROOT,
    name="readcsv-pipeline",  # Your own naming for the pipeline.
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )
    

# Compiling the 'PIPELINE'    

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)


# Running the 'PIPELINE'

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={
        "url": "test_vertex/pipeline_root/program_grouping_data.zip",
        "bucket": "my-bucket"
    },
    enable_caching=True,
)

# Submitting the 'PIPELINE'

run1.submit()

现在,我将添加一些补充注释,总而言之,这些注释可以解决我的问题:

  • 首先,让您的用户启用了“ logs vieper”(角色/logging.viewer),将极大地帮助您要解决管道中的任何现有错误(请注意:该角色对我有用,但是您可能想寻找自己的目的更好的匹配角色

https://i.sstatic.net/jpgfu.png“ alt =”在此处输入图像说明”>

  • 注意:在上面的图片中,当显示“日志”时,仔细检查每个人可能会有所帮助log(接近创建管道的时间),因为通常每个EOF与单个警告或错误行相对应:

”

  • 其次,我的管道的输出是一个元组。在我的原始方法中,我刚刚退还了普通的元组,但是建议返回a 命名tuple 。通常,如果您需要输入 /输出一个或多个“ 小值< / em>”(出于任何原因,int或str),请选择一个名为tuple。
  • 第三,当您的管道之间的连接为INPUT [DATASET]ouput [dataset]时,需要添加文件扩展名(并且很容易忘记)。以get_data组件的OUPUT为例,并注意通过专门添加文件扩展名(即dataSet.path.path.path +“ .csv”)注意数据的记录。

当然,这是一个非常小的例子,项目可以轻松地扩展到庞大的项目,但是由于某种“ Hello Vertex AI管道”的运作良好。

谢谢。

With some suggestions provided in the comments, I think I managed to make my demo pipeline work. I will first include the updated code:

from kfp.v2 import compiler
from kfp.v2.dsl import pipeline, component, Dataset, Input, Output
from datetime import datetime
from google.cloud import aiplatform
from typing import NamedTuple


# Importing 'COMPONENTS' of the 'PIPELINE'

@component(
    packages_to_install=[
        "google-cloud-storage",
        "pandas",
    ],
    base_image="python:3.9",
    output_component_file="get_data.yaml"
)
def get_data(
    bucket: str,
    url: str,
    dataset: Output[Dataset],
):
    """Reads a csv file, from some location in Cloud Storage"""
    import ast
    import pandas as pd
    from google.cloud import storage
    
    # 'Pulling' demo .csv data from a know location in GCS
    storage_client = storage.Client("my-project")
    bucket = storage_client.get_bucket(bucket)
    blob = bucket.blob(url)
    blob.download_to_filename('localdf.csv')
    
    # Reading the pulled demo .csv data
    df = pd.read_csv('localdf.csv', compression='zip')
    df['new_skills'] = df['new_skills'].apply(ast.literal_eval)
    df.to_csv(dataset.path + ".csv" , index=False, encoding='utf-8-sig')


@component(
    packages_to_install=["pandas"],
    base_image="python:3.9",
    output_component_file="report_data.yaml"
)
def report_data(
    inputd: Input[Dataset],
) -> NamedTuple("output", [("rows", int), ("columns", int)]):
    """From a passed csv file existing in Cloud Storage, returns its dimensions"""
    import pandas as pd
    
    df = pd.read_csv(inputd.path+".csv")
    
    return df.shape


# Building the 'PIPELINE'

@pipeline(
    # i.e. in my case: PIPELINE_ROOT = 'gs://my-bucket/test_vertex/pipeline_root/'
    # Can be overriden when submitting the pipeline
    pipeline_root=PIPELINE_ROOT,
    name="readcsv-pipeline",  # Your own naming for the pipeline.
)
def my_pipeline(
    url: str = "test_vertex/pipeline_root/program_grouping_data.zip",
    bucket: str = "my-bucket"
):
    dataset_task = get_data(bucket, url)

    dimensions = report_data(
        dataset_task.output
    )
    

# Compiling the 'PIPELINE'    

compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="pipeline_job.json"
)


# Running the 'PIPELINE'

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

run1 = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline_job.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={
        "url": "test_vertex/pipeline_root/program_grouping_data.zip",
        "bucket": "my-bucket"
    },
    enable_caching=True,
)

# Submitting the 'PIPELINE'

run1.submit()

Now, I will add some complementary comments, which in sum, managed to solve my problem:

  • First, having the "Logs Viewer" (roles/logging.viewer) enabled for your user, will greatly help to troubleshoot any existing error in your pipeline (Note: that role worked for me, however you might want to look for a better matching role for you own purposes here). Those errors will appear as "Logs", which can be accessed by clicking the corresponding button:

enter image description here

  • NOTE: In the picture above, when the "Logs" are displayed, it might be helpful to carefully check each log (close to the time when you created you pipeline), as generally each eof them corresponds with a single warning or error line:

Verte AI Pipelines Logs

  • Second, the output of my pipeline was a tuple. In my original approach, I just returned the plain tuple, but it is advised to return a NamedTuple instead. In general, if you need to input / output one or more "small values" (int or str, for any reason), pick a NamedTuple to do so.
  • Third, when the connection between your pipelines is Input[Dataset] or Ouput[Dataset], adding the file extension is needed (and quite easy to forget). Take for instance the ouput of the get_data component, and notice how the data is recorded by specifically adding the file extension, i.e. dataset.path + ".csv".

Of course, this is a very tiny example, and projects can easily scale to huge projects, however as some sort of "Hello Vertex AI Pipelines" it will work well.

Thank you.

还不是爱你 2025-01-30 04:52:34

感谢您的文章。非常有帮助!我有同样的错误,但事实证明是出于不同的原因,所以在这里注意...
在我的管道定义步骤中,我有以下参数...
''

def my_pipeline(bq_source_project: str = BQ_SOURCE_PROJECT,  
                    bq_source_dataset: str = BQ_SOURCE_DATASET,  
                    bq_source_table: str = BQ_SOURCE_TABLE,  
                    output_data_path: str = "crime_data.csv"):

''

我的错误是当我运行管道时,我没有输入这些相同的参数。以下是固定版本...
'''

job = pipeline_jobs.PipelineJob(  
project=PROJECT_ID,  
      location=LOCATION,  
      display_name=PIPELINE_NAME,  
      job_id=JOB_ID,  
      template_path=FILENAME,  
      pipeline_root=PIPELINE_ROOT,  
      parameter_values={'bq_source_project': BQ_SOURCE_PROJECT,  
                          'bq_source_dataset': BQ_SOURCE_DATASET,  
                          'bq_source_table': BQ_SOURCE_TABLE}  

'''

Thanks for your writeup. Very helpful! I had the same error, but it turned out to be for a different reasons, so noting it here...
In my pipeline definition step I have the following parameters...
'''

def my_pipeline(bq_source_project: str = BQ_SOURCE_PROJECT,  
                    bq_source_dataset: str = BQ_SOURCE_DATASET,  
                    bq_source_table: str = BQ_SOURCE_TABLE,  
                    output_data_path: str = "crime_data.csv"):

'''

My error was when I run the pipeline, I did not have these same parameters entered. Below is the fixed version...
'''

job = pipeline_jobs.PipelineJob(  
project=PROJECT_ID,  
      location=LOCATION,  
      display_name=PIPELINE_NAME,  
      job_id=JOB_ID,  
      template_path=FILENAME,  
      pipeline_root=PIPELINE_ROOT,  
      parameter_values={'bq_source_project': BQ_SOURCE_PROJECT,  
                          'bq_source_dataset': BQ_SOURCE_DATASET,  
                          'bq_source_table': BQ_SOURCE_TABLE}  

'''

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文