Airflow 2.2.3 - Large latency between tasks, tasks stay in the queue too long [Cloud Composer]

Posted on 2025-01-18 22:48:22

We are using Cloud Composer in GCP (managed Airflow on a Kubernetes cluster) for scheduling our ETL pipelines.

Our DAGs (200-300) are dynamic, meaning all of them are generated by a single generator DAG. In Airflow 1.x this was an anti-pattern due to the limitations of the scheduler; however, the scheduler in Airflow 2.x handles this scenario better. See point 3 here.
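For illustration, a minimal sketch of the single-file generation pattern described above, assuming the pipeline list comes from some config (the pipeline names, schedule and operator are hypothetical placeholders): one Python file builds many DAG objects in a loop and registers them in globals(), so every generated DAG is parsed by the same FileProcessor.

    # generator_dag.py - minimal sketch of single-file dynamic DAG generation (hypothetical names)
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    PIPELINES = [f"pipeline_{i:03d}" for i in range(250)]  # ~250 generated DAGs

    for name in PIPELINES:
        dag = DAG(
            dag_id=f"etl_{name}",
            start_date=datetime(2022, 1, 1),
            schedule_interval="@hourly",
            catchup=False,
            default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
        )
        with dag:
            BashOperator(task_id="extract", bash_command=f"echo extract {name}")

        # Registering the DAG object in the module namespace is what makes the
        # scheduler's FileProcessor discover it during parsing.
        globals()[dag.dag_id] = dag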

We have a pretty powerful environment (see the technical details below); however, we are experiencing big latency between task state changes, which is a bad sign for the scheduler. Additionally, lots of tasks are waiting in the queue, which is a bad sign for the workers. These performance problems show up when 50-60 DAGs are triggered and running. This concurrency is not that big, in my opinion.

We are using Cloud Composer, which has an autoscaling feature according to the documentation. As I mentioned, tasks are waiting in the queue for a long time, so we would expect the workers' resources to be insufficient and a scaling event to take place. However, that is not the case: no scaling events happen under this load.

Composer specific details:

  • Composer version: composer-2.0.8
  • Airflow version: airflow-2.2.3
  • Scheduler resources: 4 vCPUs, 15 GB memory, 10 GB storage
  • Number of schedulers: 3
  • Worker resources: 4 vCPUs, 15 GB memory, 10 GB storage
  • Number of workers: Auto-scaling between 3 and 12 workers

Airflow specific details:

  • scheduler/min_file_process_interval: 300
  • scheduler/parsing_processes: 24
  • scheduler/dag_dir_list_interval: 300
  • core/dagbag_import_timeout: 3000
  • core/min_serialized_dag_update_interval: 30
  • core/parallelism: 120
  • core/enable_xcom_pickling: false
  • core/dag_run_conf_overrides_params: true
  • core/executor: CeleryExecutor
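
To double-check that these overrides are actually in effect, a small sketch (assuming it is run inside the environment, e.g. from a worker pod) that reads the effective values through Airflow's configuration object:

    # check_config.py - print the effective values of the overrides listed above
    from airflow.configuration import conf

    SETTINGS = [
        ("scheduler", "min_file_process_interval"),
        ("scheduler", "parsing_processes"),
        ("scheduler", "dag_dir_list_interval"),
        ("core", "dagbag_import_timeout"),
        ("core", "min_serialized_dag_update_interval"),
        ("core", "parallelism"),
        ("core", "executor"),
        ("celery", "worker_concurrency"),  # auto-calculated by Composer, see below
    ]

    for section, key in SETTINGS:
        print(f"{section}/{key} = {conf.get(section, key)}")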

We do not explicitly set a value for worker_concurrency because it is calculated automatically according to this documentation. Furthermore, we have one pool with 100,000 slots; however, we have noticed that most of the time the number of running slots is 8-10 and the number of queued slots is 65-85.
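To see how those queued vs. running slot numbers evolve over time, a rough sketch against the Airflow 2 stable REST API (the URL and basic-auth credentials below are placeholders; on Cloud Composer the webserver sits behind IAM, so authentication will differ):

    # pool_watch.py - periodically dump pool usage from the Airflow REST API (placeholder URL/credentials)
    import time

    import requests

    AIRFLOW_API = "http://localhost:8080/api/v1"
    AUTH = ("admin", "admin")

    while True:
        resp = requests.get(f"{AIRFLOW_API}/pools", auth=AUTH, timeout=10)
        resp.raise_for_status()
        for pool in resp.json()["pools"]:
            # Print the raw pool record (it includes the slot counters) for trending over time.
            print(time.strftime("%H:%M:%S"), pool)
        time.sleep(60)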

We are constantly monitoring our environment, but we were not able to find anything so far. We do not see any bottleneck related to worker/scheduler/database/webserver resources (CPU, memory, IO, network).

What could be the bottleneck? Any tips and tricks are more than welcomed. Thank you!

2 Answers

铜锣湾横着走 2025-01-25 22:48:22

I encountered a similar problem 2 weeks ago.
And the problem was that in Airflow 2.x the DAG generator is still an anti-pattern in some cases (especially if you use SLAs).
According to the documentation, "one file can only be parsed by one FileProcessor".
So the Airflow scheduler will only run one child process for all of your DAGs, and the scheduling pipeline will look like this:

  1. Parse all the DAGs (hundreds).
  2. Check all necessary task instances (hundreds or even thousands?) for existence in the database and generate the missing ones (according to the schedule_interval parameter).
  3. Check SLAs for all outstanding tasks (again, hundreds or even thousands?) and send notifications if necessary. Checking SLAs is the hardest and slowest part of this pipeline.

And all of these things have to be done by one single Python thread. Too much, I think.

In my case, the problem was that one of the generated DAGs was permanently stuck in the past because of one failed task, and the scheduler process was spending all its time checking SLAs.
Disabling the SLA for that DAG solved the problem.

You can see if this is your case simply by temporarily disabling the SLA.
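A minimal way to run that test, assuming the SLAs are set through default_args in the generator (which is an assumption about the asker's code; the names below are hypothetical):

    # In the generator file: temporarily drop the sla argument to rule SLA checking out.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    default_args = {
        "retries": 1,
        # "sla": timedelta(hours=1),  # commented out: without an sla the per-DAG SLA bookkeeping goes away
    }

    with DAG(
        dag_id="etl_pipeline_001",  # hypothetical generated dag_id
        start_date=datetime(2022, 1, 1),
        schedule_interval="@hourly",
        catchup=False,
        default_args=default_args,
    ) as dag:
        BashOperator(task_id="extract", bash_command="echo extract")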

The correct way, I think, is the following:

  1. Split DAGs into groups (one file per group) according to some criteria (see the sketch below).
  2. Use your own implementation of the SLA check (outside the scheduler process).
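
A rough sketch of suggestion 1, assuming the generation logic can be factored into a shared helper module (the module, function and group names are hypothetical): each small DAG file generates only its own group, so the parsing and SLA work is spread across several FileProcessors instead of one.

    # dag_factory.py - shared generation logic (hypothetical helper module)
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator


    def build_dags(namespace, pipeline_names):
        """Create one DAG per pipeline and register it in the caller's namespace."""
        for name in pipeline_names:
            dag = DAG(
                dag_id=f"etl_{name}",
                start_date=datetime(2022, 1, 1),
                schedule_interval="@hourly",
                catchup=False,
            )
            with dag:
                BashOperator(task_id="extract", bash_command=f"echo extract {name}")
            namespace[dag.dag_id] = dag


    # dags/group_a.py - one small file per group, each parsed by its own FileProcessor
    # from dag_factory import build_dags
    # build_dags(globals(), [f"group_a_{i:03d}" for i in range(50)])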

著墨染雨君画夕 2025-01-25 22:48:22

You should set your environment size to Medium or Large to increase the database throughput when scheduling tasks.

The documentation recommends a Large environment for ~250 DAGs, and this environment size parameter is independent of the Scheduler/Worker/Webserver machine sizing.
