Create a dynamic workflow for Airflow tasks held in a Python list

Published 2025-01-28 09:34:12


I have a list of lists in the following way -

[['X_API', 'Y_API',....], ['Z_API', 'P_API', ...], [....], [...] .... ]

Here, each API name corresponds to a PythonOperator.

In Airflow, I would like to create task dependencies such that, starting from a dummy task, there is one parallel branch for each inner list, and the operators inside each inner list execute in sequence:


How can I do this? I would appreciate any help!
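The target shape — Start fanning out to the first task of each inner list, each inner list running sequentially, and every last task converging on End — can be prototyped as a plain edge computation before touching Airflow. This is only a sketch; `build_edges` is an illustrative helper, not part of any Airflow API, and the task names are the stand-ins from the example above:

```python
def build_edges(task_lists, start="Start", end="End"):
    """Compute (upstream, downstream) pairs for a fan-out/chain/fan-in DAG."""
    edges = []
    for branch in task_lists:
        if not branch:
            continue
        edges.append((start, branch[0]))          # Start -> first task of branch
        for up, down in zip(branch, branch[1:]):  # sequential within the branch
            edges.append((up, down))
        edges.append((branch[-1], end))           # last task of branch -> End
    return edges

edges = build_edges([["X_API", "Y_API"], ["Z_API", "P_API"]])
# → [('Start', 'X_API'), ('X_API', 'Y_API'), ('Y_API', 'End'),
#    ('Start', 'Z_API'), ('Z_API', 'P_API'), ('P_API', 'End')]
```

Each pair here corresponds to one `upstream >> downstream` call once real operators exist.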

Existing code:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator  # Airflow 2.x import path

    args = {
        'depends_on_past': False,
        'start_date': datetime.now(),
        'email': '',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 3,
        'retry_delay': timedelta(minutes=1)
    }

    dag = DAG(dag_id, default_args=args, schedule_interval=None)

    with dag:
        tasks = []
        tmp, tmp2 = set(), set()
        Start = DummyOperator(task_id='Start', dag=dag)
        End = DummyOperator(task_id='End', dag=dag)

        for i in dags:
            for j in i:
                if 'APIs' in i:
                    for l in i['APIs']:
                        tab = DummyOperator(
                            task_id=l['api'] + "_API",
                            dag=dag
                        )
                        tmp.add(tab)
                elif 'tables' in i:
                    for k in i['tables']:
                        tab2 = DummyOperator(
                            task_id=k['table'] + "_API",
                            dag=dag
                        )
                        tmp2.add(tab2)

        tasks.append(list(tmp))
        tasks.append(list(tmp2))

        for task in tasks:
            for op in range(len(task) - 1):
                Start.set_downstream(task[op])
                task[op].set_downstream(task[op + 1])
                task[op + 1].set_downstream(End)


浊酒尽余欢 2025-02-04 09:34:12


This was the solution I came up with -

    from operator import itemgetter

    from airflow.operators.dummy import DummyOperator
    from airflow.operators.python import ShortCircuitOperator
    from airflow.utils.trigger_rule import TriggerRule

    with dag:
        Start = ShortCircuitOperator(
            task_id='Create_Postgres_Schema',
            python_callable=create_postgres_schema,
            ignore_downstream_trigger_rules=True
        )

        End = DummyOperator(
            task_id='End_of_Data_extraction',
            trigger_rule=TriggerRule.ALL_DONE
        )

        # First and last task of every branch: Start fans out to the firsts,
        # and the lasts converge on End.
        start_task_list = list(map(itemgetter(0), tasks_master_list))
        end_tasks_list = list(map(itemgetter(-1), tasks_master_list))

        Start >> start_task_list

        # Chain the operators inside each branch sequentially.
        # Note the inner range runs over len(task), the branch length,
        # not len(tasks_master_list).
        for task in tasks_master_list:
            for op in range(len(task) - 1):
                task[op] >> task[op + 1]

        end_tasks_list >> End
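The fan-out/chain/fan-in wiring above can be exercised without an Airflow installation by stubbing out operators that record what `>>` does. The `Task` class below is a hypothetical stand-in for illustration only, not an Airflow class; it just collects downstream task IDs the way `set_downstream` would:

```python
from operator import itemgetter

class Task:
    """Minimal stand-in that records dependencies the way Airflow's >> does."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Accept a single task or a list of tasks, like Airflow's >>.
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.append(t.task_id)
        return other

tasks_master_list = [[Task("X_API"), Task("Y_API")],
                     [Task("Z_API"), Task("P_API")]]
Start, End = Task("Start"), Task("End")

Start >> list(map(itemgetter(0), tasks_master_list))   # fan out to branch heads
for task in tasks_master_list:
    for op in range(len(task) - 1):                    # chain within each branch
        task[op] >> task[op + 1]
for last in map(itemgetter(-1), tasks_master_list):    # branch tails converge
    last >> End
```

After running this, `Start.downstream` holds both branch heads, and each branch tail points at `End` — the same topology the real operators would get.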