How can we prevent a Dataflow pipeline from writing an empty file when the PCollection is empty?
I have a Dataflow pipeline that parses a file and writes any incorrect records to a GCS bucket. When the input file contains no errors, TextIO still writes an empty file, containing only the header, to the bucket.
How can we skip this write step when the PCollection is empty?
errorRecords.apply("WritingErrorRecords", TextIO.write().to(options.getBucketPath())
        .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
        .withoutSharding()
        .withSuffix(".txt")
        .withShardNameTemplate("-SSS")
        .withNumShards(1));
Comments (2)
Beam TextIO added support for skipIfEmpty() in 2.40.0, see: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.TypedWrite.html#skipIfEmpty--
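
As a rough illustration of this answer, the write from the question could be configured along the following lines. This is only a sketch, assuming Beam 2.40.0 or later is on the classpath. The class name `SkipEmptyErrorFile`, the empty `Create` input standing in for `errorRecords`, and the `gs://example-bucket/...` path are placeholders that do not come from the original pipeline.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class SkipEmptyErrorFile {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder input: an empty collection standing in for the parsed error records.
    PCollection<String> errorRecords = pipeline.apply(Create.empty(StringUtf8Coder.of()));

    errorRecords.apply("WritingErrorRecords",
        TextIO.write()
            .to("gs://example-bucket/errors/error-records") // hypothetical path
            .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
            .withoutSharding()
            .withSuffix(".txt")
            .withShardNameTemplate("-SSS")
            .withNumShards(1)
            // Added call: write no file at all when the collection is empty.
            .skipIfEmpty());

    pipeline.run().waitUntilFinish();
  }
}
```

The only change relative to the question's chain is the final `skipIfEmpty()` call; the rest is carried over unchanged.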

TextIO.write() always writes at least one shard, even if it is empty. As you are writing to a single shard anyway, you could get around this behavior by doing the write manually in a DoFn that takes the to-be-written elements as a side input. Being able to do so with the native Beam writes is a reasonable request. This is now supported in the latest release of Beam by setting skipIfEmpty.
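
The manual workaround described above hinges on one check: open the output only when there is at least one record to write. The sketch below shows that check in plain Java, under stated assumptions. The helper `writeIfNonEmpty` and the class name are hypothetical, and a local `java.nio` path stands in for the GCS location. Inside a real pipeline the same logic would sit in the DoFn's process method, with the record list read from the side input and the file opened through Beam's `FileSystems` API rather than `java.nio`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ConditionalErrorWriter {

  /**
   * Writes the header followed by the records to {@code target},
   * but only when {@code records} is non-empty.
   *
   * @return true if a file was written, false if the write was skipped
   */
  public static boolean writeIfNonEmpty(Path target, String header, List<String> records)
      throws IOException {
    if (records.isEmpty()) {
      // Nothing to report: create no file, not even a header-only one.
      return false;
    }
    List<String> lines = new ArrayList<>(records.size() + 1);
    lines.add(header);
    lines.addAll(records);
    Files.write(target, lines, StandardCharsets.UTF_8);
    return true;
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("error-records");
    String header = "ID|ERROR_CODE|ERROR_MESSAGE";

    // Empty input: the write is skipped and no file appears.
    boolean wroteClean =
        writeIfNonEmpty(dir.resolve("clean.txt"), header, Collections.<String>emptyList());

    // One error record: header and record are written.
    boolean wroteErrors =
        writeIfNonEmpty(dir.resolve("errors.txt"), header, Arrays.asList("42|E001|missing field"));

    System.out.println("clean written: " + wroteClean + ", errors written: " + wroteErrors);
  }
}
```

Holding all records in one list mirrors the side-input approach, so it suits the single-shard, small-error-file case in the question rather than large outputs.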