我们可以在出口条件下创建气流DAG周期吗?

发布于 2025-02-02 22:42:07 字数 371 浏览 0 评论 0 原文

我们有一个巨大的CSV文件,气流DAG看起来好像

>> read_csv 
>> apply filter
>> store in database

我们必须将数据从CSV读取器操作员传递给过滤器操作员。我们可以使用以下工作流程,而不是阅读完整的CSV并将其提供给过滤器?

>> read_chunk_from_csv
>> apply filter
>> store in database
>> [read_chunk_from_csv, exit]

我们可以从CSV迭代阅读一块,并在周期中处理每个块直到完成。

We have a huge CSV file, and the airflow DAG looks like

>> read_csv 
>> apply filter
>> store in database

we have to pass the data from the CSV reader operator to the filter operator. Instead of reading the complete CSV and giving it to the filter, can we use the following workflow?

>> read_chunk_from_csv
>> apply filter
>> store in database
>> [read_chunk_from_csv, exit]

can we read a chunk from CSV iteratively and process every chunk in the cycle until the completion.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

凝望流年 2025-02-09 22:42:07

不,你不能。 DAG代表直接循环图,这意味着没有循环。

如果您尝试尝试,IT气流将提高

您正在寻找的是MapReduce,例如行为。为此,您可以使用动态任务映射此功能在AirFlow 2.3.0中发布,您可以使用它来创建映射的任务,其中每个任务都可以处理数据的一部分。

No you can't. DAG stands for Direct Acyclic Graph which means no cycles.

Should you try, it Airflow will raise AirflowDagCycleException

What you are looking for is MapReduce like behavior. For that you can use Dynamic Task Mapping this feature was released in Airflow 2.3.0 you can use it to created mapped tasks where each one of them handles portion of the data.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文