Airflow 2.x bucket and range arguments for the ExternalTable (Gsheets) operator
I am having problems using the new Airflow operator BigQueryCreateExternalTableOperator within Google Composer:
Question 1
After creating the Airflow task, this is happening:
AttributeError: 'BigQueryCreateExternalTableOperator' object has no attribute 'bucket'
However, since I am querying a Google Sheets file, why is it looking for a bucket argument? I am going crazy trying to figure out what is happening! According to the docs it is optional!
Sample Code
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

task1 = BigQueryCreateExternalTableOperator(
    task_id="task1_externaltable",
    table_resource={
        "tableReference": {
            "projectId": projectid,
            "datasetId": datasetid,
            "tableId": tableid,
        },
        "schema": schema_fields,
        "externalDataConfiguration": {
            "sourceFormat": "GOOGLE_SHEETS",
            "autodetect": False,
            "compression": "NONE",
            "googleSheetsOptions": {
                "skipLeadingRows": 1,  # skip the header row
                "range": gsheets_tab_name,  # tab name (A1 range) inside the spreadsheet
            },
            "sourceUris": gsheets_url,  # URL of the Google Sheets document
        },
    },
)
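For completeness, the variables referenced in the snippet are plain values defined earlier in the DAG file, roughly along these lines (illustrative placeholder values only, not my real project):

projectid = "my-gcp-project"        # GCP project that owns the dataset
datasetid = "my_dataset"            # BigQuery dataset for the external table
tableid = "manual_attribution_2"    # name of the external table to create
schema_fields = [                   # explicit schema, since autodetect is False
    {"name": "campaign", "type": "STRING", "mode": "NULLABLE"},
    {"name": "clicks", "type": "INTEGER", "mode": "NULLABLE"},
]
gsheets_tab_name = "Sheet1"         # tab whose rows should be exposed
gsheets_url = "https://docs.google.com/spreadsheets/d/<spreadsheet-id>"  # Sheets document URL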
Following Elad's suggestion, here is the error traceback:
AttributeError: 'BigQueryCreateExternalTableOperator' object has no attribute 'bucket'
[2022-03-18, 14:45:38 UTC] {taskinstance.py:1268} INFO - Marking task as UP_FOR_RETRY. dag_id=trm_analytics_attribution_collision_checker_dag, task_id=create_manual_attribution_2_external_table, execution_date=20220318T144520, start_date=20220318T144536, end_date=20220318T144538
[2022-03-18, 14:45:38 UTC] {standard_task_runner.py:89} ERROR - Failed to execute job 444 for task create_manual_attribution_2_external_table
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
args.func(args, dag=self.dag)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
return func(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 94, in wrapper
return f(*args, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 302, in task_run
_run_task_by_selected_method(args, dag, ti)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
_run_raw_task(args, ti)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
ti._run_raw_task(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
return func(*args, session=session, **kwargs)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1330, in _run_raw_task
self._execute_task_with_callbacks(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1420, in _execute_task_with_callbacks
self.render_templates(context=context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1995, in render_templates
self.task.render_template_fields(context)
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1061, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1072, in _do_render_template_fields
content = getattr(parent, attr_name)
AttributeError: 'BigQueryCreateExternalTableOperator' object has no attribute 'bucket'
[2022-03-18, 14:45:38 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-03-18, 14:45:38 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
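Reading the traceback, the failure appears to happen while Airflow renders template fields, before anything is sent to BigQuery: baseoperator.py iterates over the operator's template_fields and calls getattr() for each name, and it is that getattr for 'bucket' that blows up. So the AttributeError seems to be about the operator instance simply not having a self.bucket attribute when only table_resource is passed, not about the bucket argument being required for Google Sheets. A simplified sketch of that rendering loop (my own toy reproduction, not the actual Airflow source):

# Toy reproduction of the failure mode suggested by the traceback above.
class FakeCreateExternalTableOperator:
    # 'bucket' is declared as a templated field on the class...
    template_fields = ("bucket", "source_objects", "table_resource")

    def __init__(self, table_resource):
        # ...but this code path never assigns self.bucket.
        self.table_resource = table_resource

def render_template_fields(task):
    for attr_name in task.template_fields:
        # Raises AttributeError: object has no attribute 'bucket',
        # just like the real operator does at render time.
        content = getattr(task, attr_name)
        print(attr_name, "->", content)

render_template_fields(FakeCreateExternalTableOperator(table_resource={}))

If that is what is going on, it would explain why an argument the docs describe as optional still fails at render time, and it would point at the installed apache-airflow-providers-google version rather than at the table_resource payload itself, but I have not confirmed that.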