GCS bucket file path as input to a Dataflow pipeline triggered by a Cloud Function (GCP)

Posted on 2025-01-18 13:52:16

I am trying to use a Cloud Function with a GCS bucket (create/finalize) trigger to start a Dataflow pipeline. I am trying to figure out how to pass the path of the CSV file in the GCS bucket to the custom Dataflow pipeline when it is triggered.

Please let me know if you have come across a similar issue and what your solution in Python was.

Thanks.

Comments (1)

蒲公英的约定 2025-01-25 13:52:16

You need to create a Flex Template for Dataflow.
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
With a Flex Template, you can pass parameters to the Dataflow job dynamically at runtime. Once that is done, add Dataflow trigger code like the following to your Cloud Function:

import logging

from googleapiclient.discovery import build


def startDataflow(project, flex_template_path, jobName, bq_dataset, raw_table, prc_table, start_date):
    # Build the JSON request that launches the Flex Template Dataflow job.
    # Pipeline-specific parameters; these must match the options your template defines.
    parameters = {
        "gcp_project": project,
        "bq_dataset": bq_dataset,
        "raw_table": raw_table,
        "prc_table": prc_table,
        "start_date": start_date
    }
    # Execution environment for the job. The DATAFLOW_* constants are assumed to be
    # defined elsewhere in the function (e.g. read from environment variables).
    environment = {
        "stagingLocation": DATAFLOW_STAGING_LOCATION,
        "additionalExperiments": DATAFLOW_ADDITIONAL_EXPERIMENTS,
        "maxWorkers": DATAFLOW_MAX_WORKER_COUNT,
        "machineType": DATAFLOW_MACHINE_TYPE,
        "serviceAccountEmail": DATAFLOW_SERVICE_ACCOUNT_EMAIL,
        "network": DATAFLOW_NETWORK,
        "subnetwork": DATAFLOW_SUBNETWORK,
        "ipConfiguration": DATAFLOW_IP_CONFIGURATION
    }
    body = {
        "launchParameter": {
            "jobName": jobName,
            "parameters": parameters,
            "environment": environment,
            "containerSpecGcsPath": flex_template_path,
        }
    }
    service = build("dataflow", "v1b3", cache_discovery=False)
    # Create the request that triggers the Flex Template Dataflow pipeline.
    # projectId expects the GCP project ID and location expects the Dataflow region.
    request = (
        service.projects().locations().flexTemplates().launch(
            projectId=DATAFLOW_RUN_LOCATION,
            location=DATAFLOW_RUN_PROJECT_REGION,
            body=body
        )
    )
    response = None
    try:
        response = request.execute()
    except Exception as e:
        logging.exception(
            "There was an exception while triggering the dataflow pipeline "
            "with the job name: {}. The exception is: {}".format(jobName, e))

    return response

In the startDataflow function above, the parameters dictionary is where you pass parameters to the Dataflow pipeline at run time.
Since your Cloud Function is triggered by a GCS event, you can read the bucket and file name from the event payload and forward them to the Dataflow pipeline when you launch it; a minimal sketch of such an entry point follows.
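
For example, a 1st-gen background Cloud Function triggered by google.storage.object.finalize could look roughly like this. It is only a sketch: the placeholder project, template path, and table values, and the extra input_file argument (assumed to have been added to startDataflow and forwarded into its parameters dictionary) are illustrative and must be adapted to your template:

def gcs_trigger(event, context):
    """Background Cloud Function entry point for google.storage.object.finalize."""
    # The finalize event payload carries the bucket and object name of the new file.
    bucket = event["bucket"]
    name = event["name"]

    # Only react to CSV uploads; adjust the filter to your layout.
    if not name.endswith(".csv"):
        return

    file_path = "gs://{}/{}".format(bucket, name)

    # Forward the path as a template parameter. This assumes startDataflow has been
    # extended with an input_file argument that is added to its parameters dict.
    startDataflow(
        project="my-project",                                     # placeholder
        flex_template_path="gs://my-bucket/templates/spec.json",  # placeholder
        jobName="csv-load-{}".format(context.event_id),
        bq_dataset="my_dataset",                                  # placeholder
        raw_table="raw_table",                                    # placeholder
        prc_table="prc_table",                                    # placeholder
        start_date="2025-01-01",                                  # placeholder
        input_file=file_path,                                     # assumed extra argument
    )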

Does this answer your question?
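
For reference, on the pipeline side the Flex Template would read the forwarded path as a custom pipeline option. A minimal Apache Beam sketch, assuming the option is called input_file (the name must match whatever your template's parameters use):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class CsvLoadOptions(PipelineOptions):
    """Custom options for the template; the input_file name is an assumption."""

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--input_file",
            help="gs:// path of the CSV file passed in by the Cloud Function")


def run():
    options = PipelineOptions()
    custom = options.view_as(CsvLoadOptions)
    with beam.Pipeline(options=options) as p:
        (p
         | "ReadCsv" >> beam.io.ReadFromText(custom.input_file, skip_header_lines=1)
         | "Print" >> beam.Map(print))  # placeholder transform


if __name__ == "__main__":
    run()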
