使用 apache beam dataflow java 流式传输 pubsub -bigtable

发布于 2025-01-13 12:15:26 字数 1764 浏览 1 评论 0原文

尝试将 pubsub json 消息更新为 bigtable。我正在从本地计算机运行代码。正在创建数据流作业。但我没有看到 bigtable 实例中更新了任何数据,而且它也不会在控制台或数据流作业中引发任何错误。我还尝试了硬编码值并尝试在 bigtable 中更新,但仍然不起作用。请任何人在这个问题上建议或指导我

 try{
    PipelineOptions options = PipelineOptionsFactory.fromArgs(projectArgs).create();
    options.setRunner(DataflowRunner.class);
    System.out.println("tempfile-" + options.getTempLocation());
    Pipeline p = Pipeline.create(options);
    System.out.println("options" + options.getTempLocation());
    p.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(PUBSUB_SUBSCRIPTION))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(ParDo.of(new RowGenerator())).apply(CloudBigtableIO.writeToTable(bigtableConfig));

    p.run();
    }catch (Exception e) {
        // TODO: handle exception
        System.out.println(e);
    }
}

@ProcessElement public void processElement(ProcessContext context) {

try {
System.out.println("In for RowGenerator");
String decodedMessageAsJsonString = context.element();
System.out.println("decodedMessageAsJsonString"+decodedMessageAsJsonString);

String rowKey = String.valueOf(
        LocalDateTime.ofInstant(Instant.now(), ZoneId.of("UTC"))
                .toEpochSecond(ZoneOffset.UTC));
System.out.println("rowKey"+rowKey);

Put put = new Put(rowKey.getBytes());
put.addColumn("VALUE".getBytes(), "VALUE".getBytes(), decodedMessageAsJsonString.getBytes());
     // put.addColumn(Bytes.toBytes("IBS"), Bytes.toBytes("name"),Bytes.toBytes("ram"));

context.output(put);
}catch (Throwable e) {
    // TODO: handle exception
    System.out.println(e);
}

}在此处输入图像描述

Trying to update the pubsub json message to bigtable .I am running code from local machine .the dataflow job is getting created .but i dont see any data updated in bigtable instance and also it does not throw any error in console or dataflow job.I also tried to have hardcode value and try to update in bigtable but still it didnt work. Please can anyone suggest or guide me in this issue

 try{
    PipelineOptions options = PipelineOptionsFactory.fromArgs(projectArgs).create();
    options.setRunner(DataflowRunner.class);
    System.out.println("tempfile-" + options.getTempLocation());
    Pipeline p = Pipeline.create(options);
    System.out.println("options" + options.getTempLocation());
    p.apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(PUBSUB_SUBSCRIPTION))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(ParDo.of(new RowGenerator())).apply(CloudBigtableIO.writeToTable(bigtableConfig));

    p.run();
    }catch (Exception e) {
        // TODO: handle exception
        System.out.println(e);
    }
}

@ProcessElement
public void processElement(ProcessContext context) {

try {
System.out.println("In for RowGenerator");
String decodedMessageAsJsonString = context.element();
System.out.println("decodedMessageAsJsonString"+decodedMessageAsJsonString);

String rowKey = String.valueOf(
        LocalDateTime.ofInstant(Instant.now(), ZoneId.of("UTC"))
                .toEpochSecond(ZoneOffset.UTC));
System.out.println("rowKey"+rowKey);

Put put = new Put(rowKey.getBytes());
put.addColumn("VALUE".getBytes(), "VALUE".getBytes(), decodedMessageAsJsonString.getBytes());
     // put.addColumn(Bytes.toBytes("IBS"), Bytes.toBytes("name"),Bytes.toBytes("ram"));

context.output(put);
}catch (Throwable e) {
    // TODO: handle exception
    System.out.println(e);
}

}enter image description here

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

苦笑流年记忆 2025-01-20 12:15:26

我没有看到模板的 Bigtable 方面有任何问题。只需确保目标表上存在列族(我假设为“VALUE”)。

您确定您正在阅读正确的 PubSub 订阅并且有消息发送到 PubSub。如果一切正确,则似乎存在PubSub 配置中存在一些问题,也许可以在问题上添加 PubSub 标签,并且来自 pubsub 社区的人可以提供帮助。

I don't see any issue with the Bigtable side of the template. Just make sure that the column family (which I am assuming is "VALUE" exists on the destination table.

Are you sure that you are reading the right PubSub subscription and there are messages being sent to PubSub. If its all correct, it seems there is some issue in the PubSub configuration. Maybe add the PubSub tag on the question and someone from the pubsub community can help.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文