Composer DAG triggers a Dataflow job that runs successfully, but the final file never appears in the output bucket
I have set up a DAG that runs a Dataflow job. The DAG triggers it fine and the job runs successfully, yet the output file doesn't appear in the output location. The output location is a bucket in another project, and the service account being used has permission to write to that bucket... any idea why the file is not being generated?
DF job:
import apache_beam as beam
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.options.pipeline_options import PipelineOptions
from datetime import datetime
import logging


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--templated_int', type=int)
        parser.add_value_provider_argument('--input', type=str)
        parser.add_value_provider_argument('--output', type=str)


class process_file(beam.DoFn):
    def __init__(self, templated_int):
        self.templated_int = templated_int

    def process(self, an_int):
        yield self.templated_int.get() + an_int


def clean_file():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions)
    tstmp = datetime.now().strftime("%Y%m%d%H")
    output = user_options.output
    logging.info('Input: %s', user_options.input)
    logging.info('Output: %s', output)
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read from a File' >> beam.io.ReadFromText(user_options.input, skip_header_lines=1)
         | 'Split into rows' >> beam.Map(lambda x: x.split(","))
         | 'Confirm index locations' >> beam.Map(lambda x: f'{x[0]},{x[1]}{x[2]}{x[3]}{x[4]},{x[5]}')
         | 'Write to clean file' >> beam.io.WriteToText(output))
        p.run().wait_until_finish()


if __name__ == "__main__":
    clean_file()
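The Composer DAG itself isn't included above; for context, a task that launches this pipeline as a classic Dataflow template would look roughly like the sketch below. The operator choice, template path, project IDs, region, and parameter values are illustrative assumptions, not the actual values in use.

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
from airflow.utils.dates import days_ago

# Illustrative sketch only: template path, project, region, and bucket names are placeholders.
with DAG(dag_id="clean_file_dag", start_date=days_ago(1), schedule_interval=None) as dag:
    run_clean_file = DataflowTemplatedJobStartOperator(
        task_id="run_clean_file",
        template="gs://templates-bucket/templates/clean_file",        # staged classic template (assumed)
        project_id="dataflow-project-id",                             # project that runs the job
        location="us-central1",
        parameters={
            "input": "gs://input-bucket/raw/input.csv",               # assumed input path
            "output": "gs://output-bucket-other-project/clean/out",   # bucket in the other project
        },
    )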
Comments (1)
When you select a step in your Dataflow pipeline graph, the logs panel toggles from displaying the Job Logs generated by the Dataflow service to showing the logs from the Compute Engine instances running your pipeline step.
Cloud Logging combines all of the logs collected from your project's Compute Engine instances in one location. Additionally, see Logging pipeline messages for more information on using Dataflow's various logging capabilities.
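If it helps, those same worker logs can also be pulled programmatically with the Cloud Logging client library. The sketch below is a minimal example, not part of the original answer; the project ID and Dataflow job ID are placeholders you would replace with your own.

from google.cloud import logging as cloud_logging

# Minimal sketch: read the worker logs of a specific Dataflow job from Cloud Logging.
client = cloud_logging.Client(project="dataflow-project-id")  # project that ran the job (placeholder)
log_filter = (
    'resource.type="dataflow_step" '
    'AND resource.labels.job_id="2024-01-01_00_00_00-1234567890123456"'  # placeholder job ID
)

for entry in client.list_entries(filter_=log_filter, order_by=cloud_logging.DESCENDING):
    print(entry.timestamp, entry.payload)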