Get a DAG's email from another DAG in Airflow

Posted 2025-02-06 03:11:27

I want to get the email mentioned in this DAG's default_args from another DAG in Airflow. How can I do that? Please help, I am new to Airflow!

from airflow.models import DagRun
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from airflow import DAG

def first_function(**context):
    print("hello")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'main',
    default_args=default_args,
    description='Sample DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 6, 10),
    catchup=False
) as dag:
    first_function = PythonOperator(
        task_id="first_function",
        python_callable=first_function,
    )

    first_function



Answered by 南城追梦, 2025-02-13 03:11:27


You can use a custom module in Airflow to share config options, Operators, or any arbitrary Python code across DAGs.

Typically you would create a directory inside your DAGs directory (which by default is {AIRFLOW_HOME}/dags).

To share default_args between DAGs, you could create the following layout:

  • Create {AIRFLOW_HOME}/dags/custom/__init__.py
  • Create {AIRFLOW_HOME}/dags/custom/shared_config.py
  • Create {AIRFLOW_HOME}/dags/.airflowignore
  • Add the directory name custom to the first line of .airflowignore.
  • Cut and paste your default_args dictionary from your DAG into {AIRFLOW_HOME}/dags/custom/shared_config.py (a sketch of this file follows the list).
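
A minimal sketch of what {AIRFLOW_HOME}/dags/custom/shared_config.py could then contain, reusing the default_args dictionary from the question ({AIRFLOW_HOME}/dags/custom/__init__.py stays empty):

# {AIRFLOW_HOME}/dags/custom/shared_config.py
# Plain Python module shared between DAGs -- no DAG objects in here.
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}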

You can see this layout suggested in the Airflow documentation here.

The .airflowignore tells the scheduler to skip the custom directory when it parses your DAGs (which by default happens every 30s) - because the custom directory contains Python, but never any DAGs, the scheduler should skip these files to avoid adding unnecessary load to the scheduler. This is explained in the documentation link above.
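
Concretely, the whole {AIRFLOW_HOME}/dags/.airflowignore file in this layout can be a single line:

custom

Entries in .airflowignore are treated as patterns matched against file paths under the DAGs folder, so this one word is enough to make the scheduler skip everything under the custom directory.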

You need to add an __init__.py to the custom directory - Airflow requires it, even though in Python 3 you wouldn't normally need one because of implicit namespace packages (again, this is explained in the same link above).

From your DAG you can then import it as needed:

from custom.shared_config import default_args
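
For example, a second DAG could import the shared dictionary and read the email list from it. A minimal sketch, assuming the layout above (the file name other_dag.py, the DAG id other_dag, and the print_email task are illustrative, not from the original post):

# {AIRFLOW_HOME}/dags/other_dag.py -- hypothetical second DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from custom.shared_config import default_args

def print_email(**context):
    # Reads the email list defined once in shared_config.py
    print(default_args['email'])

with DAG(
    'other_dag',
    default_args=default_args,  # same defaults as the 'main' DAG
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 6, 10),
    catchup=False
) as dag:
    print_email_task = PythonOperator(
        task_id="print_email",
        python_callable=print_email,
    )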