How do I calculate the lag between the time a log message is generated at the application end and the time it is ingested into Elasticsearch?
Elasticsearch experts, I need your help to achieve the goal mentioned below.
Goal:
Trying to find a way to calculate the lag between the time a log message is generated at the application end (the @timestamp field) and the time it is ingested into Elasticsearch (the ingest_time field).
Current Setup:
I am using Fluentd to capture the logs and send them to Kafka. Then I use Kafka Connect (Elasticsearch sink connector) to send the logs on to Elasticsearch. Since I have a layer of Kafka between Fluentd and Elasticsearch, I want to calculate the lag between the log message generation time and the ingestion time.
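For reference, a minimal Elasticsearch sink connector configuration for this kind of setup might look like the sketch below; the connector name, topic, and endpoint are illustrative placeholders, not values from my actual setup:

{
  "name": "es-sink-feedaggregator",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "feedaggregator-secondary",
    "connection.url": "https://my-es-endpoint:443",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}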
The log message generation time is stored in the @timestamp field of the log and is set when the application generates the log. Below is how a log message looks at the Kafka topic end.
{
  "message": "ServiceResponse - Throwing non 2xx response",
  "log_level": "ERROR",
  "thread_id": "http-nio-9033-exec-21",
  "trace_id": "86d39fbc237ef7f8",
  "user_id": "85355139",
  "tag": "feedaggregator-secondary",
  "@timestamp": "2022-06-18T23:30:06+0530"
}
- I have created an ingest pipeline to add the ingest_time field to every document inserted into the Elasticsearch index.
PUT _ingest/pipeline/ingest_time
{
  "description": "Add an ingest timestamp",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
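To have this pipeline run on documents written by the sink connector, it can be attached as the index's default pipeline. This is a minimal sketch, not necessarily how my cluster is configured; for daily indices an index template would set this instead:

PUT feedaggregator-secondary-2022-06-18/_settings
{
  "index.default_pipeline": "ingest_time"
}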
- Once a document gets inserted into the index from Kafka using Kafka Connect (ES sink connector), this is how my message looks in Kibana in JSON format.
{
  "_index": "feedaggregator-secondary-2022-06-18",
  "_type": "_doc",
  "_id": "feedaggregator-secondary-2022-06-18+2+7521337",
  "_version": 1,
  "_score": null,
  "_source": {
    "thread_id": "http-nio-9033-exec-21",
    "trace_id": "86d39fbc237ef7f8",
    "@timestamp": "2022-06-18T23:30:06+0530",
    "ingest_time": "2022-06-18T18:00:09.038032Z",
    "user_id": "85355139",
    "log_level": "ERROR",
    "tag": "feedaggregator-secondary",
    "message": "ServiceResponse - Throwing non 2xx response"
  },
  "fields": {
    "@timestamp": [
      "2022-06-18T18:00:06.000Z"
    ]
  },
  "sort": [
    1655574126000
  ]
}
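For this sample document the lag should come out to about 3 seconds: @timestamp 2022-06-18T23:30:06+0530 is 18:00:06 UTC, and ingest_time is 2022-06-18T18:00:09.038032Z, a difference of roughly 3.04 s.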
- Now, I wanted to calculate the difference between the @timestamp and ingest_time fields. For this I added a script processor to the ingest pipeline, which adds a field lag_in_seconds and sets its value to the difference between the ingest_time and @timestamp fields.
PUT _ingest/pipeline/calculate_lag
{
  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          if (ctx.containsKey("ingest_time") && ctx.containsKey("@timestamp")) {
            ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time'])) / 1000;
          }
        """
      }
    }
  ]
}
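The pipeline can be dry-run against the sample log message with the _simulate API, without touching the index; this is also the quickest way to reproduce the error shown below:

POST _ingest/pipeline/calculate_lag/_simulate
{
  "docs": [
    {
      "_source": {
        "@timestamp": "2022-06-18T23:30:06+0530",
        "message": "ServiceResponse - Throwing non 2xx response"
      }
    }
  ]
}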
Error:
But since my ingest_time and @timestamp fields are in different formats, it gave a DateTimeParseException.
{
  "error": {
    "root_cause": [
      {
        "type": "exception",
        "reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
        "header": {
          "processor_type": "script"
        }
      }
    ],
    "type": "exception",
    "reason": "java.lang.IllegalArgumentException: ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
    "caused_by": {
      "type": "illegal_argument_exception",
      "reason": "ScriptException[runtime error]; nested: DateTimeParseException[Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22];",
      "caused_by": {
        "type": "script_exception",
        "reason": "runtime error",
        "script_stack": [
          "java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2049)",
          "java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1948)",
          "java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:598)",
          "java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:583)",
          "ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;\n }",
          " ^---- HERE"
        ],
        "script": " if(ctx.containsKey(\"ingest_time\") && ctx.containsKey(\"@timestamp\")) {\n ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['ingest_time']))/1000;\n }",
        "lang": "painless",
        "caused_by": {
          "type": "date_time_parse_exception",
          "reason": "Text '2022-06-18T23:30:06+0530' could not be parsed, unparsed text found at index 22"
        }
      }
    },
    "header": {
      "processor_type": "script"
    }
  },
  "status": 500
}
So, I need your help to compute lag_in_seconds between the @timestamp and ingest_time fields.
Using AWS managed Elasticsearch (OpenSearch Service), Elasticsearch version 7.1.
Comments (1)
I can see a Java date parsing problem for the @timestamp field. ctx['@timestamp'] will return the value "2022-06-18T23:30:06+0530", which is an ISO_OFFSET_DATE_TIME. You would need to parse it using OffsetDateTime.parse(ctx['@timestamp']). Alternatively, you could try to access @timestamp from the fields block. You can read up on date parsing in Java at https://howtodoinjava.com/java/date-time/zoneddatetime-parse/.
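Building on that suggestion, here is a sketch of a corrected pipeline. One caveat, which is my assumption rather than something stated above: the strict ISO parsers behind both ZonedDateTime.parse and OffsetDateTime.parse expect a colon in the offset, and the "unparsed text found at index 22" points at the colon-less "+0530", so the sketch parses @timestamp with an explicit DateTimeFormatter pattern ("XX" matches +0530 as well as Z):

PUT _ingest/pipeline/calculate_lag
{
  "description": "Add an ingest timestamp and calculate ingest lag",
  "processors": [
    {
      "set": {
        "field": "_source.ingest_time",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """
          if (ctx.containsKey("ingest_time") && ctx.containsKey("@timestamp")) {
            // ingest_time is UTC ISO-8601 with a trailing 'Z'; the default ISO parser accepts it
            ZonedDateTime ingested = ZonedDateTime.parse(ctx['ingest_time']);
            // @timestamp carries a colon-less offset (+0530), so parse it with an explicit pattern
            ZonedDateTime generated = ZonedDateTime.parse(
              ctx['@timestamp'],
              DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXX")
            );
            ctx['lag_in_seconds'] = ChronoUnit.MILLIS.between(generated, ingested) / 1000;
          }
        """
      }
    }
  ]
}

With this in place, re-running the _simulate call shown earlier should return lag_in_seconds of roughly 3 for the sample document instead of the parse error.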