Facing task timeout errors when parsing DAGs in Airflow version 2.2.5

Posted on 2025-02-07 00:18:46


I am hitting a task timeout error with Airflow version 2.2.5 / Composer 2.0.15. The same code runs absolutely fine on Airflow version 2.2.3 / Composer version 1.18.0.

Error message:

Broken DAG: [/home/airflow/gcs/dags/test_dag.py] Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/enum.py", line 256, in __new__
    if canonical_member._value_ == enum_member._value_:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /home/airflow/gcs/dags/test_dag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.2.5/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.2.5/best-practices.html#reducing-dag-complexity, PID: 1827

As per the documentation and the links in the error message about top-level Python code (a minimal sketch of that guideline follows the folder layout below), we have a framework in place for DAGs and tasks.

main_folder
|___ dags
|___ tasks
|___ libs

a) All the main DAG files are in the dags folder.

b) Actual functions and queries (PythonOperator callables / SQL queries) are placed in *.py files under the tasks folder.

c) Common functionality is placed in Python files in the libs folder.
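
For reference, the top-level-code guideline that the error message links to boils down to keeping the DAG file cheap to import: heavy imports and query construction should happen inside the task callables rather than at module level, because the scheduler re-parses every DAG file under the 30-second budget seen in the traceback. A minimal sketch of that pattern (module and function names here are hypothetical, not part of the framework above):

import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def build_report(**context):
    # Heavy imports run only when the task executes,
    # not on every scheduler parse of this file.
    from tasks.report_code.report_query import report_query  # hypothetical module

    return report_query(run_date=context["ds"])


with DAG(
    dag_id="lazy_import_example",
    start_date=datetime.datetime(2020, 12, 15),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="build_report", python_callable=build_report)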

Providing the basic DAG structure here:

# Import libraries and functions 
import datetime

from airflow import models, DAG
from airflow.contrib.operators import bigquery_operator, bigquery_to_gcs, bigquery_table_delete_operator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
##from airflow.executors.sequential_executor import SequentialExecutor
from airflow.utils.task_group import TaskGroup

## Import codes from tasks and libs folder
from libs.compres_suppress.cot_suppress import *
from libs.teams_plugin.teams_plugin import *
from tasks.email_code.trigger_email import *

# Set up Airflow DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 12, 15, 0),
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'on_failure_callback': trigger_email
}

DAG_ID = 'test_dag'


# Check execution date
if "<some condition>" matches:
    run_date = <date in config file>
else:
    run_date = datetime.datetime.now().strftime("%Y-%m-%d")
    run_date_day = datetime.datetime.now().isoweekday()


dag = DAG(
    DAG_ID,
    default_args=default_args, catchup=False,
    max_active_runs=1, schedule_interval=SCHEDULE_INTERVAL
)


next_dag_name = "next_dag1"
if env == "prod":
    if run_date_day == 7:
        next_dag_name = "next_dag2"
    else:
        next_dag_name = "next_dag1"


run_id = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# Define Airflow DAG
with dag:

    team_notify_task = MSTeamsWebhookOperator(
        task_id='teams_notifi_start_task',
        http_conn_id='http_conn_id',
        message=f"DAG has started <br />"
                f"<strong> DAG ID:</strong> {DAG_ID}.<br />",
        theme_color="00FF00",
        button_text="My button",
        dag=dag)
        
    task1_bq = bigquery_operator.BigQueryOperator(
        task_id='task1',
        sql=task1_query(
            table1="table1",
            start_date=start_date),
        use_legacy_sql=False,
        destination_dataset_table="destination_tbl_name",
        write_disposition='WRITE_TRUNCATE'      
    )

    ##### Base Skeletons #####
    with TaskGroup("taskgroup_lbl", tooltip="taskgroup_sample") as task_grp:
        tg_process(args=default_args, run_date=run_date)
         

    if run_mode == "<env_name>" and next_dag_name != "":
        next_dag_trigg = BashOperator(
            task_id=f'trigger_{next_dag_name}',
            bash_command="gcloud composer environments run " + <env> + "-cust_comp --location us-east1 dags trigger -- " + next_dag_name + " --run-id=trigger_" + run_id
        )
        task_grp >> next_dag_trigg
        
    team_notify_task >> task1_bq >> task_grp 

Can someone help with identifying what is causing the issue?


Comments (1)

终止放荡 · 2025-02-14 00:18:46


Increasing the DAG import (parsing) timeout does the trick.

Go to the Airflow web UI; on the top bar, navigate to

Variables --> Configuration --> [core] --> dagbag_import_timeout = <changed from 30 (default) to 160>.
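
On a self-managed Airflow deployment the same override can also be applied without the UI, via airflow.cfg or Airflow's standard AIRFLOW__<SECTION>__<KEY> environment-variable convention. A minimal sketch for illustration only (in practice the variable must be present in the scheduler's and webserver's environment before Airflow starts):

import os

# Equivalent to setting dagbag_import_timeout = 160 under [core] in airflow.cfg;
# Airflow maps AIRFLOW__<SECTION>__<KEY> environment variables onto its config.
os.environ["AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT"] = "160"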

If using Composer, the same can be done through the following steps (a scripted equivalent follows below).

a) Go to the Composer service and select the environment whose settings are to be modified.

b) Click AIRFLOW CONFIGURATION OVERRIDES --> EDIT --> (add/edit) dagbag_import_timeout = 160.

c) Click Save.
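
For completeness, the Composer override can also be scripted with the gcloud CLI, and the effective value checked from inside the environment. A minimal sketch, assuming a hypothetical environment named example-env in us-east1:

# Console-free equivalent of the override (run locally, not inside a DAG):
#   gcloud composer environments update example-env \
#       --location us-east1 \
#       --update-airflow-configs=core-dagbag_import_timeout=160

# Verify the effective value from any Airflow process in the environment:
from airflow.configuration import conf

print(conf.getint("core", "dagbag_import_timeout"))  # expect 160 after the override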
