- Logstash
- Logstash - 入门示例
- 入门示例 - 下载安装
- 入门示例 - hello world
- 入门示例 - 配置语法
- 入门示例 - plugin的安装
- 入门示例 - 长期运行
- Logstash - 插件配置
- 插件配置 - input配置
- input配置 - file
- input配置 - stdin
- input配置 - syslog
- input配置 - tcp
- 插件配置 - codec配置
- codec配置 - json
- codec配置 - multiline
- codec配置 - collectd
- codec配置 - netflow
- 插件配置 - filter配置
- filter配置 - date
- filter配置 - grok
- filter配置 - dissect
- filter配置 - geoip
- filter配置 - json
- filter配置 - kv
- filter配置 - metrics
- filter配置 - mutate
- filter配置 - ruby
- filter配置 - split
- filter配置 - elapsed
- 插件配置 - output配置
- output配置 - elasticsearch
- output配置 - email
- output配置 - exec
- output配置 - file
- output配置 - nagios
- output配置 - statsd
- output配置 - stdout
- output配置 - tcp
- output配置 - hdfs
- Logstash - 场景示例
- 场景示例 - nginx访问日志
- 场景示例 - nginx错误日志
- 场景示例 - postfix日志
- 场景示例 - ossec日志
- 场景示例 - windows系统日志
- 场景示例 - Java日志
- 场景示例 - MySQL慢查询日志
- Logstash - 性能与测试
- 性能与测试 - generator方式
- 性能与测试 - 监控方案
- 监控方案 - logstash-input-heartbeat方式
- 监控方案 - jmx启动参数方式
- 监控方案 - API方式
- Logstash - 扩展方案
- 扩展方案 - 通过redis传输
- 扩展方案 - 通过kafka传输
- 扩展方案 - AIX 平台上的logstash-forwarder-java
- 扩展方案 - rsyslog
- 扩展方案 - nxlog
- 扩展方案 - heka
- 扩展方案 - fluent
- 扩展方案 - Message::Passing
- Logstash - 源码解析
- 源码解析 - pipeline流程
- 源码解析 - Event的生成
- Logstash - 插件开发
- 插件开发 - utmp插件示例
- Beats
- Beats - filebeat
- Beats - packetbeat网络流量分析
- Beats - metricbeat
- Beats - winlogbeat
- ElasticSearch
- ElasticSearch - 架构原理
- 架构原理 - segment、buffer和translog对实时性的影响
- 架构原理 - segment merge对写入性能的影响
- 架构原理 - routing和replica的读写过程
- 架构原理 - shard的allocate控制
- 架构原理 - 自动发现的配置
- ElasticSearch - 接口使用示例
- 接口使用示例 - 增删改查操作
- 接口使用示例 - 搜索请求
- 接口使用示例 - Painless脚本
- 接口使用示例 - reindex接口
- ElasticSearch - 性能优化
- 性能优化 - bulk提交
- 性能优化 - gateway配置
- 性能优化 - 集群状态维护
- 性能优化 - 缓存
- 性能优化 - fielddata
- 性能优化 - curator工具
- 性能优化 - profile接口
- ElasticSearch - rally测试方案
- ElasticSearch - 多集群互联
- ElasticSearch - 别名的应用
- ElasticSearch - 映射与模板的定制
- ElasticSearch - puppet-elasticsearch模块的使用
- ElasticSearch - 计划内停机升级的操作流程
- ElasticSearch - 镜像备份
- ElasticSearch - rollover和shrink
- ElasticSearch - Ingest节点
- ElasticSearch - Hadoop 集成
- Hadoop 集成 - spark streaming交互
- ElasticSearch - 权限管理
- 权限管理 - Shield
- 权限管理 - Search-Guard 在 Elasticsearch 2.x 上的运用
- ElasticSearch - 监控方案
- 监控方案 - 监控相关接口
- 监控相关接口 - 集群健康状态
- 监控相关接口 - 节点状态
- 监控相关接口 - 索引状态
- 监控相关接口 - 任务管理
- 监控相关接口 - cat 接口的命令行使用
- 监控方案 - 日志记录
- 监控方案 - 实时bigdesk方案
- 监控方案 - cerebro
- 监控方案 - zabbix trapper方案
- ElasticSearch - ES在运维监控领域的其他玩法
- ES在运维监控领域的其他玩法 - percolator接口
- ES在运维监控领域的其他玩法 - watcher报警
- ES在运维监控领域的其他玩法 - ElastAlert
- ES在运维监控领域的其他玩法 - 时序数据库
- ES在运维监控领域的其他玩法 - Grafana
- ES在运维监控领域的其他玩法 - juttle
- ES在运维监控领域的其他玩法 - Etsy的Kale异常检测
- Kibana 5
- Kibana 5 - 安装、配置和运行
- Kibana 5 - 生产环境部署
- Kibana 5 - discover功能
- Kibana 5 - 各visualize功能
- 各visualize功能 - area
- 各visualize功能 - table
- 各visualize功能 - line
- 各visualize功能 - markdown
- 各visualize功能 - metric
- 各visualize功能 - pie
- 各visualize功能 - tile map
- 各visualize功能 - vertical bar
- Kibana 5 - dashboard功能
- Kibana 5 - timelion 介绍
- Kibana 5 - console 介绍
- Kibana 5 - setting功能
- Kibana 5 - 常用sub agg示例
- 常用sub agg示例 - 函数堆栈链分析
- 常用sub agg示例 - 分图统计
- 常用sub agg示例 - TopN的时序趋势图
- 常用sub agg示例 - 响应时间的百分占比趋势图
- 常用sub agg示例 - 响应时间的概率分布在不同时段的相似度对比
- Kibana 5 - 源码解析
- 源码解析 - .kibana索引的数据结构
- 源码解析 - 主页入口
- 源码解析 - discover解析
- 源码解析 - visualize解析
- 源码解析 - dashboard解析
- Kibana 5 - 插件
- 插件 - 可视化开发示例
- 插件 - 后端开发示例
- 插件 - 完整app开发示例
- Kibana 5 - Kibana报表
- 竞品对比
扩展方案 - rsyslog
Rsyslog 是 RHEL6 开始的默认系统 syslog 应用软件(当然,RHEL 自带的版本较低,实际官方稳定版本已经到 v8 了)。官网地址:http://www.rsyslog.com
目前 Rsyslog 本身也支持多种输入输出方式,内部逻辑判断和模板处理。
常用模块介绍
不同模块插件在 rsyslog 流程中发挥作用的原理,可以阅读:http://www.rsyslog.com/doc/master/configuration/modules/workflow.html
流程中可以使用 mmnormalize 组件来完成数据的切分(相当于 logstash 的 filters/grok 功能)。
rsyslog 从 v7 版本开始带有 omelasticsearch 插件可以直接写入数据到 elasticsearch 集群,配合 mmnormalize 的使用示例见: http://puppetlabs.com/blog/use-rsyslog-and-elasticsearch-powerful-log-aggregation
而 normalize 语法说明见: http://www.liblognorm.com/files/manual/index.html?sampledatabase.htm
类似的还有 mmfields 和 mmjsonparse 组件。注意,mmjsonparse 要求被解析的 MSG 必须以 @CEE:
开头,解析之后的字符串为 JSON。使用示例见:http://blog.sematext.com/2013/05/28/structured-logging-with-rsyslog-and-elasticsearch/
此外,rsyslog 从 v6 版本开始,设计了一套 rainerscript 作为配置中的 DSL。利用 rainerscript 中的函数,也可以做到一些数据解析和逻辑判断:
- tolower
- cstr
- cnum
- wrap
- replace
- field
- re_extract
- re_match
- contains
- if-else
- foreach
- lookup
- set/reset/unset
详细说明见:http://www.rsyslog.com/doc/v8-stable/rainerscript/functions.html
rsyslog 与 logstash 合作
虽然 Rsyslog 很早就支持直接输出数据给 elasticsearch,但如果你使用的是 v8.4 以下的版本,我们这里并不推荐这种方式。因为normalize 语法还是比较简单,只支持时间,字符串,数字,ip 地址等几种。在复杂条件下远比不上完整的正则引擎。
那么,怎么使用 rsyslog 作为日志收集和传输组件,来配合 logstash 工作呢?
如果只是简单的 syslog 数据,直接单个 logstash 运行即可,配置方式见本书 2.4 章节。
如果你运行着一个高负荷运行的 rsyslog 系统,每秒传输的数据远大过单个 logstash 能处理的能力,你可以运行多个 logstash 在多个端口,然后让 rsyslog 做轮训转发(事实上,单个 omfwd 本身的转发能力也有限,所以推荐这种做法):
Ruleset( name="forwardRuleSet" ) {
Action ( type="mmsequence" mode="instance" from="0" to="4" var="$.seq" )
if $.seq == "0" then {
action (type="omfwd" Target="127.0.0.1" Port="5140" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
if $.seq == "1" then {
action (type="omfwd" Target="127.0.0.1" Port="5141" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
if $.seq == "2" then {
action (type="omfwd" Target="127.0.0.1" Port="5142" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
if $.seq == "3" then {
action (type="omfwd" Target="127.0.0.1" Port="5143" Protocol="tcp" queue.size="150000" queue.dequeuebatchsize="2000" )
}
}
如果 rsyslog 仅是作为 shipper 角色运行,环境中有单独的消息队列可用,rsyslog 也有对应的 omkafka, omredis, omzmq 插件可用。
rsyslog v8 版的 mmexternal 模块
如果你使用的是 v8.4 及以上版本的 rsyslog,其中有一个新加入的 mmexternal 模块。该模块是在 v7 的 omprog 模块基础上发展出来的,可以让你使用任意脚本,接收标准输入,自行处理以后再输出回来,而 rsyslog 接收到这个输出再进行下一步处理,这就解决了前面提到的 “normalize 语法太简单”的问题!
下面是使用 rsyslog 的 mmexternal 和 omelasticsearch 完成 Nginx 访问日志直接解析存储的配置。
rsyslog 配置如下:
module(load="imuxsock" SysSock.RateLimit.Interval="0")
module(load="mmexternal")
module(load="omelasticsearch")
template(name="logstash-index" type="list") {
constant(value="logstash-")
property(name="timereported" dateFormat="rfc3339" position.from="1" position.to="4")
constant(value=".")
property(name="timereported" dateFormat="rfc3339" position.from="6" position.to="7")
constant(value=".")
property(name="timereported" dateFormat="rfc3339" position.from="9" position.to="10")
}
template( name="nginx-log" type="string" string="%msg%n" )
if ( $syslogfacility-text == 'local6' and $programname startswith 'wb-www-access-' and not ($msg contains '/2/remind/unread_count' or $msg contains '/2/remind/group_unread') ) then
{
action( type="mmexternal" binary="/usr/local/bin/rsyslog-nginx-elasticsearch.py" interface.input="fulljson" forcesingleinstance="on" )
action( type="omelasticsearch"
template="nginx-log"
server="eshost.example.com"
bulkmode="on"
dynSearchIndex="on"
searchIndex="logstash-index"
searchType="nginxaccess"
queue.type="linkedlist"
queue.size="50000"
queue.dequeuebatchsize="5000"
queue.dequeueslowdown="100000"
)
stop
}
其中调用的 python 脚本示例如下(注意只是做示例,脚本中的 split 功能其实可以用 rsyslog 的 mmfields 插件完成):
#! /usr/bin/python
import sys
import json
import datetime
def nginxLog(data):
hostname = data['hostname']
logline = data['msg']
time_local, http_x_up_calling_line_id, request, http_user_agent, staTus, remote_addr, http_x_log_uid, http_referer, request_time, body_bytes_sent, http_x_forwarded_proto, http_x_forwarded_for, request_uid, http_host, http_cookie, upstream_response_time = logline.split('`')
try:
upstream_response_time = float(upstream_response_time)
except:
upstream_response_time = None
method, uri, verb = request.split(' ')
arg = {}
try:
url_path, url_args = uri.split('?')
for args in url_args.split('&'):
k, v = args.split('=')
arg[k] = v
except:
url_path = uri
# Why %z do not implement?
ret = {
"@timestamp": datetime.datetime.strptime(time_local, ' [%d/%b/%Y:%H:%M:%S +0800]').strftime('%FT%T+0800'),
"host": hostname,
"method": method.lstrip('"'),
"url_path": url_path,
"url_args": arg,
"verb": verb.rstrip('"'),
"http_x_up_calling_line_id": http_x_up_calling_line_id,
"http_user_agent": http_user_agent,
"status": int(staTus),
"remote_addr": remote_addr.strip('[]'),
"http_x_log_uid": http_x_log_uid,
"http_referer": http_referer,
"request_time": float(request_time),
"body_bytes_sent": int(body_bytes_sent),
"http_x_forwarded_proto": http_x_forwarded_proto,
"http_x_forwarded_for": http_x_forwarded_for,
"request_uid": request_uid,
"http_host": http_host,
"http_cookie": http_cookie,
"upstream_response_time": upstream_response_time
}
return ret
def onInit():
""" Do everything that is needed to initialize processing
"""
def onReceive(msg):
data = json.loads(msg)
ret = nginxLog(data)
print json.dumps({'msg': ret})
def onExit():
""" Do everything that is needed to finish processing. This is being called immediately before exiting.
"""
# most often, nothing to do here
onInit()
keepRunning = 1
while keepRunning == 1:
msg = sys.stdin.readline()
if msg:
msg = msg[:len(msg)-1]
onReceive(msg)
sys.stdout.flush()
else:
keepRunning = 0
onExit()
sys.stdout.flush()
注意输出的时候,顶层的 key 是不能变的,msg 还得叫 msg,如果是 hostname 还得叫 hostname ,等等。否则,rsyslog 会当做处理无效,直接传递原有数据内容给下一步。
慎用提示
mmexternal 是基于 direct mode 的,所以如果你发送的数据量较大时,rsyslog 并不会像 linkedlist mode 那样缓冲在磁盘队列上,而是持续 fork 出新的 mmexternal 程序,几千个进程后,你的服务器就挂了!!所以,务必开启 forcesingleinstance
选项。
rsyslog 的 mmgrok 模块
Rsyslog 8.15.0 开始,附带了 mmgrok 模块,系笔者贡献。利用该模块,可以将 Logstash 的 Grok 规则,运用在 Rsyslog 中。欢迎有兴趣的读者试用。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论