How can we prevent a Dataflow pipeline from writing an empty file when the collection size is 0?

Posted 2025-01-26 12:27:29

I have a Dataflow pipeline that parses a file. If it finds any incorrect records, it writes them to a GCS bucket. However, when the input file contains no errors, TextIO still writes an empty file (containing only the header) to the bucket.

How can I skip this write step when the PCollection is empty?

errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
             .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
             .withoutSharding()
             .withSuffix(".txt")
             .withShardNameTemplate("-SSS")
             .withNumShards(1));
        


Answer (和我恋爱吧, 2025-02-02 12:27:29)

TextIO.write() always writes at least one shard, even if that shard is empty. Since you are writing a single shard anyway, you can work around this by doing the write manually in a DoFn that receives the elements to be written as a side input, for example:

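import java.io.IOException;
import java.io.Writer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.PCollectionView;

// Materialize the error records as a side input (suitable for a modest number of errors).
PCollectionView<List<String>> errorRecordsView = errorRecords.apply(View.<String>asList());
// Assumption: getBucketPath() returns a String prefix such as "gs://bucket/errors".
final String errorFile = options.getBucketPath() + ".txt";

// Your "main" PCollection has a single element, so the DoFn is invoked exactly once.
p.apply(Create.of("whatever"))
    // The side input is your error records.
    .apply(ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) throws IOException {
        List<String> errors = c.sideInput(errorRecordsView);
        if (errors.isEmpty()) {
          return; // No errors: skip the write, so no header-only file is created.
        }
        // Open the file manually and write the header plus all the errors to it.
        ResourceId file = FileSystems.matchNewResource(errorFile, false /* isDirectory */);
        try (Writer w = Channels.newWriter(
            FileSystems.create(file, MimeTypes.TEXT), StandardCharsets.UTF_8.name())) {
          w.write("ID|ERROR_CODE|ERROR_MESSAGE\n");
          for (String error : errors) {
            w.write(error + "\n");
          }
        }
      }
    }).withSideInputs(errorRecordsView));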
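The manual write inside the DoFn comes down to one check: create the file only when there is at least one record. As a minimal sketch, that logic can be isolated in a plain Java helper and unit-tested without running a pipeline. It uses java.nio.file on a local path, and the ErrorFileWriter class and writeIfNonEmpty method are hypothetical names for illustration, not part of Beam.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: writes a header plus records, but only when there are records. */
final class ErrorFileWriter {
  private ErrorFileWriter() {}

  /**
   * Writes header followed by records (one per line) to target.
   * Returns false and creates no file when records is empty.
   */
  static boolean writeIfNonEmpty(Path target, String header, List<String> records)
      throws IOException {
    if (records.isEmpty()) {
      return false; // Nothing to report: skip the write entirely, so no header-only file.
    }
    List<String> lines = new ArrayList<>(records.size() + 1);
    lines.add(header);
    lines.addAll(records);
    // Creates or truncates the file; each line is followed by the platform line separator.
    Files.write(target, lines, StandardCharsets.UTF_8);
    return true;
  }
}
```

Inside the DoFn you would pass the side-input list as records and the header string from the question. java.nio.file does not resolve gs:// paths by default, so in the pipeline itself the bytes would go through Beam's FileSystems.create(...) instead; only the empty check carries over unchanged.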

Being able to do this with Beam's built-in writes is a reasonable request, and it is now supported in the latest Beam release by setting skipIfEmpty.
