How can I get Airflow to send an email alert only after multiple task instance failures?


We have some DAGs that run often and are occasionally flaky even with retries. We'd like to only get alerts if this DAG (or tasks within the DAG) fails multiple times in a row.

i.e., if the DAG runs every hour, we'd like to get an email alert from Airflow only if it fails each hour 3 times in a row.

Is there a way we can configure Airflow to do this?


左耳近心 2025-01-19 03:33:51

@Zak

There are a couple of approaches to this.

The first option is to programmatically create a function that monitors the try_number. You can then use an on_failure_callback, or perhaps even a BranchPythonOperator, to send an email based on your desired threshold (a sketch of the branch variant follows the example below).

Using DagRun from airflow.models, we can solve your question:

  1. Use DagRun.find to get the runs of a specific DAG
  2. Sort them so the most recent run comes first
  3. Zoom into a specific task within that run
  4. On the result of get_task_instance, you can read .try_number, which starts at 1
  5. You can pair this with an on_failure_callback inside your default parameters: 'on_failure_callback': name_of_function_to_send_email_with_retries
  6. You can use from airflow.utils.email import send_email to send this email

Something like the below should suffice; note that I have not executed this myself.

from datetime import datetime

from airflow import DAG
from airflow.models import DagRun
from airflow.utils.email import send_email
from airflow.operators.python import PythonOperator

def check_task_failure_count(dag_id, task_id):
    """ Returns the try_number of a task in the most recent dag run

    Args:
        dag_id (str): The dag_id to check
        task_id (str): The task_id to check
    Returns:
        int - The try_number of the task in the last dag run
    """
    dag_runs = DagRun.find(dag_id=dag_id)
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    return dag_runs[0].get_task_instance(task_id).try_number

def some_function_that_will_fail():
    """ Placeholder callable that always fails, to exercise the callback """
    raise ValueError("simulated failure")

def failure_email(context):
    """ Emails when a task has exhausted its retries and finally failed

    Args:
        context (dict): The context Airflow passes to on_failure_callback
    """
    dag_id = context['dag_run'].dag_id
    task_id = context['task_instance'].task_id

    # retained for posterity, however it is not used in this example;
    # with 'retries': 3 in default_args, this callback only fires after
    # the final retry has failed
    last_dag_run = DagRun.find(dag_id=dag_id)
    last_dag_run.sort(key=lambda x: x.execution_date, reverse=True)
    retries = last_dag_run[0].get_task_instance(task_id).try_number

    subject = f"DAG {dag_id} has failed"
    msg = "DAG/TASK Failed!"
    send_email(to=['YOUR_EMAIL'], subject=subject, html_content=msg)

default_args = {
  'on_failure_callback': failure_email,
  'retries': 3
}

with DAG(
  'stack_overflow_ans_2',
  tags=['SO'],
  start_date=datetime(2022, 1, 1),
  schedule_interval=None,
  catchup=False,
  is_paused_upon_creation=False,
  default_args=default_args
) as dag:

  t1 = PythonOperator(
    task_id='task_that_will_fail',
    python_callable=some_function_that_will_fail,
  )
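If you would rather try the BranchPythonOperator route mentioned above, here is a minimal, untested sketch. It reuses check_task_failure_count from the example; the dag_id, task ids, threshold of 3 attempts, and recipient address are placeholders you would adjust, and the operators are meant to live inside the same with DAG(...) block:

from airflow.operators.python import BranchPythonOperator

def decide_whether_to_alert(**context):
    # placeholder dag_id/task_id and threshold -- adjust to your DAG
    attempts = check_task_failure_count('stack_overflow_ans_2', 'task_that_will_fail')
    return 'send_alert' if attempts >= 3 else 'no_alert'

# inside the `with DAG(...)` block:
check_failures = BranchPythonOperator(
    task_id='check_failures',
    python_callable=decide_whether_to_alert,
    trigger_rule='one_failed',  # run even though the upstream task failed
)

send_alert = PythonOperator(
    task_id='send_alert',
    python_callable=lambda: send_email(to=['YOUR_EMAIL'],
                                       subject='task_that_will_fail keeps failing',
                                       html_content='DAG/TASK Failed!'),
)

no_alert = PythonOperator(
    task_id='no_alert',
    python_callable=lambda: None,  # below the threshold, do nothing
)

t1 >> check_failures >> [send_alert, no_alert]

The branch task uses trigger_rule='one_failed' so it still runs when the upstream task fails, and it only routes to the alerting task once the attempt count reaches your threshold.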

The second option is to lean on the BaseOperator parameters and pass the retry/email key-value pairs inside your default arguments. Naturally, there is a tradeoff between the absolute control you get through route 1 and the ease of route 2.

from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(2022, 1, 1),
    'email': ['YOUR_EMAIL'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retry_exponential_backoff': True,
    'retry_delay': timedelta(seconds=300),
    'retries': 3
}

Both routes assume you have correctly configured Apache Airflow to send emails.
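For reference, that usually means filling in the [smtp] section of airflow.cfg (or the equivalent AIRFLOW__SMTP__* environment variables). The values below are placeholders, not a working configuration:

[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow@example.com
smtp_password = YOUR_SMTP_PASSWORD
smtp_port = 587
smtp_mail_from = airflow@example.com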
