Is there a way to parse information out of an xcom_pull in Airflow?

Posted 2025-01-26 10:42:17 · 1966 characters · 4 views · 0 comments

I have a DAG that passes specific information between tasks, and everything is working as it should. The file needs to be stored in a reports/ folder for the following tasks to work correctly. I get the actual name of the report through an xcom_pull, but I also want to parse information out of that xcom_pull to capture the unique filename itself for use in later tasks. A later task inserts this filename into the CSV file, and it needs to match the filename itself exactly, so it's a 1:1 match.

I want to parse information out of an xcom_pull result and I'm having trouble doing so. My example is below:

    report_filename = "reports/{}_{}".format('report_example', str(uuid.uuid1()))
    get_report = GoogleCampaignManagerDownloadReportOperator(
        task_id="get_report",
        profile_id=1234,
        api_version=1234,
        bucket_name=test_bucket,
        report_name=report_filename,
        report_id=report_id,
        file_id=file_id,
    )
    
    report_filename_test = xcom_pull(get_report, 'report_name')
    sanitize_report = SanitizeReportOperator(
        task_id='sanitize_report',
        dest_bucket=test_bucket,
        dest_object=report_filename_test,
        shared_object=str(report_filename_test).replace('reports/', ''),
        append_timestamp=True,
        append_filename=True
    )

As of right now the xcom_pull pulls down the following:

    reports/report_example_b3413b62-cc8a-11ec-bded-52e9ae62e477.csv.gz

However, I want to have another xcom_pull that will only pull the following:

    report_example_b3413b62-cc8a-11ec-bded-52e9ae62e477.csv.gz
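
To make the desired transformation concrete, here is a minimal sketch in plain Python, independent of Airflow. The helper name `strip_folder` is hypothetical, and the sketch assumes the value is already available as an ordinary string at the point where it is called (for example inside a task at run time):

```python
import posixpath


def strip_folder(object_name: str) -> str:
    """Return the last component of a '/'-separated object name."""
    # posixpath always splits on '/', which is what bucket object names use.
    return posixpath.basename(object_name)


full_name = "reports/report_example_b3413b62-cc8a-11ec-bded-52e9ae62e477.csv.gz"
bare_name = strip_folder(full_name)
print(bare_name)
```

Using `basename` rather than `replace('reports/', '')` avoids hard-coding the folder name, though either would give the filename shown above.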

I have tried converting report_filename_test to a string and using the replace function, for example:

    new_test = str(report_filename_test).replace('reports/', '')

But when I attempt this, new_test either ends up as a NULL value or the replacement is ignored completely, and the file is later saved into the reports/ folder anyway.
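
One possible explanation, sketched under assumptions: if the `xcom_pull` helper used in the example returns a Jinja template string (or some other placeholder) while the DAG file is being parsed, then `str(...).replace(...)` runs against the placeholder text, not against the real filename, which only exists once the task has run. The template strings below are hypothetical; the task id and key simply mirror the example above.

```python
# Hypothetical template string; assumes the value is pulled via Jinja templating.
PULL_TEMPLATE = "{{ ti.xcom_pull(task_ids='get_report', key='report_name') }}"

# At DAG-parse time this is only the template text. It contains no
# 'reports/' substring, so replace() returns it unchanged.
parse_time_result = str(PULL_TEMPLATE).replace("reports/", "")
print(parse_time_result == PULL_TEMPLATE)

# Moving replace() inside the template defers it until the template is
# rendered, when the pulled value is a real string.
STRIPPED_TEMPLATE = (
    "{{ ti.xcom_pull(task_ids='get_report', key='report_name')"
    ".replace('reports/', '') }}"
)
print(STRIPPED_TEMPLATE)
```

Passing `STRIPPED_TEMPLATE` as `shared_object` would only work if `SanitizeReportOperator` lists that argument in its `template_fields`. That is an assumption here, since the operator's definition is not shown.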

I have also tried putting report_filename into a list and grabbing the first element, but because of how Airflow works from task to task, it creates a new filename with a different uuid each time, which is not what I want. I have also tried using a PythonOperator to run a function that names the file so it can be called later throughout the DAG, but have not had any luck with this either.

Is there a way to parse the information out of an xcom_pull, or another way to make this work? The end goal is essentially a filename with a specific uuid that I can write into the CSV file, and to rename the file to that same uuid-based name without the folder name in front.

I'm just looking for a unique filename in uuid format that is passed through multiple tasks and stays exactly the same each time. I'm running out of ideas for how to make this work and have been stuck on this for almost two weeks now.
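
The changing uuid is consistent with how Airflow treats module-level code in a DAG file: that code runs every time the file is parsed, so a top-level `uuid.uuid1()` call yields a fresh value on each parse. A minimal sketch of one way around this is to derive the uuid from something that stays fixed for a given DAG run. The function name `stable_report_name` and the choice of `run_id` as the input are assumptions for illustration, not part of the original DAG:

```python
import uuid


def stable_report_name(run_id: str, prefix: str = "report_example") -> str:
    """Build the same 'reports/<prefix>_<uuid>' name for a given run_id."""
    # uuid5 is name-based: identical inputs always produce the same UUID.
    run_uuid = uuid.uuid5(uuid.NAMESPACE_URL, run_id)
    return "reports/{}_{}".format(prefix, run_uuid)


# Example run identifier (made up); in a task it would come from the Airflow context.
first = stable_report_name("scheduled__2022-05-05T00:00:00+00:00")
second = stable_report_name("scheduled__2022-05-05T00:00:00+00:00")
print(first == second)
```

Because the name depends only on the run identifier, every task in the same run can recompute it and arrive at the same filename without passing it around.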

Any help with this would be greatly appreciated!
