Dataflow: how to write PubSub messages to GCS as individual files

Posted 2025-01-14 05:12:42

I have a scenario where I have a streaming Dataflow pipeline. The logic is to read JSON data from a Pub/Sub subscription and make an API call to an external system. If the external system is not reachable, I need to persist the JSON data to GCS for later reprocessing. I am able to write the failure data to GCS, but the challenge is that instead of individual files, all messages within the configured window get written to a single file. Is there any way I could write each Pub/Sub message to GCS as an individual file, with a unique file name based on a value from each JSON payload?

    // Output tags for the success/failure sides of the API call.
    final TupleTag<String> successData = new TupleTag<String>() {};
    final TupleTag<String> failureData = new TupleTag<String>() {};

    PCollectionTuple apiOutputCollection =
        pipeline
            .apply(
                "Read PubSub Events",
                PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()))
            .apply(
                "Send to Apigee",
                ParDo.of(new APICall())
                    .withOutputTags(successData, TupleTagList.of(failureData)));

    // Window the failed messages and write them to GCS with windowed TextIO writes.
    apiOutputCollection
        .get(failureData)
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(
                FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
        .apply(
            "Write File(s)",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(
                    WindowedFilenamePolicy.writeWindowedFiles()
                        .withOutputDirectory(options.getOutputDirectory())
                        .withOutputFilenamePrefix(options.getOutputFileNamePrefix())
                        .withShardTemplate(options.getOutputShardTemplate())
                        .withSuffix(".json"))
                .withTempDirectory(
                    NestedValueProvider.of(
                        maybeUseUserTempLocation(
                            options.getUserTempLocation(), options.getOutputDirectory()),
                        (SerializableFunction<String, ResourceId>)
                            input -> FileBasedSink.convertToFileResourceIfPossible(input))));
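
One way to get a separate GCS object per message is to skip TextIO on the failure branch and write each element directly from a DoFn with Beam's FileSystems API, naming the object after a value taken from the JSON payload. The sketch below assumes a `transactionId` field in the JSON and a `gs://` output directory passed to the DoFn; the class name `WriteEachMessageFn` is made up for illustration. Writing from a DoFn bypasses TextIO's windowed-write finalization, so bundle retries may rewrite an object; a deterministic name keeps those retries idempotent.

    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;
    import java.nio.charset.StandardCharsets;
    import java.util.UUID;

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.util.MimeTypes;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    /** Writes each failed message to its own GCS object, named after a field in the JSON. */
    public class WriteEachMessageFn extends DoFn<String, String> {

      private final String outputDir;   // e.g. "gs://my-bucket/failures/" (assumed value)
      private transient ObjectMapper mapper;

      public WriteEachMessageFn(String outputDir) {
        this.outputDir = outputDir;
      }

      @Setup
      public void setup() {
        mapper = new ObjectMapper();
      }

      @ProcessElement
      public void processElement(@Element String json, OutputReceiver<String> out) throws Exception {
        // Build a unique, deterministic file name from the payload; fall back to a UUID
        // if the assumed "transactionId" field is missing.
        JsonNode node = mapper.readTree(json);
        String id = node.hasNonNull("transactionId")
            ? node.get("transactionId").asText()
            : UUID.randomUUID().toString();

        ResourceId file = FileSystems.matchNewResource(outputDir, /* isDirectory= */ true)
            .resolve(id + ".json", StandardResolveOptions.RESOLVE_FILE);

        // One element -> one GCS object, written directly through Beam's FileSystems API.
        try (WritableByteChannel channel = FileSystems.create(file, MimeTypes.TEXT)) {
          channel.write(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));
        }
        out.output(file.toString());
      }
    }

On the failure output it would replace the Window/TextIO chain, roughly:

    apiOutputCollection
        .get(failureData)
        .apply("Write Individual Files",
            ParDo.of(new WriteEachMessageFn("gs://my-bucket/failures/")));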
