Airflow - what do I do when the amount of work a DAG needs to handle is variable?



I have a sensor task that listens for files being created in S3.

After one poke I may have 3 files; after another poke I might have another 5.

I want to create a DAG (or multiple DAGs) that listens for work requests and creates additional tasks or DAGs to handle that amount of work.

I wish I could access the XCom or dag_run variables from the DAG definition (see the pseudo-code below):


from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor


def wait_for_s3_data(ti, **kwargs):
    s3_wrapper = S3Wrapper()
    work_load = s3_wrapper.work()
    # work_load: {"filename1.json": "s3/key/filename1.json", ....}
    ti.xcom_push(key="work_load", value=work_load)
    return len(work_load) > 0

def get_work(self, dag_run, ti, **_):
    # Method on a helper object, referenced below as query.get_work
    s3_wrapper = S3Wrapper()
    work_load = ti.xcom_pull(key="work_load")
    dag_run.conf['work_load'] = work_load
    s3_wrapper.move_messages_from_waiting_to_processing(work_load)

with DAG(
    "ListenAndCallWorkers",
    description="This DAG waits for work requests from S3",
    schedule_interval="@once",
    max_active_runs=1,
) as dag:

    wait_for_s3_data: PythonSensor = PythonSensor(
        task_id="wait_for_s3_data",
        python_callable=wait_for_s3_data,
        timeout=60,
        poke_interval=30,
        retries=2,
        mode="reschedule",
    )

    get_data_task = PythonOperator(
        task_id="GetData",
        python_callable=query.get_work,
        provide_context=True,
    )

    work_load = "{{ dag_run.conf['work_load'] }}"  # <--- I WISH I COULD DO THIS

    do_work_tasks = [
        TriggerDagRunOperator(
            task_id=f"TriggerDoWork_{work}",
            trigger_dag_id="Work",  # Ensure this equals the dag_id of the DAG to trigger
            conf={"work": keypath},
        )
        for work, keypath in work_load.items()  # <--- and this: work_load is only a template string here
    ]

    wait_for_s3_data >> get_data_task >> do_work_tasks

I know I cannot do that.

I also tried to define my own custom MultiTriggerDAG object (as in this answer: https://stackoverflow.com/a/51790697/1494511), but at that step I still don't have access to the amount of work that needs to be done.
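For reference, here is a minimal sketch of how the same fan-out might look with dynamic task mapping in recent Airflow versions (2.4+). I have not verified it: whether TriggerDagRunOperator's conf argument can be mapped directly depends on the exact Airflow version, and S3Wrapper and the "Work" dag_id are the same placeholders as above.

import pendulum
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@dag(schedule=None, start_date=pendulum.datetime(2023, 1, 1), catchup=False)
def listen_and_call_workers():

    @task
    def get_work():
        # S3Wrapper is the same hypothetical helper used in the question.
        s3_wrapper = S3Wrapper()
        work_load = s3_wrapper.work()  # {"filename1.json": "s3/key/filename1.json", ...}
        s3_wrapper.move_messages_from_waiting_to_processing(work_load)
        # One conf dict per file; the mapped operator below would create
        # one "Work" DAG run per element at runtime.
        return [{"work": keypath} for keypath in work_load.values()]

    TriggerDagRunOperator.partial(
        task_id="TriggerDoWork",
        trigger_dag_id="Work",
    ).expand(conf=get_work())


listen_and_call_workers()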

Another idea:

I am considering building a DAG with N doWork tasks and passing up to N pieces of work to them via XCom:


def get_work(self, dag_run, ti, **_):
    s3_wrapper = S3Wrapper()
    work_load = ti.xcom_pull(key="work_load")
    i = 1
    for work, keypath in work_load.items():
        dag_run.conf[f'work_{i}'] = keypath
        i += 1
        if i > N:
            break
    # Only the first N items are handed out; a dict cannot be sliced directly.
    first_n = dict(list(work_load.items())[:N])
    s3_wrapper.move_messages_from_waiting_to_processing(first_n)

This idea would get the job done, but it sounds very inefficient.
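To make the idea concrete, this is roughly what the N doWork tasks could look like in the same DAG, assuming (as described above) that get_work hands out the slots via XCom (one ti.xcom_push(key=f"work_{i}", value=keypath) per slot) rather than through dag_run.conf. N and the processing body are just placeholders:

from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator

N = 10  # fixed number of doWork slots (placeholder value)


def do_work(slot, ti, **_):
    # Assumes get_work pushed one XCom per slot: ti.xcom_push(key=f"work_{slot}", value=keypath)
    keypath = ti.xcom_pull(task_ids="GetData", key=f"work_{slot}")
    if not keypath:
        # Nothing was assigned to this slot on this run, so the task skips itself.
        raise AirflowSkipException(f"No work assigned to slot {slot}")
    print(f"Processing {keypath}")  # placeholder for the real per-file processing


# Inside the same `with DAG(...)` block as GetData:
do_work_tasks = [
    PythonOperator(
        task_id=f"doWork_{i}",
        python_callable=do_work,
        op_kwargs={"slot": i},
    )
    for i in range(1, N + 1)
]
# get_data_task >> do_work_tasks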

Related questions:

This is the same question I have, but no code is presented on how to solve it:

Airflow: Proper way to run DAG for each file

This answer looks like it would solve the problem, but it seems to apply to Airflow versions lower than 2.2.2:
How do we trigger multiple airflow dags using TriggerDagRunOperator?
