使用 Azure 数据工厂进行多步增量加载和处理

发布于 2025-01-11 16:59:10 字数 646 浏览 0 评论 0原文

我想实现增量加载/处理,并在处理它们后使用Azure数据工厂将它们存储在不同的地方,例如:

外部数据源(数据是结构化的)-> ADLS(原始)-> ADLS(已处理)-> SQL DB

因此,我需要根据当前日期从源中提取原始数据样本,将它们存储在 ADLS 容器中,然后处理相同的数据采样数据,将它们存储在另一个 ADLS 容器中,最后将处理结果附加到 SQL DB 中。

ADLS 原始

2022-03-01.txt

2022-03-02.txt

ADLS 处理

2022-03-01-processed.txt

2022-03-02-processed.txt

SQL DB

ADLS 中处理的所有 txt 文件容器将被附加并存储在SQL DB中。

因此想检查在必须批量运行的单个管道中实现此目的的最佳方法是什么?

I wanted to achieve an incremental load/processing and store them in different places using Azure Data Factory after processing them, e.g:

External data source (data is structured) -> ADLS (Raw) -> ADLS (Processed) -> SQL DB

Hence, I will need to extract a sample of the raw data from the source, based on the current date, store them in an ADLS container, then process the same sample data, store them in another ADLS container, and finally append the processed result in a SQL DB.

ADLS raw:

2022-03-01.txt

2022-03-02.txt

ADLS processed:

2022-03-01-processed.txt

2022-03-02-processed.txt

SQL DB:

All the txt files in the ADLS processed container will be appended and stored inside SQL DB.

Hence would like to check what will be the best way to achieve this in a single pipeline that has to be run in batches?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

血之狂魔 2025-01-18 16:59:11

您可以使用动态管道来实现此目的,如下所示:

  1. 在 SQL DB 中创建一个配置/元数据表,您可以在其中放置源表名称、源名称等详细信息。

  2. 按如下方式创建管道:

    a) 添加一个查找活动,您可以在其中根据您的配置表创建查询
    https://learn.microsoft.com/ en-us/azure/data-factory/control-flow-lookup-activity

    b) 添加 ForEach 活动并使用 Lookup 输出作为 ForEach 的输入
    https://learn.microsoft。 com/en-us/azure/data-factory/control-flow-for-each-activity

    c) 在 ForEach 内部,您可以添加一个 switch 活动,其中每个 Switch 案例区分表或源

    d) 在每种情况下添加您需要在 RAW 层中创建文件的副本或其他活动

    e) 在已处理层的管道中添加另一个 ForEach,其中您可以添加与 RAW 层类似类型的内部活动,并且在此活动中您可以添加处理逻辑

通过这种方式,您可以创建一个单个管道,也是一个动态管道,可以对所有源执行必要的操作

You can achieve this using a dynamic pipeline as follows:

  1. Create a Config / Metadata table in SQL DB wherein you would place the details like source table name, source name etc.

  2. Create a pipeline as follows:

    a) Add a lookup activity wherein you would create a query based on your Config table
    https://learn.microsoft.com/en-us/azure/data-factory/control-flow-lookup-activity

    b) Add a ForEach activity and use Lookup output as an input to ForEach
    https://learn.microsoft.com/en-us/azure/data-factory/control-flow-for-each-activity

    c) Inside ForEach you can add a switch activity where each Switch case distinguishes table or source

    d) In each case add a COPY or other activities which you need to create file in RAW layer

    e) Add another ForEach in your pipeline for Processed layer wherein you can add similar type of inner activities as you did for RAW layer and in this activity you can add processing logic

This way you can create a single pipeline and that too a dynamic one which can perform necessary operations for all sources

海夕 2025-01-18 16:59:11

您无法一次重命名多个文件,因此必须一个接一个地复制文件。

  • 创建带有翻滚窗口触发器的管道 - 在触发器和管道中创建两个参数,分别命名为 WindowStartTime 和 WindowEndTime
  • 创建 GetMetaData 活动,使用上次修改的参数datetime 并传递 WindowStartTime 和 WindowEndTime 以获取放置在 WindowStartTime 和 WindowEndTime 之间的文件列表
  • 创建一个 ForEach 活动,传递从Getmetadata
  • 在活动内创建复制活动,并从 ForEach 循环传递文件名
  • 在接收器数据集中传递文件名并连接“_processed/txt”
  • 创建复制< /strong> 对于以源为处理层的每个活动再次传递 WindowStartTime 和 WindowEndTime 之后的活动
  • 复制活动将读取当天收到的最新文件并将其附加到 SQL DB

You can't rename multiple files at once so you have to copy files one after the other.

  • Create a pipeline with tumbling window trigger - create two parameters in the trigger and pipeline named WindowStartTime and WindowEndTime
  • Create a GetMetaData activity use the parameter last modified datetime and pass WindowStartTime and WindowEndTime to get list of files that were placed between WindowStartTime and WindowEndTime
  • Create a ForEach activity pass the data received from Getmetadata
  • Create copy activity inside for activity and pass the file name from ForEach loop
  • In the sink dataset pass file name and concatenate "_processed/txt"
  • Create a Copy activity after the for each activity with source as processed layer again pass WindowStartTime and WindowEndTime
  • This Copy activity will read the latest files received on the current day and append it to SQL DB
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文