通过电子邮件将电子邮件列表作为来自气流中其他DAG的参数

发布于 2025-02-07 09:53:02 字数 1628 浏览 2 评论 0原文

我创建了一个自定义的电子邮件警报Python文件,我在另一个DAG中调用该功能以发送电子邮件。 假设 - 电子邮件警报文件是 custom_alert.py

from airflow.operators.email import EmailOperator
from airflow.utils.email import send_email

def custom_failure_email(context, **kwargs):
    """Send custom email alerts."""
    dag_run = context.get('task_instance').dag_id
    subject = f"[ActionReq]-dag failure-{dag_run}"
    # email contents
    body= """Hi Team,<br><br>
            <b style="font-size:15px;color:red;">Airflow job on error, please find details below.</b>
            Thank you!,<br>
            )
    email_list = ['[email protected]', '[email protected]']
    for i in range(len(email_list)):
        send_email(str(email_list[i]),subject,body)

in parent dag:say ** email.py ** - 我在调用上述发送失败电子邮件的功能。

from custom_alert import custom_failure_email
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'on_failure_callback': custom_failure_email}

这样,我就可以发送自定义电子邮件,但是每个DAG的收件人列表相同。 请让我知道,如何自定义它以发送不同的收件人电子邮件地址以进行不同的DAG。 我如何通过父级dag传递收件人电子邮件地址。

I have created a custom email alert python file, I am calling that function in another dag to send email.
Lets say - email alert file is custom_alert.py

from airflow.operators.email import EmailOperator
from airflow.utils.email import send_email

def custom_failure_email(context, **kwargs):
    """Send custom email alerts."""
    dag_run = context.get('task_instance').dag_id
    subject = f"[ActionReq]-dag failure-{dag_run}"
    # email contents
    body= """Hi Team,<br><br>
            <b style="font-size:15px;color:red;">Airflow job on error, please find details below.</b>
            Thank you!,<br>
            )
    email_list = ['[email protected]', '[email protected]']
    for i in range(len(email_list)):
        send_email(str(email_list[i]),subject,body)

In Parent DAG : Lets Say **email.py** - I am calling the above function to send failure email.

from custom_alert import custom_failure_email
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'on_failure_callback': custom_failure_email}

This way I am able to send the custom email, but my recipient list is same for every dag.
Please let me know, How I can customize it to send different recipient email address for different dag. How I can pass recipient email address from Parent Dag.?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

桃扇骨 2025-02-14 09:53:02

一个选项是在您的dag中设置params dict,然后从上下文中检索此值。

def custom_failure_email(context):
    """Send custom email alerts."""
    #...
    email_list = context['dag'].params['mailing_list']
    for i in range(len(email_list)):
        send_email(str(email_list[i]),subject,body)

default_args = {
    #....
    'params': {
        'mailing_list': ['[email protected]', '[email protected]']
    },
    'email_on_failure': False,
    'on_failure_callback': custom_failure_email
}

An option is to set params dict in your DAG then retrieve this value from the context.

def custom_failure_email(context):
    """Send custom email alerts."""
    #...
    email_list = context['dag'].params['mailing_list']
    for i in range(len(email_list)):
        send_email(str(email_list[i]),subject,body)

default_args = {
    #....
    'params': {
        'mailing_list': ['[email protected]', '[email protected]']
    },
    'email_on_failure': False,
    'on_failure_callback': custom_failure_email
}
心在旅行 2025-02-14 09:53:02

还有另一种方法可以使用Functools实现此目标。

from functools import partial
new_custom_failure_email = partial(custom_failure_email,
                                   email_list=['[email protected]'])
                                                                    
# Now Pass this in default _args
'on_failure_callback': new_custom_failure_email

我们需要通过 email_list 作为** custom_alert.py **中的参数

There is one more way to achieve this using functools.

from functools import partial
new_custom_failure_email = partial(custom_failure_email,
                                   email_list=['[email protected]'])
                                                                    
# Now Pass this in default _args
'on_failure_callback': new_custom_failure_email

and we need to pass email_list as argument in **custom_alert.py**

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文