- Logstash
- Logstash - 入门示例
- 入门示例 - 下载安装
- 入门示例 - hello world
- 入门示例 - 配置语法
- 入门示例 - plugin的安装
- 入门示例 - 长期运行
- Logstash - 插件配置
- 插件配置 - input配置
- input配置 - file
- input配置 - stdin
- input配置 - syslog
- input配置 - tcp
- 插件配置 - codec配置
- codec配置 - json
- codec配置 - multiline
- codec配置 - collectd
- codec配置 - netflow
- 插件配置 - filter配置
- filter配置 - date
- filter配置 - grok
- filter配置 - dissect
- filter配置 - geoip
- filter配置 - json
- filter配置 - kv
- filter配置 - metrics
- filter配置 - mutate
- filter配置 - ruby
- filter配置 - split
- filter配置 - elapsed
- 插件配置 - output配置
- output配置 - elasticsearch
- output配置 - email
- output配置 - exec
- output配置 - file
- output配置 - nagios
- output配置 - statsd
- output配置 - stdout
- output配置 - tcp
- output配置 - hdfs
- Logstash - 场景示例
- 场景示例 - nginx访问日志
- 场景示例 - nginx错误日志
- 场景示例 - postfix日志
- 场景示例 - ossec日志
- 场景示例 - windows系统日志
- 场景示例 - Java日志
- 场景示例 - MySQL慢查询日志
- Logstash - 性能与测试
- 性能与测试 - generator方式
- 性能与测试 - 监控方案
- 监控方案 - logstash-input-heartbeat方式
- 监控方案 - jmx启动参数方式
- 监控方案 - API方式
- Logstash - 扩展方案
- 扩展方案 - 通过redis传输
- 扩展方案 - 通过kafka传输
- 扩展方案 - AIX 平台上的logstash-forwarder-java
- 扩展方案 - rsyslog
- 扩展方案 - nxlog
- 扩展方案 - heka
- 扩展方案 - fluent
- 扩展方案 - Message::Passing
- Logstash - 源码解析
- 源码解析 - pipeline流程
- 源码解析 - Event的生成
- Logstash - 插件开发
- 插件开发 - utmp插件示例
- Beats
- Beats - filebeat
- Beats - packetbeat网络流量分析
- Beats - metricbeat
- Beats - winlogbeat
- ElasticSearch
- ElasticSearch - 架构原理
- 架构原理 - segment、buffer和translog对实时性的影响
- 架构原理 - segment merge对写入性能的影响
- 架构原理 - routing和replica的读写过程
- 架构原理 - shard的allocate控制
- 架构原理 - 自动发现的配置
- ElasticSearch - 接口使用示例
- 接口使用示例 - 增删改查操作
- 接口使用示例 - 搜索请求
- 接口使用示例 - Painless脚本
- 接口使用示例 - reindex接口
- ElasticSearch - 性能优化
- 性能优化 - bulk提交
- 性能优化 - gateway配置
- 性能优化 - 集群状态维护
- 性能优化 - 缓存
- 性能优化 - fielddata
- 性能优化 - curator工具
- 性能优化 - profile接口
- ElasticSearch - rally测试方案
- ElasticSearch - 多集群互联
- ElasticSearch - 别名的应用
- ElasticSearch - 映射与模板的定制
- ElasticSearch - puppet-elasticsearch模块的使用
- ElasticSearch - 计划内停机升级的操作流程
- ElasticSearch - 镜像备份
- ElasticSearch - rollover和shrink
- ElasticSearch - Ingest节点
- ElasticSearch - Hadoop 集成
- Hadoop 集成 - spark streaming交互
- ElasticSearch - 权限管理
- 权限管理 - Shield
- 权限管理 - Search-Guard 在 Elasticsearch 2.x 上的运用
- ElasticSearch - 监控方案
- 监控方案 - 监控相关接口
- 监控相关接口 - 集群健康状态
- 监控相关接口 - 节点状态
- 监控相关接口 - 索引状态
- 监控相关接口 - 任务管理
- 监控相关接口 - cat 接口的命令行使用
- 监控方案 - 日志记录
- 监控方案 - 实时bigdesk方案
- 监控方案 - cerebro
- 监控方案 - zabbix trapper方案
- ElasticSearch - ES在运维监控领域的其他玩法
- ES在运维监控领域的其他玩法 - percolator接口
- ES在运维监控领域的其他玩法 - watcher报警
- ES在运维监控领域的其他玩法 - ElastAlert
- ES在运维监控领域的其他玩法 - 时序数据库
- ES在运维监控领域的其他玩法 - Grafana
- ES在运维监控领域的其他玩法 - juttle
- ES在运维监控领域的其他玩法 - Etsy的Kale异常检测
- Kibana 5
- Kibana 5 - 安装、配置和运行
- Kibana 5 - 生产环境部署
- Kibana 5 - discover功能
- Kibana 5 - 各visualize功能
- 各visualize功能 - area
- 各visualize功能 - table
- 各visualize功能 - line
- 各visualize功能 - markdown
- 各visualize功能 - metric
- 各visualize功能 - pie
- 各visualize功能 - tile map
- 各visualize功能 - vertical bar
- Kibana 5 - dashboard功能
- Kibana 5 - timelion 介绍
- Kibana 5 - console 介绍
- Kibana 5 - setting功能
- Kibana 5 - 常用sub agg示例
- 常用sub agg示例 - 函数堆栈链分析
- 常用sub agg示例 - 分图统计
- 常用sub agg示例 - TopN的时序趋势图
- 常用sub agg示例 - 响应时间的百分占比趋势图
- 常用sub agg示例 - 响应时间的概率分布在不同时段的相似度对比
- Kibana 5 - 源码解析
- 源码解析 - .kibana索引的数据结构
- 源码解析 - 主页入口
- 源码解析 - discover解析
- 源码解析 - visualize解析
- 源码解析 - dashboard解析
- Kibana 5 - 插件
- 插件 - 可视化开发示例
- 插件 - 后端开发示例
- 插件 - 完整app开发示例
- Kibana 5 - Kibana报表
- 竞品对比
源码解析 - Event的生成
Logstash 中 Event 的生成
上一节大家可能注意到了,整个 pipeline 非常简单,无非就是一个多线程的线程间数据读写。但是,之前介绍的 codec 在哪里?这个问题,并不在 pipeline 中完成,而是 plugin 中。
Logstash 从 1.5 开始,把各个 plugin 拆分成了单独的 gem,主代码里只留下了几个 base.rb
类。所以,要了解详细情况,我们需要阅读一个实际跑数据的插件,比如 vendor/bundle/jruby/1.9/gems/logstash-input-stdin-3.2.0/lib/logstash/inputs/stdin.rb
。
可以看到其中最关键的读取数据部分代码如下:
@host = Socket.gethostname
while !stop?
if data = stdin_read
@codec.decode(data) do |event|
decorate(event)
event.set("host", @host) if !event.include?("host")
queue << event
end
end
这里两个关键函数:@codec.decode(line)
和 decorate(event)
。
@codec 在 stdin.rb
中默认为 line,那么我们就继续看 vendor/bundle/jruby/1.9/gems/logstash-codec-line-3.0.2/lib/logstash/codecs/line.rb
的相关部分:
def register
require "logstash/util/buftok"
@buffer = FileWatch::BufferedTokenizer.new(@delimiter)
@converter = LogStash::Util::Charset.new(@charset)
@converter.logger = @logger
end
public
def decode(data)
@buffer.extract(data).each do |line|
yield LogStash::Event.new("message" => @converter.convert(line))
end
end # def decode
超简短。就是在这个 @codec.decode(data)
里,生成了 LogStash::Event
对象。那么,我们通过 output { codec => rubydebug }
看到的除了 message 字段以外的那些数据,又是怎么来的呢?尤其是那个 @timestamp
是怎么出来的?
在 5.0 之前,我们可以通过 lib/logstash/event.rb
看到相关属性的定义和操作。5.0 之后,Logstash 为了提高性能,对 Event 部分采用 Java 语言进行了重构,现在你在 logstash-core-event-java/lib/logstash/event.rb
里只能看到通过 JRuby 的专属 require 指令加载 jar 的语句了。
想要了解 Logstash::Event 的实际定义,需要去 Git 仓库下载,然后阅读 Java 源代码,你也可以直接通过网页阅读,地址是:https://github.com/elastic/logstash/blob/master/logstash-core-event-java/src/main/java/org/logstash/Event.java:
public static final String METADATA = "@metadata";
public static final String METADATA_BRACKETS = "[" + METADATA + "]";
public static final String TIMESTAMP = "@timestamp";
public static final String TIMESTAMP_FAILURE_TAG = "_timestampparsefailure";
public static final String TIMESTAMP_FAILURE_FIELD = "_@timestamp";
public static final String VERSION = "@version";
public static final String VERSION_ONE = "1";
public Event()
{
this.metadata = new HashMap<String, Object>();
this.data = new HashMap<String, Object>();
this.data.put(VERSION, VERSION_ONE);
this.cancelled = false;
this.timestamp = new Timestamp();
this.data.put(TIMESTAMP, this.timestamp);
this.accessors = new Accessors(this.data);
this.metadata_accessors = new Accessors(this.metadata);
}
现在就清楚了,这个特殊的 @timestamp
是在 event 对象初始化的时候加上的,其实现同样在这个 Java 源码中,见https://github.com/elastic/logstash/blob/master/logstash-core-event-java/src/main/java/org/logstash/Timestamp.java:
public class Timestamp implements Cloneable {
private DateTime time;
public Timestamp() {
this.time = new DateTime(DateTimeZone.UTC);
}
}
这就是我们看到 Logstash 生成的事件总是 UTC 时区时间的原因。
至于如果一开始就传入了 @timestamp
数据的处理,则是这样:
public Timestamp(String iso8601) {
this.time = ISODateTimeFormat.dateTimeParser().parseDateTime(iso8601).toDateTime(DateTimeZone.UTC);
}
public Timestamp(long epoch_milliseconds) {
this.time = new DateTime(epoch_milliseconds, DateTimeZone.UTC);
}
同样会利用 joda 库做一次解析,还是转换成 UTC 时区。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论