Adding a header and footer to the AvroIO writer

Posted on 2025-02-05 14:29:49

We have a requirement to add a header and a footer to an output Avro file, but it seems the SDK doesn't support it. The TextIO writer does appear to have that capability, via withHeader and withFooter.

That being said, what's the best way to do this without creating a separate pipeline? I tried adding another step after the writer, but it seems the pipeline is assumed to end at the writer.

情归归情 2025-02-12 14:29:50

I finally found a solution by extending FileBasedSink.

public class CustomAvroSink extends FileBasedSink {
  ...
  @Override
  public WriteOperation<DestinationT, GenericRecord> createWriteOperation() {
    // Hand the header and footer records down to the write operation.
    return new CustomAvroWriteOperation<>(this, this.genericRecords, this.header, this.footer);
  }

  private static class CustomAvroWriteOperation<DestinationT, OutputT> extends
      WriteOperation<DestinationT, OutputT> {

    private final DynamicAvroDestinations<?, DestinationT, OutputT> dynamicDestinations;
    private final boolean genericRecords;
    private final OutputT header;
    private final OutputT footer;

    private CustomAvroWriteOperation(CustomAvroSink<?, DestinationT, OutputT> sink,
        boolean genericRecords, OutputT header, OutputT footer) {
      super(sink);
      this.dynamicDestinations = sink.getDynamicDestinations();
      this.genericRecords = genericRecords;
      this.header = header;
      this.footer = footer;
    }

    // Each writer produces one output file and carries the same header and footer.
    @Override
    public Writer<DestinationT, OutputT> createWriter() throws Exception {
      return new CustomAvroWriter<>(this, this.dynamicDestinations, this.genericRecords,
          this.header, this.footer);
    }
  }

  ...
  private static class CustomAvroWriter<DestinationT, OutputT> extends
      Writer<DestinationT, OutputT> {
    // header, footer, and dataFileWriter are fields of this writer (declarations omitted).

    // Appends the header record before any data records.
    @Override
    protected void writeHeader() throws Exception {
      if (this.header != null) {
        this.dataFileWriter.append(this.header);
      }
    }

    // Appends the footer record after the last data record.
    @Override
    protected void writeFooter() throws Exception {
      if (this.footer != null) {
        this.dataFileWriter.append(this.footer);
      }
    }
  }
}

Then I can just do myPCollection.apply("header footer", WriteFiles.to(new CustomAvroSink(...header, footer...))).
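To make the hook pattern in this answer easier to follow, here is a minimal plain-Java sketch of the same idea. It does not use Beam or Avro at all: SketchWriter, HeaderFooterWriter, and render are hypothetical names made up for illustration, and an in-memory list stands in for the output file. As far as I understand it, Beam's FileBasedSink.Writer calls writeHeader() and writeFooter() around the records of each output file in a similar way, so with several shards every file would get its own header and footer. Since the answer appends the header and footer through dataFileWriter.append, they presumably have to be records that match the file's Avro schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HeaderFooterSketch {

  /** Simplified stand-in for a file writer with header/footer hooks (not Beam's real class). */
  abstract static class SketchWriter<T> {
    // In-memory list standing in for the output file.
    protected final List<T> out = new ArrayList<>();

    // Hooks that do nothing by default; subclasses override them.
    protected void writeHeader() {}

    protected void writeFooter() {}

    // Appends one regular record.
    public void write(T value) {
      out.add(value);
    }

    // Drives the lifecycle: header first, then every record, then footer.
    public final List<T> writeAll(List<T> records) {
      writeHeader();
      for (T record : records) {
        write(record);
      }
      writeFooter();
      return out;
    }
  }

  /** Writer that emits an optional header and footer of the same type as the records. */
  static class HeaderFooterWriter<T> extends SketchWriter<T> {
    private final T header; // may be null, meaning "no header"
    private final T footer; // may be null, meaning "no footer"

    HeaderFooterWriter(T header, T footer) {
      this.header = header;
      this.footer = footer;
    }

    @Override
    protected void writeHeader() {
      if (header != null) {
        out.add(header);
      }
    }

    @Override
    protected void writeFooter() {
      if (footer != null) {
        out.add(footer);
      }
    }
  }

  // Convenience helper: writes the records wrapped in the given header and footer.
  public static List<String> render(String header, String footer, List<String> records) {
    return new HeaderFooterWriter<>(header, footer).writeAll(records);
  }

  public static void main(String[] args) {
    System.out.println(render("HEADER", "FOOTER", Arrays.asList("a", "b")));
    // prints [HEADER, a, b, FOOTER]
  }
}
```

Passing null for the header or the footer skips that hook, which mirrors the null checks in the answer's writeHeader() and writeFooter().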
