We are using Cloud Composer in GCP (managed Airflow on a Kubernetes cluster) for scheduling our ETL pipelines.
Our DAGs (200-300) are dynamic, meaning all of them are generated by a single generator DAG. In Airflow 1.x this was an antipattern due to the limitations of the scheduler. However, the scheduler in Airflow 2.x handles this scenario better. See point 3 here.
We have a pretty powerful environment (see the technical details below), but we are experiencing long delays between task state changes, which is a bad sign for the scheduler. Additionally, lots of tasks are waiting in the queue, which is a bad sign for the workers. These performance problems appear when 50-60 DAGs are triggered and running. In my opinion, that level of concurrency is not that high.
We are using Cloud Composer, which has an autoscaling feature according to the documentation. As I mentioned, tasks wait in the queue for a long time, so we would expect that the worker resources are insufficient and that a scaling event should take place. However, that is not the case: no scaling events occur under the load.
Composer specific details:
- Composer version: composer-2.0.8
- Airflow version: airflow-2.2.3
- Scheduler resources: 4 vCPUs, 15 GB memory, 10 GB storage
- Number of schedulers: 3
- Worker resources: 4 vCPUs, 15 GB memory, 10 GB storage
- Number of workers: Auto-scaling between 3 and 12 workers
Airflow specific details:
- scheduler/min_file_process_interval: 300
- scheduler/parsing_processes: 24
- scheduler/dag_dir_list_interval: 300
- core/dagbag_import_timeout: 3000
- core/min_serialized_dag_update_interval: 30
- core/parallelism: 120
- core/enable_xcom_pickling: false
- core/dag_run_conf_overrides_params: true
- core/executor: CeleryExecutor
We do not explicitly set a value for worker_concurrency because it is automatically calculated according to this documentation. Furthermore, we have one pool with 100000 slots; however, we have noticed that most of the time the number of running slots is 8-10 and the number of queued slots is 65-85.
We are constantly monitoring our environment, but we have not been able to find anything so far. We do not see any bottleneck related to worker/scheduler/database/webserver resources (CPU, memory, IO, network).
What could be the bottleneck? Any tips and tricks are more than welcome. Thank you!
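To make the numbers above easier to compare, here is a minimal sketch that treats each concurrency setting as a simple cap and reports which one binds first. It is only an illustration: `tightest_limit` is a hypothetical helper, the `worker_concurrency` value of 12 is a placeholder (the real value is derived by Composer and is not given in the post), and the sketch ignores per-DAG limits and any interaction between `core/parallelism` and multiple schedulers.

```python
def tightest_limit(limits):
    """Return (name, value) of the smallest limit, i.e. the cap that binds first."""
    name = min(limits, key=limits.get)
    return name, limits[name]


# Values from the post, except worker_concurrency, which is a made-up placeholder.
workers = 3                # lower bound of the autoscaling range
worker_concurrency = 12    # hypothetical; Composer derives the real value
limits = {
    "core/parallelism": 120,
    "celery slots (workers * worker_concurrency)": workers * worker_concurrency,
    "pool slots": 100000,
}

name, slots = tightest_limit(limits)
print(f"binding limit: {name} = {slots}")
```

If the observed 8-10 running slots sit well below the smallest of these caps, a concurrency setting is probably not what holds tasks back, and the delay is more likely to come from scheduling itself.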
I encountered a similar problem 2 weeks ago.
The problem was that in Airflow 2.x a DAG generator is still an anti-pattern in some cases (especially if you use SLAs).
According to the documentation "one file can only be parsed by one FileProcessor".
So the Airflow scheduler will only run one child process for all your DAGs, and the schedule pipeline will look like this:
And all of this has to be done by a single Python thread. Too much, I think.
In my case, the problem was that one of the generated DAGs was permanently stuck in the past because of one failed task, and the scheduler process was spending all its time checking SLAs.
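Since parsing happens per file, one possible way to let several parsing processes share the work is to split the generated DAGs across several generator files. The sketch below is an assumption for illustration, not something the documentation or this answer prescribes: `shard_for` and `split_dag_ids` are hypothetical helpers, the DAG ids are made up, and each resulting shard would still need its own generator file in the DAGs folder.

```python
import zlib


def shard_for(dag_id, num_shards):
    """Map a DAG id to a shard index with a hash that is stable across runs."""
    return zlib.crc32(dag_id.encode("utf-8")) % num_shards


def split_dag_ids(dag_ids, num_shards):
    """Group DAG ids into num_shards lists, one list per generator file."""
    shards = [[] for _ in range(num_shards)]
    for dag_id in sorted(dag_ids):
        shards[shard_for(dag_id, num_shards)].append(dag_id)
    return shards


# Hypothetical DAG ids; a real generator would read them from its own config.
dag_ids = [f"etl_pipeline_{i:03d}" for i in range(250)]
shards = split_dag_ids(dag_ids, num_shards=8)

for index, shard in enumerate(shards):
    print(f"generator file {index}: {len(shard)} DAGs")
```

A stable hash (rather than Python's built-in `hash`, which is randomized per process for strings) keeps each DAG in the same file between parses, so DAGs do not appear to move or disappear.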
Disabling the SLA for that DAG solved the problem.
You can see if this is your case simply by temporarily disabling the SLA.
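As a rough sketch of that test, a generator that builds its DAGs from a shared `default_args` dictionary could strip the `sla` entry before creating the DAGs. The helper name `without_sla` and the example arguments are hypothetical; `sla` itself is the standard operator argument. Airflow also has a `check_slas` option in the `[core]` section that switches SLA checks off globally, which may be simpler for a quick experiment.

```python
from datetime import timedelta


def without_sla(default_args):
    """Return a copy of default_args with the 'sla' entry removed, if present."""
    cleaned = dict(default_args)
    cleaned.pop("sla", None)
    return cleaned


# Hypothetical default_args as a generator might define them.
default_args = {
    "owner": "etl",
    "retries": 2,
    "sla": timedelta(hours=1),
}

cleaned_args = without_sla(default_args)
print(sorted(cleaned_args))
# The generator would then pass cleaned_args as default_args when building each DAG.
```

The original dictionary is left untouched, so the SLA can be restored after the experiment by passing `default_args` again.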
The correct way, I think, is the following:
You should set your environment size to Medium or Large to increase the database throughput when scheduling tasks.
The documentation recommends a Large environment for ~250 DAGs, and this environment size parameter is independent of the Scheduler/Worker/Webserver machine sizing.