How to generate new elastic documents from insertions?
Currently I monitor my applications with Jaeger, which uses Elasticsearch as its backend. I would like to aggregate this information into new documents, based on some criteria, in order to generate some reports.
Starting from the insertion of a document that has the tag "phase=end", as below:
{
"_index": "jaeger-span-2022-03-30",
"_type": "_doc",
"_id": "7fso238BngwX41T6Cr9y",
"_version": 1,
"_score": null,
"fields": {
"traceID": [
"0cefe26bed7464436c43519e7fcbf6c2"
],
"duration": [
1898679
],
"spanID": [
"74b463687cfaf503"
],
"startTimeMillis": [
"2022-03-30T14:08:09.644Z"
],
"references": [
{
"spanID": [
"8657c748a0508e8b"
],
"traceID": [
"0cefe26bed7464436c43519e7fcbf6c2"
],
"refType": [
"CHILD_OF"
]
}
],
"process.serviceName": [
"a09-002"
],
"startTime": [
1648649289644801
],
"operationName": [
"br.com.flow.items.FinalOperation.execute"
],
"tags": [
{
"type": [
"string"
],
"value": [
"end"
],
"key": [
"phase"
]
}
]
}
}
And a document created at the beginning of the trace like the one below:
{
"_index": "jaeger-span-2022-03-30",
"_type": "_doc",
"_id": "7fso238BngwX41T6Cr9y",
"_version": 1,
"_score": null,
"fields": {
"traceID": [
"0cefe26bed7464436c43519e7fcbf6c2"
],
"duration": [
1898679
],
"spanID": [
"74b463687cfaf503"
],
"startTimeMillis": [
"2022-03-30T14:08:09.642Z"
],
"references": [
{
"spanID": [
"8657c748a0508e8b"
],
"traceID": [
"0cefe26bed7464436c43519e7fcbf6c2"
],
"refType": [
"CHILD_OF"
]
}
],
"process.serviceName": [
"a09-002"
],
"startTime": [
1648649289642801
],
"operationName": [
"br.com.flow.items.InitialOperation.execute"
]
}
}
I would like to join the data to form a new document like the one below:
{
"fields": {
"traceID": [
"0cefe26bed7464436c43519e7fcbf6c2"
],
"duration": [
2000
],
"startTime": [
1648649289642801
],
"endTime": [
1648649289644801
],
"process.serviceName": [
"a09-002"
]
}
}
For that I need to perform the following steps:
1 - Observe the insertion event of a document carrying the mentioned tag
2 - Calculate the endTime, which is basically the startTime + duration of the document that contains the phase=end tag
3 - Calculate the duration; for that it is necessary to capture the startTime of the first inserted document that has the same traceID as the tagged document, and then subtract that startTime from the endTime calculated previously.
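Steps 2 and 3 boil down to arithmetic on the span fields. A minimal sketch in Python of the consolidation as the steps describe it, using the values from the two example documents above (field access assumes the Jaeger span shape shown, with single-element arrays and times in microseconds):

```python
def consolidate(start_span: dict, end_span: dict) -> dict:
    """Merge a trace's initial and final spans into one report document.

    Implements the steps above: endTime = startTime + duration of the
    phase=end span; duration = endTime - startTime of the initial span.
    """
    end_time = end_span["fields"]["startTime"][0] + end_span["fields"]["duration"][0]
    start_time = start_span["fields"]["startTime"][0]
    return {
        "fields": {
            "traceID": start_span["fields"]["traceID"],
            "duration": [end_time - start_time],
            "startTime": [start_time],
            "endTime": [end_time],
            "process.serviceName": start_span["fields"]["process.serviceName"],
        }
    }

# Values taken from the example documents above, trimmed to the fields used
start = {"fields": {"traceID": ["0cefe26bed7464436c43519e7fcbf6c2"],
                    "startTime": [1648649289642801], "duration": [1898679],
                    "process.serviceName": ["a09-002"]}}
end = {"fields": {"traceID": ["0cefe26bed7464436c43519e7fcbf6c2"],
                  "startTime": [1648649289644801], "duration": [1898679],
                  "process.serviceName": ["a09-002"]}}

doc = consolidate(start, end)
```

The hard part of the question is not this arithmetic but step 1, observing the insertions.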
How can I perform these tasks and generate the new document? Initially I thought about using CDC for this process, but it seems Elasticsearch doesn't support that feature.
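Since Elasticsearch has no CDC, one common workaround is to poll for newly indexed spans. A hedged sketch of the query side, building the search DSL for step 1 (it assumes Jaeger's default span mapping, where `tags` is a nested field with `key`/`value` sub-fields and `startTimeMillis` is a date field):

```python
def phase_end_query(since_millis: str) -> dict:
    """Build a query for spans tagged phase=end indexed after a timestamp.

    Assumes Jaeger's default Elasticsearch span mapping: `tags` nested
    with `key`/`value` sub-fields, `startTimeMillis` a date field.
    """
    return {
        "bool": {
            "filter": [
                {
                    "nested": {
                        "path": "tags",
                        "query": {
                            "bool": {
                                "filter": [
                                    {"term": {"tags.key": "phase"}},
                                    {"term": {"tags.value": "end"}},
                                ]
                            }
                        },
                    }
                },
                {"range": {"startTimeMillis": {"gt": since_millis}}},
            ]
        }
    }

q = phase_end_query("2022-03-30T14:00:00.000Z")
```

The resulting dict can be passed as the `query` argument of `Elasticsearch.search()` in the official Python client, e.g. `es.search(index="jaeger-span-*", query=q, sort=[{"startTimeMillis": "asc"}])`; remembering the last `startTimeMillis` seen between polls turns this into a crude change feed, at the cost of latency and of having to deduplicate across polls.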
I ended up going another way: I sent the traces to Kafka and made a stream to analyze the data, then saved the consolidated result in a database.