我们在 GCP(Kubernetes 集群上的托管 Airflow)中使用 Cloud Composer 来调度我们的 ETL 管道。
我们的 DAG (200-300) 是动态的,这意味着它们全部由单个生成器 DAG 生成。在 Airflow 1.x 中,由于调度程序的限制,它是一种反模式。然而,Airflow 2.x 中的调度程序可以更好地处理这种情况。请参阅第 3 点此处。
我们有一个非常强大的环境(请参阅下面的技术细节),但是我们在任务更改之间遇到了很大的延迟,这对调度程序来说是一个坏兆头。此外,大量任务在队列中等待,这对工作人员来说是一个坏兆头。当触发并运行 50-60 个 DAG 时,就会出现这些性能问题。在我看来这个并发并没有那么大。
我们正在使用 Cloud Composer,它具有根据
文档。正如我所提到的,任务在队列中等待很长时间,因此我们预计工作人员的资源不足,因此应该发生扩展事件。然而,事实并非如此,负载没有缩放事件。
Composer 特定详细信息:
- Composer 版本:composer-2.0.8
- Airflow 版本:airflow-2.2.3
- 调度程序资源:4 个 vCPU、15 GB 内存, 10 GB 存储
- 调度程序数量: 3
- 工作线程资源: 4 个 vCPU、15 GB 内存、10 GB 存储
- 调度程序数量工作人员:在 3 到 12 个工作人员之间自动缩放
气流特定详细信息:
- scheduler/min_file_process_interval: 300
- scheduler/parsing_processes: 24
- scheduler/dag_dir_list_interval:< /strong> 300
- 核心/dagbag_import_timeout: 3000
- core/min_serialized_dag_update_interval: 30
- 核心/并行度: 120
- core/enable_xcom_pickling: false
- core/dag_run_conf_overrides_params: true
- core/executor: CeleryExecutor
我们没有明确设置一个值worker_concurrency,因为它是根据此
我们一直在监控我们的环境,但到目前为止我们还没有发现任何东西。我们没有看到任何与工作线程/调度程序/数据库/网络服务器资源(CPU、内存、IO、网络)相关的瓶颈。
瓶颈可能是什么?任何提示和技巧都非常受欢迎。谢谢你!
We are using Cloud Composer in GCP (managed Airflow on a Kubernetes cluster) for scheduling our ETL pipelines.
Our DAGs (200-300) are dynamic, meaning all of them are generated by a single generator DAG. In Airflow 1.x it was an antipattern due to the limitations of scheduler. However, scheduler is better in Airflow 2.x to handle this scenario. See the 3. point here.
We have a pretty powerful environment (see the technical details below), however we are experiencing big latency between task changes which is a bad sign for the scheduler. Additionally, lots of tasks are waiting in the queue which is a bad sign for the workers. These performance problems are present when 50-60 DAGs are getting triggered and run. This concurrency is not that big in my opinion.
We are using Cloud Composer which has autoscaling feature according to the
documentation. As I mentioned, tasks are waiting in the queue for a long time, so we would expect that the resources of workers are not enough so a scaling event should take place. However, that is not the case, no scaling events the load.
Composer specific details:
- Composer version: composer-2.0.8
- Airflow version: airflow-2.2.3
- Scheduler resources: 4 vCPUs, 15 GB memory, 10 GB storage
- Number of schedulers: 3
- Worker resources: 4 vCPUs, 15 GB memory, 10 GB storage
- Number of workers: Auto-scaling between 3 and 12 workers
Airflow specific details:
- scheduler/min_file_process_interval: 300
- scheduler/parsing_processes: 24
- scheduler/dag_dir_list_interval: 300
- core/dagbag_import_timeout: 3000
- core/min_serialized_dag_update_interval: 30
- core/parallelism: 120
- core/enable_xcom_pickling: false
- core/dag_run_conf_overrides_params: true
- core/executor: CeleryExecutor
We do not explicitly set a value for worker_concurrency because it is automatically calculated according to this documentation. Furthermore, we have one pool with 100000 slots, however we have noticed that most of the time number of running slots are 8-10, number of queued slots are 65-85.
We are constantly monitoring our environment, but we were not able to find anything so far. We do not see any bottleneck related to worker/scheduler/database/webserver resources (CPU, memory, IO, network).
What could be the bottleneck? Any tips and tricks are more than welcomed. Thank you!
发布评论
评论(2)
2周前,我遇到了类似的问题。
问题在于,在气流2.x中,DAG发电机在某些情况下仍然是反图案(尤其是使用SLA)。
根据文档“一个文件只能由一个文件处理者解析”。
因此,气流调度程序只能为所有DAG运行一个子进程,时间表管道看起来像这样:
所有这些事情必须通过一个python线程来完成。我认为太多了。
就我而言,问题在于,由于一个失败的任务,生成的DAG之一过去被永久陷入困境,而调度程序过程则花费所有时间检查SLA。
禁用该DAG的SLA解决了问题。
您可以通过暂时禁用SLA来查看这是否是您的情况。
我认为正确的方法是:
I encountered a similar problem 2 weeks ago.
And the problem was that in Airflow 2.x the DAG generator is still anti-pattern in some cases (especially if you use SLA).
According to the documentation "one file can only be parsed by one FileProcessor".
So the Airflow scheduler will only run one child process for all your DAGs, and the schedule pipeline will look like this:
And all these things have to be done by one single python thread. Too much, I think.
In my case, the problem was that one of the generated DAG was permanently stuck in the past because of one failed task, and the scheduler process was spending all its time checking SLAs.
Disabling the SLA for that DAG solved the problem.
You can see if this is your case simply by temporarily disabling the SLA.
The correct way, I think, is the following:
您应该将环境大小设置为中或大,以在调度任务时增加数据库吞吐量。
该文档建议a 大 〜250 dags的环境,此环境大小参数独立于调度程序/Worker/WebServer机器的大小。
You should set your environment size to Medium or Large to increase the database throughput when scheduling tasks.
The documentation recommends a Large environment for ~250 DAGs, and this environment size parameter is independent from Scheduler/Worker/Webserver machine sizing.