处理弹性搜索错误将被重定向到BigQuery Table/errortopic,用于使用Java中的DataFlow/Beam记录

发布于 2025-02-09 07:11:55 字数 1530 浏览 1 评论 0原文

使用DataFlow/Apache Beam Framework/GCP,将要写入BigQuery Table/errortopic写入BigQuery Table/errortopic。 我们希望捕获Elasticsearch中的错误,以免错过任何数据。 专门针对Elasticsearch中的错误类型,以及如何将其重定向到BigQuery来记录所有错误。

例外可以是可回收/不可回收类型的。 例如,相互连接时间,密钥库负载错误等。

这些错误需要登录BigQuery,并且还发布到了错误主题。

我试图找到答案,但我找不到有关此主题的任何东西。

在代码段下方,我们初始化并写入弹性搜索 - 我们创建的自定义模板是从PubSub读取并写入ES。

/* *步骤#1:从PubSub订阅中阅读。 */

pcollection消息= null;

if (options.getUseSubscription()) {

  messages =

          pipeline.apply(

                  "ReadPubSubSubscription",

                  PubsubIO.readMessagesWithAttributes()
                          .fromSubscription(options.getInputSubscription()));

}

/* *步骤#2:将pubsubmessages转换为JSON文档。 * /

pcollectionTuple convertedPubSubMessages = message.apply(

      "ConvertMessageToJsonDocument",new PubSubMessageToJsonDocument(options));

/*

  • 步骤#3A:使用{@link elasticsearchtransforms.writetoelasticsearch}将json文档写入elasticsearch。 */

convertedPubsubmessages

      .get(TRANSFORM_OUT)

      .apply(
              "GetJsonDocuments",
              MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload)

) 。申请(

              "WriteToElasticsearch",

              ElasticsearchTransforms.WriteToElasticsearch.newBuilder()
                      .setOptions(options.as(WriteToElasticsearchOptions.class))

                      .build());

Handle ElasticSearch errors to be written to BigQuery table/errortopic using Dataflow / apache beam framework/GCP.
We are looking to capture the errors in elasticSearch so that we don't miss any data .
Specifically for types of errors in elasticSearch and how to redirect it to bigquery for logging all errors.

Exception can be recoverable/non-recoverable type.
eg-ConnectTimeOut,Keystore load error etc

These errors needs to be logged in bigquery and also published to error topic.

I tried to find the answer but I wasn't able to find anything around this topic.

Below Code snippet where we initialize and write to our elasticsearch -
This custom template we created is to read from pubsub and write to ES.

/*
* Step #1: Read from a PubSub subscription.
*/

PCollection messages = null;

if (options.getUseSubscription()) {

  messages =

          pipeline.apply(

                  "ReadPubSubSubscription",

                  PubsubIO.readMessagesWithAttributes()
                          .fromSubscription(options.getInputSubscription()));

}

/*
* Step #2: Transform the PubsubMessages into Json documents.
*/

PCollectionTuple convertedPubsubMessages = messages.apply(

      "ConvertMessageToJsonDocument",new PubSubMessageToJsonDocument(options));

/*

  • Step #3a: Write Json documents into Elasticsearch using {@link ElasticsearchTransforms.WriteToElasticsearch}.
    */

convertedPubsubMessages

      .get(TRANSFORM_OUT)

      .apply(
              "GetJsonDocuments",
              MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload)

)
.apply(

              "WriteToElasticsearch",

              ElasticsearchTransforms.WriteToElasticsearch.newBuilder()
                      .setOptions(options.as(WriteToElasticsearchOptions.class))

                      .build());

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文