如何获取所有任务的列表及其在气流中运行的当前DAG的状态

发布于 2025-02-09 16:25:13 字数 95 浏览 1 评论 0原文

需要帮助,以提取所有任务的列表以及当前DAG运行的当前状态[成功/失败]。 我有一个Python操作员的任务,该操作员在工作流程结束时执行。此任务的责任是返回以状态执行的任务。

Need help to extract the list of all tasks along with their current status [Success/Failed] for the current dag run.
I have a task with a python operator which executes at the end of the workflow. The responsibility of this task is to return the no of tasks executed with the status.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

紫南 2025-02-16 16:25:13

您可以创建一个pythOnoperator,该操作员从DAG_RUN读取所有任务。

task_id =“ tasks”的XCOM是:

”在此处输入图像描述”

with DAG(
    dag_id="get_tasks",
    description="get tasks",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
    tags=["tasks"],
) as dag:
    start_dag = EmptyOperator(task_id="start")
    end_dag = EmptyOperator(task_id="end")


    def get_tasks(**context):
        dagrun: DAG = context["dag_run"]
        tasks = {}
        for ti in dagrun.get_task_instances():
            tasks[ti.task_id] = ti.state
        return tasks

    tasks = PythonOperator(
        task_id="tasks",
        python_callable=get_tasks,
    )

    start_dag >> tasks >> end_dag

You can create a PythonOperator that read all tasks from the dag_run.

The xcom of task_id="tasks" is :

enter image description here

with DAG(
    dag_id="get_tasks",
    description="get tasks",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 1, 1),
    tags=["tasks"],
) as dag:
    start_dag = EmptyOperator(task_id="start")
    end_dag = EmptyOperator(task_id="end")


    def get_tasks(**context):
        dagrun: DAG = context["dag_run"]
        tasks = {}
        for ti in dagrun.get_task_instances():
            tasks[ti.task_id] = ti.state
        return tasks

    tasks = PythonOperator(
        task_id="tasks",
        python_callable=get_tasks,
    )

    start_dag >> tasks >> end_dag
温柔戏命师 2025-02-16 16:25:13

您可以使用气流API提取有关工作流的信息。您可以读取REST API文档在这里。例如,要列出特定dag的所有DAG运行:

http://<AIRFLOW_IP>:8080/api/v1/dags/{dag_id}/dagRuns

之后,您可以列出此特定DAG运行的所有任务:

https://<AIRFLOW_IP>:8080/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances

in airflow UI&gt;&gt;文档&gt;&gt; REST API参考(Swagger UI)您可以使用Swagger文档访问和测试API。

You can use Airflow API to extract information about your workflows. You can read the Rest API documentation here. For instance, to list all DAG runs for a specific DAG:

http://<AIRFLOW_IP>:8080/api/v1/dags/{dag_id}/dagRuns

After, you can list all tasks for this specific DAG Run:

https://<AIRFLOW_IP>:8080/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances

In Airflow UI >> Docs >> REST API Reference (Swagger UI) you can access and test the API using Swagger documentation.

enter image description here

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文