GCP: Error triggering a Dataflow job from Composer
I am trying to run a Dataflow job from a Composer Airflow DAG using the code below.
Depending on the configuration, I get two types of error messages.
Please suggest how to fix this.
a) Error 1: when the service account email is commented out (#)
# "serviceAccountEmail": "service-7276363xxxxx@cloudcomposer-accounts.iam.gserviceaccount.com",
Error:
Error: Required 'compute.subnetworks.get' permission for 'projects/vpc-host/regions/us-central1/subnetworks/sbn-dataflow'
b) Error 2: when the service account email is set
"serviceAccountEmail": "service-7276363xxxxx@cloudcomposer-accounts.iam.gserviceaccount.com",
Error:
Current user cannot act as service account service-7276363xxxxx@cloudcomposer-accounts.iam.gserviceaccount.com
Code:
import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")

default_args = {
    "owner": "Airflow",
    "start_date": days_ago(1),
    "depends_on_past": False,
    "dataflow_default_options": {
        "project": project_id,
        "zone": gce_zone,
        "serviceAccountEmail": "service-7276363xxxxx@cloudcomposer-accounts.iam.gserviceaccount.com",
        "subnetwork": "https://www.googleapis.com/compute/v1/projects/vpc-host/regions/us-central1/subnetworks/sbn-dataflow",
        "tempLocation": bucket_path + "/tmp/",
    },
}

with models.DAG(
    dag_id="composer_dataflow_dag",
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    dataflow_template_job = DataflowTemplateOperator(
        task_id="dataflow_csv_to_bq",
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "javascriptTextTransformGcsPath": bucket_path + "/SCORE_STG.js",
            "JSONPath": bucket_path + "/SCORE_STG.json",
            "inputFilePattern": bucket_path + "/stg_data.csv",
            "outputTable": project_id + ":gcp_stage.SCORE_STG",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
        dag=dag,
    )
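Note that the `subnetwork` option above is the full Compute Engine self-link of the Shared VPC subnetwork, not just its name. A minimal helper (the function name `subnetwork_url` is made up for illustration) shows the expected shape of that value:

```python
def subnetwork_url(host_project: str, region: str, subnet: str) -> str:
    """Build the full self-link that Dataflow expects for a subnetwork."""
    return (
        "https://www.googleapis.com/compute/v1/"
        f"projects/{host_project}/regions/{region}/subnetworks/{subnet}"
    )

# Reproduces the value used in dataflow_default_options above.
print(subnetwork_url("vpc-host", "us-central1", "sbn-dataflow"))
```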
Answer:
You have to use a different service account. Remember that it has to have access to the resources; that should fix both issues.
You can create a service account to act as the worker, as explained in Role Assignment, i.e. a worker and an admin.
Besides that, I don't find anything out of the ordinary. Even the parameters are passed correctly. For other users' reference:
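The two errors map to two standard IAM grants. The commands below are a sketch only: `WORKER_SA` and `COMPOSER_SA` are placeholder names, and `PROJECT_ID`/`vpc-host` are taken from the question; adjust them to your environment.

```shell
# Error 1: the Dataflow worker service account lacks
# 'compute.subnetworks.get' on the Shared VPC subnetwork. Granting
# roles/compute.networkUser on the host project covers this.
gcloud projects add-iam-policy-binding vpc-host \
    --member="serviceAccount:WORKER_SA@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/compute.networkUser"

# Error 2: the Composer environment's service account cannot impersonate
# the account passed in serviceAccountEmail; it needs
# roles/iam.serviceAccountUser on that account.
gcloud iam service-accounts add-iam-policy-binding \
    service-7276363xxxxx@cloudcomposer-accounts.iam.gserviceaccount.com \
    --member="serviceAccount:COMPOSER_SA@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/iam.serviceAccountUser"
```

Granting `roles/compute.networkUser` on the individual subnetwork instead of the whole host project is a tighter alternative if your setup allows it.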