具有 apache beam 和运行时参数的动态路径

发布于 2025-01-10 21:01:20 字数 1076 浏览 0 评论 0原文

我正在创建一个管道模板,它接受一些输入文件并计算其上的单词数。到目前为止一切正常,但问题是我需要传递另一个参数(来自我调用模板的函数),该参数让我传递文件名,以便我可以使用它创建路径。

我将向您展示我想要的示例,尽管我知道管道无法在管道构建期间或运行时上下文之外访问运行时参数,这可以帮助您了解我需要做什么:

class tempatableTest(PipelineOptions):
@classmethod
def _add_argparse_args(cls,parser):
    parser.add_value_provider_argument(
        '--input',
        type=str,
        help='path to the input file'
    )
    parser.add_value_provider_argument(
        '--fdinamic',
        type=str,
        help='folder name'
    )

templatable_test = PipelineOptions().view_as(tempatableTest)
beam_options= PipelineOptions()
input = templatable_test.input
dinamicName = templatable_test.fdinamic.get()

with beam.Pipeline(options=beam_options) as p:
    lines = p | beam.io.ReadFromText(input)
    len = lines | beam.combiners.Count.Globally()
    len | 'countTotalLen' >> beam.io.WriteToText(f'gs://bucket-test-out/processedFile/{dinamicName}/count.txt')

如果我使用 templatable_test.fdinamic .get() 我会收到运行时错误,但如果删除 .get() 我会在文件夹上得到一个超长的名称。

我知道这可能不是正确的方法,但只是为了说明我需要做什么,谢谢您的帮助。

I am creating a pipeline TEMPLATE which takes some input file and counts the words on it. All works fine until this point, but the thing is that I need to pass another parameter (from the function where I call the template) that lets me pass the name of the file so I can create a path with it.

I'll show you an example of what I want though I know pipelines can't access Runtime parameters during pipeline construction or outside a runtime context this can help to give you an Idea of what I need to do:

class tempatableTest(PipelineOptions):
@classmethod
def _add_argparse_args(cls,parser):
    parser.add_value_provider_argument(
        '--input',
        type=str,
        help='path to the input file'
    )
    parser.add_value_provider_argument(
        '--fdinamic',
        type=str,
        help='folder name'
    )

templatable_test = PipelineOptions().view_as(tempatableTest)
beam_options= PipelineOptions()
input = templatable_test.input
dinamicName = templatable_test.fdinamic.get()

with beam.Pipeline(options=beam_options) as p:
    lines = p | beam.io.ReadFromText(input)
    len = lines | beam.combiners.Count.Globally()
    len | 'countTotalLen' >> beam.io.WriteToText(f'gs://bucket-test-out/processedFile/{dinamicName}/count.txt')

If I use templatable_test.fdinamic.get() I'd get the runtime error but if I remove the .get() I'd get a super long name on the folder.

I know probably this isn't the way to go but is just to illustrate what I need to do, thank you for your help.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

韶华倾负 2025-01-17 21:01:20

不幸的是,WriteToText 转换不能用于此目的,因为它当前仅支持固定目标。因此,为了将文件写入动态目标,您需要使用 fileio 模块 支持动态目标。虽然这确实意味着切换到使用实验 WriteToFiles 转换。

Unfortunately the WriteToText transform can't be used for this because it currently only supports a fixed destination. So in order to write files to dynamic destinations, you would instead need to use utilities from the fileio module which supports dynamic destinations. Although this does mean switching to using the experimental WriteToFiles transform.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文