Airflow 2.x bucket and range arguments for the ExternalTable (Gsheets) operator from Gsheets

Posted on 2025-01-15 02:47:19

I am having problems using the new Airflow operator BigQueryCreateExternalTableOperator in Google Cloud Composer:

Question 1

After creating an Airflow task, I get this error:

AttributeError: 'BigQueryCreateExternalTableOperator' object has no attribute 'bucket'

However, since I am querying a Google Sheets file, why is it looking for a bucket argument? I am going crazy trying to figure out what is happening! According to the docs, it is optional!

Sample Code

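from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateExternalTableOperator

# projectid, datasetid, tableid, schema_fields, gsheets_tab_name and gsheets_url
# are assumed to be defined elsewhere in the DAG file.
task1 = BigQueryCreateExternalTableOperator(
    task_id="task1_externaltable",
    table_resource={
        "tableReference": {
            "projectId": projectid,
            "datasetId": datasetid,
            "tableId": tableid,
        },
        "schema": schema_fields,  # BigQuery TableSchema: {"fields": [...]}
        "externalDataConfiguration": {
            "sourceFormat": "GOOGLE_SHEETS",
            "autodetect": False,
            "compression": "NONE",
            "googleSheetsOptions": {
                "skipLeadingRows": 1,
                "range": gsheets_tab_name,
            },
            # The BigQuery API expects sourceUris to be a list of URIs.
            "sourceUris": [gsheets_url],
        },
    },
)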
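For reference, `table_resource` is a BigQuery Table resource in its REST (JSON) form. The helper below, `build_gsheets_table_resource`, is hypothetical (it is not part of Airflow) and is only a sketch assuming the REST field names: `sourceUris` is a list of strings and the table-level `schema` is an object of the form `{"fields": [...]}`. It uses `typing.List`/`Dict` so it also runs on the Python 3.8 shown in the traceback. It does not by itself address the `bucket` error.

```python
from typing import Any, Dict, List


def build_gsheets_table_resource(
    project_id: str,
    dataset_id: str,
    table_id: str,
    schema_fields: List[Dict[str, Any]],
    sheet_url: str,
    sheet_range: str,
    skip_leading_rows: int = 1,
) -> Dict[str, Any]:
    """Hypothetical helper: a BigQuery Table resource for a Google Sheets external table."""
    return {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset_id,
            "tableId": table_id,
        },
        # Table-level schema in REST form: a TableSchema object wrapping the field list.
        "schema": {"fields": schema_fields},
        "externalDataConfiguration": {
            "sourceFormat": "GOOGLE_SHEETS",
            "autodetect": False,
            # sourceUris is a list, even when there is only one spreadsheet.
            "sourceUris": [sheet_url],
            "googleSheetsOptions": {
                "skipLeadingRows": skip_leading_rows,
                "range": sheet_range,
            },
        },
    }


# Usage with placeholder values (all hypothetical):
resource = build_gsheets_table_resource(
    "my-project",
    "my_dataset",
    "my_table",
    [{"name": "id", "type": "STRING", "mode": "NULLABLE"}],
    "https://docs.google.com/spreadsheets/d/SHEET_ID",
    "Sheet1!A1:C100",
)
```

The resulting dict can then be passed as `table_resource=resource` to the operator in place of the inline literal.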

Following Elad's suggestion, here is the full error traceback:

AttributeError: 'BigQueryCreateExternalTableOperator' object has no attribute 'bucket'
[2022-03-18, 14:45:38 UTC] {taskinstance.py:1268} INFO - Marking task as UP_FOR_RETRY. dag_id=trm_analytics_attribution_collision_checker_dag, task_id=create_manual_attribution_2_external_table, execution_date=20220318T144520, start_date=20220318T144536, end_date=20220318T144538
[2022-03-18, 14:45:38 UTC] {standard_task_runner.py:89} ERROR - Failed to execute job 444 for task create_manual_attribution_2_external_table
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 94, in wrapper
    return f(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 302, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1330, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1420, in _execute_task_with_callbacks
    self.render_templates(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1995, in render_templates
    self.task.render_template_fields(context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1061, in render_template_fields
    self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/baseoperator.py", line 1072, in _do_render_template_fields
    content = getattr(parent, attr_name)
AttributeError: 'BigQueryCreateExternalTableOperator' object has no attribute 'bucket'
[2022-03-18, 14:45:38 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-03-18, 14:45:38 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
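The last frames of the traceback show where the error comes from. During `render_template_fields`, before the operator executes, Airflow loops over the operator's `template_fields` and calls `getattr(parent, attr_name)` on each name. The failure therefore does not depend on whether a bucket is actually needed for a Google Sheets source. It only requires that `bucket` is listed in `template_fields` while `self.bucket` was never assigned. The guess that `__init__` skips this assignment when `table_resource` is passed is an inference and is not visible in the traceback. The pure-Python sketch below uses a hypothetical `FakeExternalTableOperator` with a simplified rendering loop modelled on the traceback; it is not Airflow's code. The hypothetical `fill_missing_template_fields` helper shows one defensive workaround: assign `None` to every listed field that was never set.

```python
from typing import Any, Dict, Optional, Sequence


class FakeExternalTableOperator:
    """Hypothetical stand-in for an operator whose template_fields include 'bucket'."""

    template_fields: Sequence[str] = ("bucket", "table_resource")

    def __init__(
        self,
        table_resource: Optional[Dict[str, Any]] = None,
        bucket: Optional[str] = None,
    ) -> None:
        self.table_resource = table_resource
        # The suspected bug being mimicked: 'bucket' is only assigned without table_resource.
        if table_resource is None:
            self.bucket = bucket

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        """Simplified version of the loop in the traceback's last frames."""
        for attr_name in self.template_fields:
            content = getattr(self, attr_name)  # AttributeError if never assigned
            # Real Airflow would render Jinja here; this sketch keeps the value unchanged.
            setattr(self, attr_name, content)


def fill_missing_template_fields(op: Any) -> None:
    """Hypothetical workaround: give every unset template field the value None."""
    for attr_name in op.template_fields:
        if not hasattr(op, attr_name):
            setattr(op, attr_name, None)


op = FakeExternalTableOperator(table_resource={"tableReference": {}})
try:
    op.render_template_fields({})
except AttributeError as exc:
    print(exc)  # 'FakeExternalTableOperator' object has no attribute 'bucket'

fill_missing_template_fields(op)
op.render_template_fields({})  # no error now that 'bucket' is None
```

Applying the same idea to the real operator (for example, by calling such a helper at the end of a subclass's `__init__`) is untested here. A safer first step is probably to check the installed `apache-airflow-providers-google` version and its changelog for a fix to this operator.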
