- Logstash
- Logstash - 入门示例
- 入门示例 - 下载安装
- 入门示例 - hello world
- 入门示例 - 配置语法
- 入门示例 - plugin的安装
- 入门示例 - 长期运行
- Logstash - 插件配置
- 插件配置 - input配置
- input配置 - file
- input配置 - stdin
- input配置 - syslog
- input配置 - tcp
- 插件配置 - codec配置
- codec配置 - json
- codec配置 - multiline
- codec配置 - collectd
- codec配置 - netflow
- 插件配置 - filter配置
- filter配置 - date
- filter配置 - grok
- filter配置 - dissect
- filter配置 - geoip
- filter配置 - json
- filter配置 - kv
- filter配置 - metrics
- filter配置 - mutate
- filter配置 - ruby
- filter配置 - split
- filter配置 - elapsed
- 插件配置 - output配置
- output配置 - elasticsearch
- output配置 - email
- output配置 - exec
- output配置 - file
- output配置 - nagios
- output配置 - statsd
- output配置 - stdout
- output配置 - tcp
- output配置 - hdfs
- Logstash - 场景示例
- 场景示例 - nginx访问日志
- 场景示例 - nginx错误日志
- 场景示例 - postfix日志
- 场景示例 - ossec日志
- 场景示例 - windows系统日志
- 场景示例 - Java日志
- 场景示例 - MySQL慢查询日志
- Logstash - 性能与测试
- 性能与测试 - generator方式
- 性能与测试 - 监控方案
- 监控方案 - logstash-input-heartbeat方式
- 监控方案 - jmx启动参数方式
- 监控方案 - API方式
- Logstash - 扩展方案
- 扩展方案 - 通过redis传输
- 扩展方案 - 通过kafka传输
- 扩展方案 - AIX 平台上的logstash-forwarder-java
- 扩展方案 - rsyslog
- 扩展方案 - nxlog
- 扩展方案 - heka
- 扩展方案 - fluent
- 扩展方案 - Message::Passing
- Logstash - 源码解析
- 源码解析 - pipeline流程
- 源码解析 - Event的生成
- Logstash - 插件开发
- 插件开发 - utmp插件示例
- Beats
- Beats - filebeat
- Beats - packetbeat网络流量分析
- Beats - metricbeat
- Beats - winlogbeat
- ElasticSearch
- ElasticSearch - 架构原理
- 架构原理 - segment、buffer和translog对实时性的影响
- 架构原理 - segment merge对写入性能的影响
- 架构原理 - routing和replica的读写过程
- 架构原理 - shard的allocate控制
- 架构原理 - 自动发现的配置
- ElasticSearch - 接口使用示例
- 接口使用示例 - 增删改查操作
- 接口使用示例 - 搜索请求
- 接口使用示例 - Painless脚本
- 接口使用示例 - reindex接口
- ElasticSearch - 性能优化
- 性能优化 - bulk提交
- 性能优化 - gateway配置
- 性能优化 - 集群状态维护
- 性能优化 - 缓存
- 性能优化 - fielddata
- 性能优化 - curator工具
- 性能优化 - profile接口
- ElasticSearch - rally测试方案
- ElasticSearch - 多集群互联
- ElasticSearch - 别名的应用
- ElasticSearch - 映射与模板的定制
- ElasticSearch - puppet-elasticsearch模块的使用
- ElasticSearch - 计划内停机升级的操作流程
- ElasticSearch - 镜像备份
- ElasticSearch - rollover和shrink
- ElasticSearch - Ingest节点
- ElasticSearch - Hadoop 集成
- Hadoop 集成 - spark streaming交互
- ElasticSearch - 权限管理
- 权限管理 - Shield
- 权限管理 - Search-Guard 在 Elasticsearch 2.x 上的运用
- ElasticSearch - 监控方案
- 监控方案 - 监控相关接口
- 监控相关接口 - 集群健康状态
- 监控相关接口 - 节点状态
- 监控相关接口 - 索引状态
- 监控相关接口 - 任务管理
- 监控相关接口 - cat 接口的命令行使用
- 监控方案 - 日志记录
- 监控方案 - 实时bigdesk方案
- 监控方案 - cerebro
- 监控方案 - zabbix trapper方案
- ElasticSearch - ES在运维监控领域的其他玩法
- ES在运维监控领域的其他玩法 - percolator接口
- ES在运维监控领域的其他玩法 - watcher报警
- ES在运维监控领域的其他玩法 - ElastAlert
- ES在运维监控领域的其他玩法 - 时序数据库
- ES在运维监控领域的其他玩法 - Grafana
- ES在运维监控领域的其他玩法 - juttle
- ES在运维监控领域的其他玩法 - Etsy的Kale异常检测
- Kibana 5
- Kibana 5 - 安装、配置和运行
- Kibana 5 - 生产环境部署
- Kibana 5 - discover功能
- Kibana 5 - 各visualize功能
- 各visualize功能 - area
- 各visualize功能 - table
- 各visualize功能 - line
- 各visualize功能 - markdown
- 各visualize功能 - metric
- 各visualize功能 - pie
- 各visualize功能 - tile map
- 各visualize功能 - vertical bar
- Kibana 5 - dashboard功能
- Kibana 5 - timelion 介绍
- Kibana 5 - console 介绍
- Kibana 5 - setting功能
- Kibana 5 - 常用sub agg示例
- 常用sub agg示例 - 函数堆栈链分析
- 常用sub agg示例 - 分图统计
- 常用sub agg示例 - TopN的时序趋势图
- 常用sub agg示例 - 响应时间的百分占比趋势图
- 常用sub agg示例 - 响应时间的概率分布在不同时段的相似度对比
- Kibana 5 - 源码解析
- 源码解析 - .kibana索引的数据结构
- 源码解析 - 主页入口
- 源码解析 - discover解析
- 源码解析 - visualize解析
- 源码解析 - dashboard解析
- Kibana 5 - 插件
- 插件 - 可视化开发示例
- 插件 - 后端开发示例
- 插件 - 完整app开发示例
- Kibana 5 - Kibana报表
- 竞品对比
Logstash - 插件开发
前面已经提过在运行 logstash 的时候,可以通过 --pluginpath
参数来加载自己写的插件。那么,插件又该怎么写呢?
插件格式
一个标准的 logstash 输入插件格式如下:
require 'logstash/namespace'
require 'logstash/inputs/base'
class LogStash::Inputs::MyPlugin < LogStash::Inputs::Base
config_name 'myplugin'
default :codec, "line"
config :myoption_key, :validate => :string, :default => 'myoption_value'
public def register
end
public def run(queue)
end
end
其中大多数语句在过滤器和输出阶段是共有的。
- config_name 用来定义该插件写在 logstash 配置文件里的名字;
- config 可以定义很多个,即该插件在 logstash 配置文件中的可配置参数。logstash 很温馨的提供了验证方法,确保接收的数据是你期望的数据类型;
- register logstash 在启动的时候运行的函数,一些需要常驻内存的数据,可以在这一步先完成。比如对象初始化,filters/ruby 插件中的
init
语句等。
插件的关键方法
输入插件独有的是 run 方法。在 run 方法中,必须实现一个长期运行的程序(最简单的就是 loop 指令)。然后在每次收到数据并处理成 event
,这段示例在上一节演示 Logstash::Event 的生成时已经介绍过。最后,一定要调用 queue << event
语句。一个输入流程就算是完成了。
而如果是过滤器插件,对应修改成:
require 'logstash/filters/base'
class LogStash::Filters::MyPlugin < LogStash::Filters::Base
config_name 'myplugin'
public def register
end
public def filter(event)
filter_matched(event)
end
end
其中,filter_matched
是在 filter 函数完成本插件自己的处理逻辑之后一定要调用的。
输出插件则是:
require 'logstash/outputs/base'
class LogStash::Outputs::MyPlugin < LogStash::Outputs::Base
config_name 'myplugin'
concurrency :single
public def register
end
public def multi_receive(events)
end
end
这里和过去的版本最明显的差别是,处理方法改成了 multi_receive
,而不是 receive
。因为新的 pipeline 机制是批量传递数据给输出插件的。不过为了兼容过去的插件,LogStash::Outputs::Base
基类中的 multi_receive
实现继续迭代调用了 receive
。
另一个是新出现的配置 concurrency
,代表着本插件是否 threadsafe,并由此取代了过去的 workers 选项。可选项为:single
和 shared
。
- single 表示本插件是非线程安全的,必须在各 pipeline workers 之间同一时刻只有一个运行。
- shared 表示本插件是线程安全的,每个 pipeline workers 之间可以独立运行,这也就意味着插件作者要自己在
multi_receive
里调用 Mutexes。
推荐阅读
插件打包
Logstash 从 1.5.0-GA 版开始,对插件规范做了重大变更。放弃了 milestone 定义,去除了 --pluginpath
命令行参数。统一改成 bin/plugin
管理的 rubygem 包插件。那么,我们自己写的 Logstash 插件,也同样需要适应这个新规则,写完 Ruby 代码,还要打包成 gem 才能使用。
为了我们更方便的完成工作,logstash 针对 4 种插件形态提供了 4 个示例库,可以按照自己所需克隆使用。比如要写一个 logstash-filter-mything 插件:
git clone https://github.com/logstash-plugins/logstash-filter-example
cd logstash-filter-example
mv logstash-filter-example.gemspec logstash-filter-mything.gemspec
mv lib/logstash/filters/example.rb lib/logstash/filters/mything.rb
mv spec/filters/example_spec.rb spec/filters/mything_spec.rb
然后把代码写在 lib/logstash/filters/mything.rb
里即可。
代码部分完成。然后就是定义 gem 打包需要的额外文件和库依赖了。
目录中有两个文件,Gemfile
和 logstash-filter-mything.gemspec
。
Gemfile
文件就是标准格式,用来运行 bundler install
时下载 rubygems 包的。默认情况下,最基础的内容是:
source 'https://rubygems.org' # 国内建议改成 'http://ruby.taobao.org'
gemspec
gem "logstash", :github => "elastic/logstash", :branch => "1.5"
gemspec
文件则是用来定义软件包本身规范,不单限于 rubygems。示例如下:
Gem::Specification.new do |s|
s.name = 'logstash-filter-mything'
s.version = '1.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This mything filter is just for Elastic Stack Guide example"
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Chenryn"]
s.email = 'chenlin7@staff.sina.com.cn'
s.homepage = "http://kibana.logstash.es"
s.require_paths = ["lib"]
s.files = `find . -type f ! -wholename '*.svn*'`.split($)
s.test_files = s.files.grep(%r{^(test|spec|features)/})
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "filter" }
s.requirements << "jar 'org.elasticsearch:elasticsearch', '1.4.0'"
s.add_runtime_dependency "jar-dependencies"
s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
s.add_development_dependency 'logstash-devutils'
end
其中:
s.version
就是 milestone 的替代品,0.1.x 相当于是 milestone 0;0.9.x 相当于是 milestone 2;1.x.x 相当于是 milestone 3。s.file
默认写法是git ls-files
,因为默认是 git 库,如果你本身采用了 svn,或者 cvs 库,都不要紧,只要命令列出的是你需要打包进去的文件即可。s.metadata
是 logstash 的 plugin 命令在 install 的时候会提前 verify 的特殊信息,一定要保留。s.add_runtime_dependency
是定义插件依赖库的指令。如果有 jar 包依赖,则额外再加s.requirements
。
好了,全部完毕。下面打包:
gem build logstash-filter-mything.gemspec
运行完就会生成一个 logstash-filter-mything-1.1.0.gem 软件包,可以安装使用了。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论