处理 pytest 失败断言的最佳实践

发布于 2025-01-12 19:59:33 字数 1172 浏览 3 评论 0原文

我使用 pyspark、EMR、terraform 和气流进行触发。我正在我的 pyspark 代码中编写 pytest 用例。我有以下问题:

I)如果我得到“False”断言。我正在考虑发送松弛和电子邮件通知并关闭 EMR 集群。这是每个人都遵循的最佳实践吗? 。

有人可以解释如何在断言失败的情况下处理 pytests 吗?或处理 pytest 失败的一般方法

示例代码:

def task_fail_slack_alert(context):
    SLACK_CONN_ID = 'slack'
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
            :red_circle: crunchbase Job Failed.
            *Task*: {task}
            *Dag*: {dag}
            *Execution Time*: {exec_date}
            *Log Url*: {log_url}
            *Error*:{exception}
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
            exception=context.get('exception')

        )
    failed_alert = SlackWebhookOperator(
        task_id='slack',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow_sld',
        dag=dag)
    return failed_alert.execute(context=context)

I am using pyspark, EMR, terraform and airflow for triggering. I am writing pytest cases in my pyspark code. I have the following questions:

I)If I get an assertion as "False". I am thinking of sending a slack and email notification and shut down EMR cluster. is this the best practice that everyone follows? .

Can someone explain how to handle pytests in cases of failure assertions? or general methods of handling pytest failures

Sample code:

def task_fail_slack_alert(context):
    SLACK_CONN_ID = 'slack'
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
            :red_circle: crunchbase Job Failed.
            *Task*: {task}
            *Dag*: {dag}
            *Execution Time*: {exec_date}
            *Log Url*: {log_url}
            *Error*:{exception}
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
            exception=context.get('exception')

        )
    failed_alert = SlackWebhookOperator(
        task_id='slack',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow_sld',
        dag=dag)
    return failed_alert.execute(context=context)

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文