Composer DAG triggers a Dataflow job that runs successfully, but the output file never appears in the output bucket



I have set up a DAG that runs a Dataflow job. The DAG triggers it fine and the job runs successfully, yet the output file doesn't appear in the output location. The output location is a bucket in another project, and the SA being used has permission to write to that bucket... any idea why the file is not being generated?

DF Job:

import apache_beam as beam
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime
import logging


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Declared as ValueProviders so the job can also run as a classic template.
        parser.add_value_provider_argument('--templated_int', type=int)
        parser.add_value_provider_argument('--input', type=str)
        parser.add_value_provider_argument('--output', type=str)


class process_file(beam.DoFn):
    def __init__(self, templated_int):
        self.templated_int = templated_int

    def process(self, an_int):
        # ValueProviders are resolved with .get() at execution time.
        yield self.templated_int.get() + an_int


def clean_file():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions)
    tstmp = datetime.now().strftime("%Y%m%d%H")  # currently unused
    output = user_options.output
    # logging takes %-style placeholders; note these log the ValueProvider
    # objects at construction time, not their runtime values.
    logging.info('Input: %s', user_options.input)
    logging.info('Output: %s', output)

    # The "with" block submits the pipeline and waits for it to finish on exit,
    # so a separate p.run().wait_until_finish() afterwards is not needed.
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read from a File' >> beam.io.ReadFromText(user_options.input, skip_header_lines=1)
         | 'Split into rows' >> beam.Map(lambda x: x.split(","))
         | 'Confirm index locations' >> beam.Map(lambda x: f'{x[0]},{x[1]}{x[2]}{x[3]}{x[4]},{x[5]}')
         # WriteToText treats "output" as a filename prefix and writes sharded
         # files such as <output>-00000-of-00001.
         | 'Write to clean file' >> beam.io.WriteToText(output))


if __name__ == "__main__":
    clean_file()

[DF job graph screenshot]
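The DAG itself isn't shown, but for context, a Composer task that launches this kind of job typically passes --input and --output as job parameters and pins the worker service account. A minimal sketch, assuming the pipeline is staged as a classic Dataflow template; every bucket path, project ID, region, and service account email below is a placeholder:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator

with DAG(
    dag_id="clean_file_dag",            # hypothetical DAG id
    start_date=datetime(2025, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_clean_file = DataflowTemplatedJobStartOperator(
        task_id="run_clean_file",
        template="gs://staging-bucket/templates/clean_file",      # placeholder template path
        project_id="dataflow-project",                            # project the job runs in
        location="us-central1",
        parameters={
            "input": "gs://source-bucket/raw/file.csv",           # placeholder input
            "output": "gs://other-project-bucket/clean/output",   # bucket in the other project
        },
        # The worker service account (not the Composer SA) is the identity that
        # needs write access on the destination bucket.
        environment={"serviceAccountEmail": "df-worker@dataflow-project.iam.gserviceaccount.com"},
    )

If the job is launched some other way (for example with BeamRunPythonPipelineOperator), the same --input/--output values would be passed through its pipeline options instead.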


Comments (1)

找个人就嫁了吧 2025-01-16 00:14:54


When you select a step in your Dataflow pipeline graph, the logs panel toggles from displaying the Job Logs generated by the Dataflow service to showing the logs from the Compute Engine instances running that pipeline step.

Cloud Logging combines all the logs collected from your project's Compute Engine instances in one location. Additionally, see "Logging pipeline messages" for more information on using Dataflow's various logging capabilities.
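As a concrete illustration of the "Logging pipeline messages" point: logging calls made inside a DoFn (or Map) execute on the workers, so they show up in the worker logs that Cloud Logging collects, whereas the logging.info calls in clean_file() run at pipeline-construction time on the launcher. A minimal sketch; the step name and DoFn below are illustrative additions, not part of the original job:

import logging

import apache_beam as beam


class LogRow(beam.DoFn):
    """Logs each element and passes it through unchanged."""

    def process(self, element):
        # Emitted on the Dataflow workers, so it appears in the worker logs
        # collected by Cloud Logging.
        logging.info("Row before write: %s", element)
        yield element

# Wired in just before the write step, e.g.:
#   ... | 'Log rows' >> beam.ParDo(LogRow())
#       | 'Write to clean file' >> beam.io.WriteToText(output)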
