How to ingest/copy files from two rotating Azure Storage account containers into another container

Posted on 2025-02-05 06:28:08

Given two containers:

Source: An Azure StorageV2 Account with two containers named A and B containing blob files that will be stored flat in the root directory of each container.

Destination: An Azure Data Lake Gen2 account (for simplification purposes, consider it another Storage Account with a single destination container).

Objective: I am trying to copy/ingest all files within the currently active source container at the top of the month. For the remainder of that month, any newly added or overwritten files inside the active source container need to be ingested as well.

For each month, there will only be one active container that we care about. So January would use Container A, Feb would use Container B, March would use Container A, etc. Using Azure Data Factory, I’ve already figured out how to accomplish this logic of swapping containers by using a dynamic expression in the file path.

@if(equals(mod(int(formatDateTime(utcnow(),'%M')), 2), 0), 'containerB', 'ContainerA')
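
For reference, this parity logic can be sanity-checked outside ADF. Below is just a rough sketch in plain Python (not part of the pipeline), mirroring the month-number-modulo-2 rule:

    # Quick check of the month-parity rule: even month numbers select containerB,
    # odd month numbers select ContainerA (January = 1 -> ContainerA, and so on).
    from datetime import datetime, timezone

    def active_container(now: datetime) -> str:
        # Mirrors mod(int(formatDateTime(utcnow(),'%M')), 2) in the ADF expression.
        return "containerB" if now.month % 2 == 0 else "ContainerA"

    print(active_container(datetime(2025, 1, 15, tzinfo=timezone.utc)))  # ContainerA
    print(active_container(datetime(2025, 2, 15, tzinfo=timezone.utc)))  # containerB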

What I’ve tried so far: I set up a Copy pipeline using a Tumbling Window approach where a trigger runs daily to check for new/changed files based on the LastModifiedDate, as described here: https://learn.microsoft.com/en-us/azure/data-factory/tutorial-incremental-copy-lastmodified-copy-data-tool. However, I ran into a conundrum: the files to be ingested at the top of the month will by nature have a LastModifiedDate in the past relative to the trigger's start window, because the container is prepared ahead of time in the days leading up to the turn of the month, right before the containers are swapped. So because the LastModifiedDate falls before the trigger's start window, those existing files on the 1st of the month will never get copied, only new/changed files after the trigger start date. If I manually fire the trigger by hardcoding an earlier start date, then any files added to the container mid-month end up getting ingested for the remainder of the month as expected.

So how do I solve that base case for files modified before the start date? If this can be solved, then everything can happen in one pipeline and one trigger. Otherwise, I will have to figure out another approach.

And in general, I am open to ideas as to what is the best approach to take here. The files will be ~2 GB and around 20,000 in quantity.
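
To make the base-case problem concrete, here is a rough sketch (plain Python with the azure-storage-blob package, not my actual pipeline) of what the daily LastModifiedDate filter effectively does; the connection-string environment variable and container name are placeholders:

    # Illustration only: list blobs in the active container and apply the same
    # last-modified window that a daily tumbling-window trigger would use.
    import os
    from datetime import datetime, timedelta, timezone

    from azure.storage.blob import BlobServiceClient

    service = BlobServiceClient.from_connection_string(
        os.environ["AZURE_STORAGE_CONNECTION_STRING"])  # placeholder setting
    container = service.get_container_client("containera")  # placeholder for the active container

    window_start = datetime.now(timezone.utc) - timedelta(days=1)  # daily trigger window

    for blob in container.list_blobs():
        if blob.last_modified >= window_start:
            print(f"would copy: {blob.name} (modified {blob.last_modified})")
        else:
            # Files staged before the window start land here -- this is the base
            # case that a daily incremental trigger never picks up.
            print(f"skipped:    {blob.name} (modified before the window start)")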

Comments (1)

梦醒灬来后我 2025-02-12 06:28:08

You can do it by setting your trigger to run at the end of each day and copying all of that day's new/updated files based on their last modified date, as described below.

This assumes that no files are uploaded to the second container while the first container is active.

Please follow the below steps:

  • Go to Data Factory and drag a Copy activity into your pipeline.

  • Create the source dataset along with its linked service. Provide your container condition by clicking Add dynamic content in the source dataset.

    @if(equals(mod(int(formatDateTime(utcnow(),'%M')), 2), 0), 'containerb', 'containera')

  • Then select Wildcard file path as the File path type and give * in the file path to copy multiple files.

  • Here I am copying files that are new or updated in the last 24 hours. Go to Filter by last modified and give @adddays(utcNow(),-1) as the start time and @utcNow() as the end time.

  • As we are scheduling this with a trigger at the end of each day, it will pick up files that were added or updated during the previous 24 hours.

  • Give the container in your other storage account as the sink dataset.

  • Now, click on Add trigger and create a Tumbling Window trigger.

You can set the trigger's start date above to whatever end-of-day time suits your pipeline execution schedule.

Please make sure you publish the pipeline and trigger before execution.

If your second container also has new/modified files while the first container is active, then you may try something like this for the start time of the last modified filter.

@if(equals(int(formatDateTime(utcNow(),'%d')),1), adddays(utcNow(),-31), adddays(utcNow(),-1))
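
If it helps, the combined effect of this expression and the daily @adddays(utcNow(),-1) window can be sketched in plain Python; this is only an illustration of the logic, not pipeline code:

    # On the 1st of the month look back 31 days so files staged before the swap
    # are still caught; on every other day look back 24 hours.
    from datetime import datetime, timedelta, timezone

    def window_start(now: datetime) -> datetime:
        if now.day == 1:                     # int(formatDateTime(utcNow(),'%d')) equals 1
            return now - timedelta(days=31)  # adddays(utcNow(), -31)
        return now - timedelta(days=1)       # adddays(utcNow(), -1)

    now = datetime.now(timezone.utc)
    print("Filter by last modified - start:", window_start(now))
    print("Filter by last modified - end:  ", now)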