Facing task timeout error while parsing DAG in Airflow version 2.2.5
I am hitting a task timeout error with Airflow version 2.2.5 / Composer 2.0.15. The same code runs absolutely fine on Airflow version 2.2.3 / Composer version 1.18.0.
Error message:
Broken DAG: [/home/airflow/gcs/dags/test_dag.py] Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/enum.py", line 256, in __new__
    if canonical_member._value_ == enum_member._value_:
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /home/airflow/gcs/dags/test_dag.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
* https://airflow.apache.org/docs/apache-airflow/2.2.5/best-practices.html#top-level-python-code
* https://airflow.apache.org/docs/apache-airflow/2.2.5/best-practices.html#reducing-dag-complexity, PID: 1827
In line with the documentation linked in the error message about top-level Python code, we have a framework in place for DAGs and tasks:
main_folder
|___ dags
|___ tasks
|___ libs
a) All the main DAG files are in the dags folder.
b) The actual functions or queries (PythonOperator functions / SQL queries) are placed in *.py files under the tasks folder.
c) Common functionality is placed in Python files in the libs folder.
Providing the basic DAG structure here:
# Import libraries and functions
import datetime
from airflow import models, DAG
from airflow.contrib.operators import bigquery_operator, bigquery_to_gcs, bigquery_table_delete_operator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
##from airflow.executors.sequential_executor import SequentialExecutor
from airflow.utils.task_group import TaskGroup
## Import codes from tasks and libs folder
from libs.compres_suppress.cot_suppress import *
from libs.teams_plugin.teams_plugin import *
from tasks.email_code.trigger_email import *

# Set up Airflow DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime(2020, 12, 15, 0),
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=1),
    'on_failure_callback': trigger_email
}
DAG_ID = 'test_dag'

# Check execution date
if "<some condition>" matches:
    run_date = <date in config file>
else:
    run_date = datetime.datetime.now().strftime("%Y-%m-%d")
run_date_day = datetime.datetime.now().isoweekday()

dag = DAG(
    DAG_ID,
    default_args=default_args, catchup=False,
    max_active_runs=1, schedule_interval=SCHEDULE_INTERVAL
)

next_dag_name = "next_dag1"
if env == "prod":
    if run_date_day == 7:
        next_dag_name = "next_dag2"
    else:
        next_dag_name = "next_dag1"
run_id = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")

# Define Airflow DAG
with dag:
    team_notify_task = MSTeamsWebhookOperator(
        task_id='teams_notifi_start_task',
        http_conn_id='http_conn_id',
        message=f"DAG has started <br />"
                f"<strong> DAG ID:</strong> {DAG_ID}.<br />",
        theme_color="00FF00",
        button_text="My button",
        dag=dag)

    task1_bq = bigquery_operator.BigQueryOperator(
        task_id='task1',
        sql=task1_query(
            table1="table1",
            start_date=start_date),
        use_legacy_sql=False,
        destination_dataset_table="destination_tbl_name",
        write_disposition='WRITE_TRUNCATE'
    )

    ##### Base Skeletons #####
    with TaskGroup("taskgroup_lbl", tooltip="taskgroup_sample") as task_grp:
        tg_process(args=default_args, run_date=run_date)

    if run_mode == "<env_name>" and next_dag != "":
        next_dag_trigger = BashOperator(
            task_id=f'trigger_{next_dag_name}',
            bash_command="gcloud composer environments run " + <env> + "-cust_comp --location us-east1 dags trigger -- " + next_dag_name + " --run-id='trigger_" + run_id + "'"
        )
        task_grp >> next_dag_trigger

    team_notify_task >> task1_bq >> task_grp
Can someone help me figure out what is causing the issue?
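One way to narrow this down is to time how long the DAG file takes to import outside the scheduler, since the 30.0s limit in the error message applies to that import. Below is a minimal sketch using only the standard library. The function name `time_import` and the default module name are illustrative assumptions, and it has to run in an environment where the file's own imports (airflow, libs, tasks) resolve:

```python
import importlib.util
import time


def time_import(path, module_name="dag_under_test"):
    """Import the Python file at `path` once and return the elapsed seconds."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    start = time.perf_counter()
    # exec_module runs all top-level code in the file, which is the work
    # that counts against the DagBag import timeout.
    spec.loader.exec_module(module)
    return time.perf_counter() - start
```

Calling, for example, `time_import("/home/airflow/gcs/dags/test_dag.py")` on both environments would show whether the file itself became slower to import or whether the new environment is simply slower at the same work.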
Comments (1)
Increasing the DAG import timeout does the trick.
In the Airflow web UI, the current value is shown from the top bar under Admin --> Configurations --> [core] --> dagbag_import_timeout. Change it from 30 (the default) to 160 in the [core] section of airflow.cfg.
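Airflow also reads each option from an environment variable named AIRFLOW__{SECTION}__{KEY}, which takes precedence over airflow.cfg. Here is a small sketch of that naming rule. The helper `airflow_env_var` is a hypothetical name used for illustration and is not part of Airflow:

```python
import os


def airflow_env_var(section, key):
    """Build the environment variable name Airflow uses for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


name = airflow_env_var("core", "dagbag_import_timeout")
# Fall back to the default of 30 seconds when the variable is unset.
timeout = float(os.environ.get(name, "30"))
print(name, timeout)
```

The printed timeout depends on whether the variable is set in the current environment.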
If using Composer, the same can be done through the following steps:
a) Go to the Composer service and select the environment whose settings are to be modified.
b) Click on AIRFLOW CONFIGURATION OVERRIDES --> EDIT --> (add/edit) dagbag_import_timeout=160 in the core section.
c) Click on Save.
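Raising the timeout works around slow parsing rather than removing it. The best-practices page linked in the error message recommends keeping expensive work out of top-level code, for example by passing a callable that runs at task execution instead of calling it while the file is parsed. Below is a minimal, Airflow-free sketch of the difference. `build_query` and `LazyTask` are hypothetical stand-ins, not the asker's helpers and not an Airflow API:

```python
import time

calls = []  # records when the expensive helper actually runs


def build_query(table):
    """Stand-in for an expensive helper (config lookup, network call, ...)."""
    calls.append(table)
    time.sleep(0.01)
    return f"SELECT * FROM {table}"


class LazyTask:
    """Toy task that stores a callable and only invokes it in execute()."""

    def __init__(self, task_id, sql_factory):
        self.task_id = task_id
        self.sql_factory = sql_factory

    def execute(self):
        return self.sql_factory()


# "Parse time": constructing the task does not call build_query.
task = LazyTask("task1", lambda: build_query("table1"))
assert calls == []

# "Run time": the expensive work happens only when the task executes.
print(task.execute())
```

In a real DAG the same idea would apply to values such as datetime.now() or queries built from config files: computing them inside the task callable, or through templating, keeps them out of every parse of the file.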