Handle Elasticsearch errors by redirecting them to a BigQuery table / error topic, using Dataflow / Apache Beam (Java) on GCP
We are looking to capture errors from Elasticsearch so that we don't miss any data: specifically, what types of errors Elasticsearch can raise, and how to redirect them to BigQuery so that all errors are logged.
The exceptions can be of a recoverable or non-recoverable type, e.g. a connect timeout, a keystore load error, etc. These errors need to be logged in BigQuery and also published to an error topic.
I tried to find the answer but I wasn't able to find anything around this topic.
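To make the ask concrete, here is a rough sketch of the kind of pattern we have in mind (this is not our template code, just an illustration): a DoFn wraps the risky per-document call in try/catch and routes failures to a side output, and that failure collection is then both written to a BigQuery log table and published to a Pub/Sub error topic. The class name, table spec, topic name, and column name below are placeholders.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterSketch {

  // Output tags: documents handled successfully vs. documents that failed.
  static final TupleTag<String> SUCCESS_TAG = new TupleTag<String>() {};
  static final TupleTag<String> FAILURE_TAG = new TupleTag<String>() {};

  /** Wraps a per-document operation in try/catch and tags failures instead of failing the bundle. */
  static PCollectionTuple tryProcess(PCollection<String> jsonDocs) {
    return jsonDocs.apply(
        "TryProcessDocument",
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    String doc = c.element();
                    try {
                      // Placeholder for the call that can throw (e.g. an Elasticsearch index call).
                      c.output(SUCCESS_TAG, doc);
                    } catch (Exception e) {
                      // Keep the payload together with the error message so no data is lost.
                      c.output(FAILURE_TAG, doc + " | error: " + e.getMessage());
                    }
                  }
                })
            .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
  }

  /** Writes failed records to a BigQuery log table and publishes them to an error topic. */
  static void writeFailures(PCollection<String> failures) {
    // Log failures to BigQuery. The table is assumed to already exist with a single
    // STRING column; project, dataset, table and column names are placeholders.
    failures
        .apply(
            "FailureToTableRow",
            ParDo.of(
                new DoFn<String, TableRow>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(new TableRow().set("error_record", c.element()));
                  }
                }))
        .apply(
            "WriteFailuresToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.es_error_log")
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    // Publish the same failures to a Pub/Sub error topic (topic name is a placeholder).
    failures.apply(
        "PublishFailuresToErrorTopic",
        PubsubIO.writeStrings().to("projects/my-project/topics/es-error-topic"));
  }
}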
Below is the code snippet where we initialize and write to our Elasticsearch cluster. The custom template we created reads from Pub/Sub and writes to ES.
/*
* Step #1: Read from a PubSub subscription.
*/
PCollection<PubsubMessage> messages = null;
if (options.getUseSubscription()) {
  messages =
      pipeline.apply(
          "ReadPubSubSubscription",
          PubsubIO.readMessagesWithAttributes()
              .fromSubscription(options.getInputSubscription()));
}

/*
 * Step #2: Transform the PubsubMessages into JSON documents.
 */
PCollectionTuple convertedPubsubMessages =
    messages.apply(
        "ConvertMessageToJsonDocument", new PubSubMessageToJsonDocument(options));

/*
 * Step #3a: Write the JSON documents into Elasticsearch using
 * {@link ElasticsearchTransforms.WriteToElasticsearch}.
 */
convertedPubsubMessages
    .get(TRANSFORM_OUT)
    .apply(
        "GetJsonDocuments",
        MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
    .apply(
        "WriteToElasticsearch",
        ElasticsearchTransforms.WriteToElasticsearch.newBuilder()
            .setOptions(options.as(WriteToElasticsearchOptions.class))
            .build());
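And this is roughly how we imagine wiring the error outputs into the pipeline above, reusing writeFailures from the sketch earlier. TRANSFORM_DEADLETTER_OUT is hypothetical: we are assuming the converter could also emit the records it failed to convert as plain strings. The last comment is the part we cannot figure out.

import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

public class ErrorWiringSketch {

  // Hypothetical failure tag: assumes ConvertMessageToJsonDocument could also emit the
  // records it failed to convert, as plain strings, under a second tag.
  static final TupleTag<String> TRANSFORM_DEADLETTER_OUT = new TupleTag<String>() {};

  static void wireErrorOutputs(PCollectionTuple convertedPubsubMessages) {
    // Records that failed JSON conversion.
    PCollection<String> conversionFailures =
        convertedPubsubMessages.get(TRANSFORM_DEADLETTER_OUT);

    // Reuse the helper from the sketch above to log them to BigQuery and publish them
    // to the error topic.
    DeadLetterSketch.writeFailures(conversionFailures);

    // The open question: WriteToElasticsearch does not hand back the documents it failed to
    // index (connect timeouts, keystore load errors, ...), so we do not see how to obtain an
    // equivalent PCollection<String> of Elasticsearch failures to feed into the same two sinks.
  }
}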