Airflow - Conditional task execution

Posted on 2025-01-21 11:48:33


I'm running Airflow 2.1.0 on Ubuntu 20.04 and I've spotted an optimization opportunity in a DAG, and I need help figuring out how to implement it. Thank you in advance.

I'm running a DAG that executes two kinds of tasks:

  1. Raw layer = queries databases and writes files to an S3 bucket
  2. Trusted layer = reads from the S3 raw bucket and writes records to the S3 trusted-layer bucket

I need the Trusted task of each task group to execute right after that group's Raw task ends, but when that happens I also need to trigger the Raw task of the next task group. Since the Trusted layer doesn't use the database, I can shorten the DAG's execution time by starting the next Raw task immediately, keeping the database busy at all times.
I also can't run simultaneous queries, so only one Raw task may run at a time.

Actual flow (not optimized; Raw tasks sit idle when they could already have started):

  1. Task 1 = Raw starts
  2. Task 1 = Raw ends
  3. Task 1 = Trusted starts
  4. Task 1 = Trusted ends
  5. ...

Expected flow (optimized; a Raw task is running at all times):

  1. Task 1 = Raw starts
  2. Task 1 = Raw ends
  3. (Task 1 = Trusted starts) + (Task 2 = Raw starts)
  4. Task 1 = Trusted ends
  5. Task 2 = Raw ends
  6. (Task 2 = Trusted starts) + (Task 3 = Raw starts)
  7. ...

Here is the DAG code:

from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta

from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator
from airflow.utils.task_group import TaskGroup
import pendulum

local_tz = pendulum.timezone('America/Sao_Paulo')

DAG_NAME = "Example_Dag"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': pendulum.datetime(2022, 4, 13, tz=local_tz),
    'email': '[email protected]',
    'retries': 3,
    'retry_delay': timedelta(minutes=1)
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(hours=1),
         catchup=False,
         concurrency=2,
         max_active_runs=1,
         schedule_interval='*/10 * * * *') as dag:

  with TaskGroup(group_id='table1') as table1:

    job1 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw1",
        job="Example/raw_layer")

    job2 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted1",
        job="Example/trusted_layer")

    job1 >> job2

  with TaskGroup(group_id='table2') as table2:

    job3 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw2",
        job="Example/raw_layer")

    job4 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted2",
        job="Example/trusted_layer")

    job3 >> job4

  with TaskGroup(group_id='table3') as table3:

    job5 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw3",
        job="Example/raw_layer")

    job6 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted3",
        job="Example/trusted_layer")

    job5 >> job6

  with TaskGroup(group_id='table4') as table4:

    job7 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw4",
        job="Example/raw_layer")

    job8 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted4",
        job="Example/trusted_layer")

    job7 >> job8

  with TaskGroup(group_id='table5') as table5:
    job9 = CarteJobOperator(
        dag=dag,
        task_id="tsk_raw5",
        job="Example/raw_layer")

    job10 = CarteJobOperator(
        dag=dag,
        task_id="tsk_trusted5",
        job="Example/trusted_layer")

    job9 >> job10



# Chaining whole TaskGroups: each group's trusted task must finish
# before the next group's raw task can start.
table1 >> table2 >> table3 >> table4 >> table5

Comments (1)

如歌彻婉言 2025-01-28 11:48:33


Your task groups are getting in the way of your optimal flow. I recommend laying out all your tasks individually and then just using the >> operator to show the actual dependencies. If, on the resulting graph, there is a group of tasks that makes sense as a task group, only then should you add it formally as a task group.
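As a minimal sketch of that approach (reusing the operator and DAG settings from the question; the list-based layout and loop are just illustrative), each trusted task and the next table's raw task both depend only on the preceding raw task, so they can run in parallel while the raw-to-raw chain keeps the database queries serialized:

import pendulum
from datetime import timedelta

from airflow import DAG
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator

local_tz = pendulum.timezone('America/Sao_Paulo')

with DAG(dag_id="Example_Dag",
         default_args={'owner': 'airflow',
                       'depends_on_past': False,
                       'start_date': pendulum.datetime(2022, 4, 13, tz=local_tz),
                       'retries': 3,
                       'retry_delay': timedelta(minutes=1)},
         dagrun_timeout=timedelta(hours=1),
         catchup=False,
         concurrency=2,
         max_active_runs=1,
         schedule_interval='*/10 * * * *') as dag:

    # Declare every task individually instead of wrapping each table in a TaskGroup.
    raw = [CarteJobOperator(task_id=f"tsk_raw{i}", job="Example/raw_layer")
           for i in range(1, 6)]
    trusted = [CarteJobOperator(task_id=f"tsk_trusted{i}", job="Example/trusted_layer")
               for i in range(1, 6)]

    for i in range(5):
        # A trusted task starts as soon as its own raw task finishes.
        raw[i] >> trusted[i]
        # The next raw task also starts as soon as the previous raw task finishes,
        # so it runs in parallel with the current trusted task while the
        # raw-to-raw chain still allows only one raw task (one query) at a time.
        if i < 4:
            raw[i] >> raw[i + 1]

With concurrency=2, as in the original DAG, at most two tasks run at once, which matches the expected flow of one trusted task overlapping with the next raw task. If the trusted jobs regularly outlast the following raw job, the concurrency limit may need to be raised so a waiting raw task is not held back.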
