Get a DAG's email from another DAG in Airflow
I want to get the email mentioned in this DAG's default_args using another DAG in Airflow. How can I do that? Please help, I am new to Airflow!
from airflow.models import DagRun
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from airflow import DAG

def first_function(**context):
    print("hello")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'main',
    default_args=default_args,
    description='Sample DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 6, 10),
    catchup=False
) as dag:
    first_function = PythonOperator(
        task_id="first_function",
        python_callable=first_function,
    )

    first_function
You can use a custom module in Airflow to share config options, Operators, or any arbitrary Python code across DAGs.
Typically you would create a directory inside your DAGs directory (which by default is {AIRFLOW_HOME}/dags). To share default_args between DAGs, you could create the following layout:

Create {AIRFLOW_HOME}/dags/custom/__init__.py
Create {AIRFLOW_HOME}/dags/custom/shared_config.py
Create {AIRFLOW_HOME}/dags/.airflowignore and add custom to its first line
Move the default_args dictionary from your DAG into {AIRFLOW_HOME}/dags/custom/shared_config.py

You can see this layout suggested in the Airflow documentation here.
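For example, shared_config.py might contain nothing more than the dictionary moved out of the DAG file. A minimal sketch, reusing the values from the question:

# {AIRFLOW_HOME}/dags/custom/shared_config.py
from datetime import timedelta

# The default_args dictionary moved out of the DAG file so any DAG can import it.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}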
The .airflowignore file tells the scheduler to skip the custom directory when it parses your DAGs (which by default happens every 30s): because the custom directory contains Python but never any DAGs, the scheduler should skip these files to avoid adding unnecessary load. This is explained in the documentation link above.
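With that in place, the ignore file itself holds just the single line naming the directory to skip:

custom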
You need to add an __init__.py to the custom directory - Airflow requires it even though, when writing in Python 3, you don't need it because of implicit namespace packages (again, this is explained in the same link above).
From your DAG you can then import as needed:
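A minimal sketch of what those imports could look like, assuming the layout above (other_dag.py is a hypothetical second DAG that needs the email list):

# {AIRFLOW_HOME}/dags/main.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# default_args now lives in the shared custom module instead of this file.
from custom.shared_config import default_args

def first_function(**context):
    print("hello")

with DAG(
    'main',
    default_args=default_args,
    description='Sample DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 6, 10),
    catchup=False
) as dag:
    first_function_task = PythonOperator(
        task_id="first_function",
        python_callable=first_function,
    )

# {AIRFLOW_HOME}/dags/other_dag.py - any other DAG can read the same values
from custom.shared_config import default_args

emails = default_args['email']

Because every DAG imports the same module, a change to the email list in shared_config.py is picked up by all of them at once.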