How to filter irregular data into Elasticsearch using Logstash
I'm just learning to use Elasticsearch, and I have a log like this:
*************************************************************************************
Task with ID = 119 is waiting for the message to arrive on the queue 1294598175.
=>bpc_wait_for_event (comm_utils.c)
=>bpc_select (pipe_utils.c)
Using select() to wait for an event to occur at 10:00:02
Event selector just had 1 fd(s) triggered at 10:00:02
=>process_incoming_data (tcp_main.c)
=>reset_device_idle_timers (tcp_main.c)
=>receive_incoming_message (tcp_xfer.c)
=>receive_ncr_message (tcp_xfer.c)
=>tcp_receive_data (tcp_xfer.c)
=>bpc_sock_recv (comm_utils.c)
=>tcp_receive_data (tcp_xfer.c)
=>bpc_sock_recv (comm_utils.c)
30.32.31.30.46.33.33.38.34.30.30.39.38.41.38.31 0210F33840098A81
38.34.30.30.30.30.30.30.30.30.30.30.30.34.30.30 8400000000000400
30.30.30.34.31.36.36.30.33.34.39.34.38.38.31.31 0004166034948811
30.34.31.32.37.33.30.31.31.30.30.30.30.30.30.30 0412730110000000
33.30.30.30.30.30.30.30.30.32.32.34.31.30.30.30 3000000002241000
30.31.30.30.30.30.30.30.30.30.34.31.36.39.34.39 0100000000416949
31.30.30.30.30.31.30.32.32.34.36.30.31.31.30.31 1000010224601101
38.31.30.30.31.31.30.32.31.34.30.36.34.35.30.30 8100110214064500
30.35.30.30.39.39.33.37.20.20.20.20.20.20.30.30 05009937 00
53.31.47.39.39.38.32.36.30.31.32.46.54.32.30.30 S1G99826012FT200
35.35.4B.52.58.56.44.33.36.30.30.32.30.31.30.30 55KRXVD360020100
32.33.36.30.43.30.30.30.30.36.30.35.31.36.34.39 2360C00006051649
36.31.30.37.31.30.38.39.38.31.33.33.39.30.30.33 6107108981339003
34.35.31 451
Received 211 bytes of data from device #600, sending to CROUTer
=>raw_msgx2sv_msg (tag_utils.c)
************************ Header Fields *************************
orgdev: 600 utrnno: 0
orgid: 0 reversal: 0
destid: 0 repeat: 0
last_task_id: 119 fintran: 0
msgtype: 51 phase: 0
task_msgtype: 0 balances: 0
respreq: 0 stood_in_for: 0
resp_qid: -1 issuer_posted: 0
smsgno: 0 sv_trace: 0
nwindicator: 0 timestamp: 0
devinfo: []
hpan: []
fld_flags: 00000000000000000000
sys_msg_no: 0
msgsnd_w_retry [dst task: COMMSINT, time: 24/02/2020 10:00:02.0468]: Msg sent to queue 1293844488
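For context (my own observation, not stated in the original post): the left column of each dump line appears to be the dot-separated hex codes of the bytes whose ASCII rendering is shown in the right column. Decoding it reproduces the right-hand text:

```ruby
# Decode the dot-separated hex column of the first dump line back to ASCII.
hex_column = "30.32.31.30.46.33.33.38.34.30.30.39.38.41.38.31"
ascii = hex_column.split(".").map { |pair| pair.to_i(16).chr }.join
puts ascii  # => "0210F33840098A81" (matches the right-hand column)
```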
I've tried using the filter I created, but it doesn't work; it just inserts every single line into Elasticsearch like this.
This is my Logstash config file:
input {
  beats {
    port => "5044"
  }
}

filter {
  ruby {
    code => "event.set('msgfilter', event.get('message').scan(/\s{7}[a-zA-Z0-9]+(?=\r\n|\s*\r\nReceived)/))"
    id => "ruby-counter"
  }
  # merge array of strings
  mutate { join => { "msgfilter" => "" } }
  # remove spaces
  mutate { gsub => [ "msgfilter", " ", "" ] }
}

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}
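One likely reason the filter produces nothing (my reading, not confirmed in the post): Beats ships one event per log line by default, so the scan regex, which looks ahead for `\r\n`, only matches when the whole multi-line dump arrives as a single message. A small Ruby sketch with made-up sample lines (the seven-space separator width is an assumption) illustrates this:

```ruby
# The same regex as in the ruby filter above.
pattern = /\s{7}[a-zA-Z0-9]+(?=\r\n|\s*\r\nReceived)/

sep = " " * 7  # assumed column separator width
single_line = "30.32.31#{sep}0210F33840098A81"          # one event per line
whole_block = "30.32.31#{sep}0210F33840098A81\r\n" \
              "34.35.31#{sep}451\r\n" \
              "Received 211 bytes of data"

p single_line.scan(pattern)                     # => [] -- no \r\n follows
p whole_block.scan(pattern).join.gsub(" ", "")  # => "0210F33840098A81451"
```

If Beats is indeed splitting per line, configuring multiline handling on the Beats side, so the whole dump reaches Logstash as one event, would be one way to let the scan see the full block.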
I want the result to look like this: I want to take the part in the red box and combine it into one line.
Result I want:
Comment (1)
You may try to extract all the bytes with this regex:
^(?:[\dA-F]{2}\.?){1,} {7,}([\w\s]{1,16})$
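To illustrate with my own sample lines (assuming the gap between the two columns in the raw log is at least seven spaces; it may have been collapsed when the log was pasted here), the regex captures the right-hand text of each dump line, which can then be joined into one string:

```ruby
# Suggested regex: hex-pair column, 7+ spaces, then up to 16 chars captured.
pattern = /^(?:[\dA-F]{2}\.?){1,} {7,}([\w\s]{1,16})$/

lines = [
  "30.32.31.30.46.33.33.38.34.30.30.39.38.41.38.31#{' ' * 7}0210F33840098A81",
  "34.35.31#{' ' * 7}451",
  "Received 211 bytes of data from device #600",  # non-dump line: no match
]

# String#[] with a capture index returns the captured group or nil.
ascii_parts = lines.map { |l| l[pattern, 1] }.compact
puts ascii_parts.join  # => "0210F33840098A81451"
```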