数据流 如何将 PubSub 消息写入 GCS 单独文件
我有一个场景,其中有一个流数据流。逻辑是从 PubSub 订阅中读取 json 数据 并对外部系统进行 api 调用。如果外部系统无法访问,我需要将 json 数据保存到 GCS 以便以后重新处理。我可以将故障数据写入 GCS,但我面临的挑战是,它不是在提到的窗口之间写入单个文件,而是写入单个文件。有什么方法可以根据每个 json 数据的值将每个 pubsub 消息作为具有唯一文件名的单独文件写入 GCS。
final TupleTag<String> failureData =new TupleTag<String>(){};
PCollectionTuple APIOutputCollection= pipeline.apply("Read PubSub Events",PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply("Send to Apigee",ParDo.of(new APICall()).withOutputTags(successData,TupleTagList.of(failureData)));
APIOutputCollection.get(failureData)
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDir())
.withOutputFilenamePrefix((options.getOutputFileNamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(".json")
.withTempDirectory(
NestedValueProvider.of(
maybeUseUserTempLocation(options.getUserTempLocation(),options.getOutputDirectory()),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
I have a scenario, where I have a streaming dataflow. The logic is to read json data from a PubSub subscription
and make an api call to external system. In case if external system is not reachable, I need to persist the json data to GCS for later reprocessing. I could able to write the failure data to GCS but the challenge I have is instead of individual files, it's getting written to single file between the window mentioned. Is there any way I could write each pubsub message to GCS as individual files with unique file name based on value from each json data.
final TupleTag<String> failureData =new TupleTag<String>(){};
PCollectionTuple APIOutputCollection= pipeline.apply("Read PubSub Events",PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
.apply("Send to Apigee",ParDo.of(new APICall()).withOutputTags(successData,TupleTagList.of(failureData)));
APIOutputCollection.get(failureData)
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
WindowedFilenamePolicy.writeWindowedFiles()
.withOutputDirectory(options.getOutputDir())
.withOutputFilenamePrefix((options.getOutputFileNamePrefix())
.withShardTemplate(options.getOutputShardTemplate())
.withSuffix(".json")
.withTempDirectory(
NestedValueProvider.of(
maybeUseUserTempLocation(options.getUserTempLocation(),options.getOutputDirectory()),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论