- Logstash
- Logstash - 入门示例
- 入门示例 - 下载安装
- 入门示例 - hello world
- 入门示例 - 配置语法
- 入门示例 - plugin的安装
- 入门示例 - 长期运行
- Logstash - 插件配置
- 插件配置 - input配置
- input配置 - file
- input配置 - stdin
- input配置 - syslog
- input配置 - tcp
- 插件配置 - codec配置
- codec配置 - json
- codec配置 - multiline
- codec配置 - collectd
- codec配置 - netflow
- 插件配置 - filter配置
- filter配置 - date
- filter配置 - grok
- filter配置 - dissect
- filter配置 - geoip
- filter配置 - json
- filter配置 - kv
- filter配置 - metrics
- filter配置 - mutate
- filter配置 - ruby
- filter配置 - split
- filter配置 - elapsed
- 插件配置 - output配置
- output配置 - elasticsearch
- output配置 - email
- output配置 - exec
- output配置 - file
- output配置 - nagios
- output配置 - statsd
- output配置 - stdout
- output配置 - tcp
- output配置 - hdfs
- Logstash - 场景示例
- 场景示例 - nginx访问日志
- 场景示例 - nginx错误日志
- 场景示例 - postfix日志
- 场景示例 - ossec日志
- 场景示例 - windows系统日志
- 场景示例 - Java日志
- 场景示例 - MySQL慢查询日志
- Logstash - 性能与测试
- 性能与测试 - generator方式
- 性能与测试 - 监控方案
- 监控方案 - logstash-input-heartbeat方式
- 监控方案 - jmx启动参数方式
- 监控方案 - API方式
- Logstash - 扩展方案
- 扩展方案 - 通过redis传输
- 扩展方案 - 通过kafka传输
- 扩展方案 - AIX 平台上的logstash-forwarder-java
- 扩展方案 - rsyslog
- 扩展方案 - nxlog
- 扩展方案 - heka
- 扩展方案 - fluent
- 扩展方案 - Message::Passing
- Logstash - 源码解析
- 源码解析 - pipeline流程
- 源码解析 - Event的生成
- Logstash - 插件开发
- 插件开发 - utmp插件示例
- Beats
- Beats - filebeat
- Beats - packetbeat网络流量分析
- Beats - metricbeat
- Beats - winlogbeat
- ElasticSearch
- ElasticSearch - 架构原理
- 架构原理 - segment、buffer和translog对实时性的影响
- 架构原理 - segment merge对写入性能的影响
- 架构原理 - routing和replica的读写过程
- 架构原理 - shard的allocate控制
- 架构原理 - 自动发现的配置
- ElasticSearch - 接口使用示例
- 接口使用示例 - 增删改查操作
- 接口使用示例 - 搜索请求
- 接口使用示例 - Painless脚本
- 接口使用示例 - reindex接口
- ElasticSearch - 性能优化
- 性能优化 - bulk提交
- 性能优化 - gateway配置
- 性能优化 - 集群状态维护
- 性能优化 - 缓存
- 性能优化 - fielddata
- 性能优化 - curator工具
- 性能优化 - profile接口
- ElasticSearch - rally测试方案
- ElasticSearch - 多集群互联
- ElasticSearch - 别名的应用
- ElasticSearch - 映射与模板的定制
- ElasticSearch - puppet-elasticsearch模块的使用
- ElasticSearch - 计划内停机升级的操作流程
- ElasticSearch - 镜像备份
- ElasticSearch - rollover和shrink
- ElasticSearch - Ingest节点
- ElasticSearch - Hadoop 集成
- Hadoop 集成 - spark streaming交互
- ElasticSearch - 权限管理
- 权限管理 - Shield
- 权限管理 - Search-Guard 在 Elasticsearch 2.x 上的运用
- ElasticSearch - 监控方案
- 监控方案 - 监控相关接口
- 监控相关接口 - 集群健康状态
- 监控相关接口 - 节点状态
- 监控相关接口 - 索引状态
- 监控相关接口 - 任务管理
- 监控相关接口 - cat 接口的命令行使用
- 监控方案 - 日志记录
- 监控方案 - 实时bigdesk方案
- 监控方案 - cerebro
- 监控方案 - zabbix trapper方案
- ElasticSearch - ES在运维监控领域的其他玩法
- ES在运维监控领域的其他玩法 - percolator接口
- ES在运维监控领域的其他玩法 - watcher报警
- ES在运维监控领域的其他玩法 - ElastAlert
- ES在运维监控领域的其他玩法 - 时序数据库
- ES在运维监控领域的其他玩法 - Grafana
- ES在运维监控领域的其他玩法 - juttle
- ES在运维监控领域的其他玩法 - Etsy的Kale异常检测
- Kibana 5
- Kibana 5 - 安装、配置和运行
- Kibana 5 - 生产环境部署
- Kibana 5 - discover功能
- Kibana 5 - 各visualize功能
- 各visualize功能 - area
- 各visualize功能 - table
- 各visualize功能 - line
- 各visualize功能 - markdown
- 各visualize功能 - metric
- 各visualize功能 - pie
- 各visualize功能 - tile map
- 各visualize功能 - vertical bar
- Kibana 5 - dashboard功能
- Kibana 5 - timelion 介绍
- Kibana 5 - console 介绍
- Kibana 5 - setting功能
- Kibana 5 - 常用sub agg示例
- 常用sub agg示例 - 函数堆栈链分析
- 常用sub agg示例 - 分图统计
- 常用sub agg示例 - TopN的时序趋势图
- 常用sub agg示例 - 响应时间的百分占比趋势图
- 常用sub agg示例 - 响应时间的概率分布在不同时段的相似度对比
- Kibana 5 - 源码解析
- 源码解析 - .kibana索引的数据结构
- 源码解析 - 主页入口
- 源码解析 - discover解析
- 源码解析 - visualize解析
- 源码解析 - dashboard解析
- Kibana 5 - 插件
- 插件 - 可视化开发示例
- 插件 - 后端开发示例
- 插件 - 完整app开发示例
- Kibana 5 - Kibana报表
- 竞品对比
扩展方案 - 通过kafka传输
本节作者:jingbli
Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,logstash 也可以快速接入,免去重复建设的麻烦。
如果打算新建 Kafka 系统的,请参考 Kafka 官方入门文档:http://kafka.apache.org/documentation.html
kafka 基本概念
以下仅对相关基本概念说明,更多概念见官方文档:
- Topic
主题,声明一个主题,producer指定该主题发布消息,订阅该主题的consumer对该主题进行消费 - Partition
每个主题可以分为多个分区,每个分区对应磁盘上一个目录,分区可以分布在不同broker上,producer在发布消息时,可以通过指定partition key映射到对应分区,然后向该分区发布消息,在无partition key情况下,随机选取分区,一段时间内触发一次(比如10分钟),这样就保证了同一个producer向同一partition发布的消息是顺序的。
消费者消费时,可以指定partition进行消费,也可以使用high-level-consumer api,自动进行负载均衡,并将partition分给consumer,一个partition只能被一个consumer进行消费。 Consumer
消费者,可以多实例部署,可以批量拉取,有两类API可供选择:一个simpleConsumer,暴露所有的操作给用户,可以提交offset、fetch offset、指定partition fetch message;另外一个high-level-consumer(ZookeeperConsumerConnector),帮助用户做基于partition自动分配的负载均衡,定期提交offset,建立消费队列等。simpleConsumer相当于手动挡,high-level-consumer相当于自动挡。simpleConsumer:无需像high-level-consumer那样向zk注册brokerid、owner,甚至不需要提交offset到zk,可以将offset提交到任意地方比如(mysql,本地文件等)。
high-level-consumer:一个进程中可以启多个消费线程,一个消费线程即是一个consumer,假设A进程里有2个线程(consumerid分别为1,2),B进程有2个线程(consumerid分别为1,2),topic1的partition有5个,那么partition分配是这样的:
partition1 ---> A进程consumerid1
partition2 ---> A进程consumerid1
partition3 ---> A进程consumerid2
partition4 ---> B进程consumer1
partition5 ---> B进程consumer2
Group
High-level-consumer可以声明group,每个group可以有多个consumer,每group各自管理各自的消费offset,各个不同group之间互不关联影响。由于目前版本消费的offset、owner、group都是consumer自己通过zk管理,所以group对于broker和producer并不关心,一些监控工具需要通过group来监控,simpleComsumer无需声明group。
小提示
以上概念是 logstash 的 kafka 插件的必要参数,请理解阅读,对后续使用 kafka 插件有重要作用。logstash-kafka-input 插件使用的是 High-level-consumer API
。
插件安装
logstash-1.4 安装
如果你使用的还是 1.4 版本,需要自己单独安装 logstash-kafka 插件。插件地址见:https://github.com/joekiller/logstash-kafka。
插件本身内容非常简单,其主要依赖同一作者写的 jruby-kafka 模块。需要注意的是:该模块仅支持 Kafka-0.8 版本。如果是使用 0.7 版本 kafka 的,将无法直接使 jruby-kafka 该模块和 logstash-kafka 插件。
安装按照官方文档完全自动化的安装。或是可以通过以下方式手动自己安装插件,不过重点注意的是 kafka 的版本,上面已经指出了。
- 下载 logstash 并解压重命名为
./logstash-1.4.0
文件目录。 - 下载 kafka 相关组件,以下示例选的为 kafka_2.8.0-0.8.1.1-src,并解压重命名为
./kafka_2.8.0-0.8.1.1
。 - 从 releases 页下载 logstash-kafka v0.4.2 版,并解压重命名为
./logstash-kafka-0.4.2
。 - 从
./kafka_2.8.0-0.8.1.1/libs
目录下复制所有的 jar 文件拷贝到./logstash-1.4.0/vendor/jar/kafka_2.8.0-0.8.1.1/libs
下,其中你需要创建kafka_2.8.0-0.8.1.1/libs
相关文件夹及目录。 - 分别复制
./logstash-kafka-0.4.2/logstash
里的inputs
和outputs
下的kafka.rb
,拷贝到对应的./logstash-1.4.0/lib/logstash
里的inputs
和outputs
对应目录下。 - 切换到
./logstash-1.4.0
目录下,现在需要运行 logstash-kafka 的 gembag.rb 脚本去安装 jruby-kafka 库,执行以下命令:GEM_HOME=vendor/bundle/jruby/1.9 GEM_PATH= java -jar vendor/jar/jruby-complete-1.7.11.jar --1.9 ../logstash-kafka-0.4.2/gembag.rb ../logstash-kafka-0.4.2/logstash-kafka.gemspec
。 - 现在可以使用 logstash-kafka 插件运行 logstash 了。
logstash-1.5 安装
logstash 从 1.5 版本开始才集成了 Kafka 支持。1.5 版本开始所有插件的目录和命名都发生了改变,插件发布地址见:https://github.com/logstash-plugins。
安装和更新插件都可以使用官方提供的方式:
$bin/plugin install OR $bin/plugin update
小贴士
对于插件的安装和更新,默认走的Gem源为 https://rubygems.org,对于咱们国内网络来说是出奇的慢或是根本无法访问(爬梯子除外),在安装或是更新插件是,可以尝试修改目录下 Gemfile
文件中的 source
为淘宝源https://ruby.taobao.org,这样会使你的安装或是更新顺畅很多。
插件配置
Input 配置示例
以下配置可以实现对 kafka 读取端(consumer)的基本使用。
消费端更多详细的配置请查看 http://kafka.apache.org/documentation.html kafka 官方文档的消费者部分配置文档。
input {
kafka {
zk_connect => "localhost:2181"
group_id => "logstash"
topic_id => "test"
codec => plain
reset_beginning => false # boolean (optional), default: false
consumer_threads => 5 # number (optional), default: 1
decorate_events => true # boolean (optional), default: false
}
}
Input 解释
作为 Consumer 端,插件使用的是 High-level-consumer API
,请结合上述 kafka 基本概念进行设置:
- group_id
消费者分组,可以通过组 ID 去指定,不同的组之间消费是相互不受影响的,相互隔离。
- topic_id
指定消费话题,也是必填项目,指定消费某个 topic
,这个其实就是订阅某个主题,然后去消费。
- reset_beginning
logstash 启动后从什么位置开始读取数据,默认是结束位置,也就是说 logstash 进程会以从上次读取结束时的偏移量开始继续读取,如果之前没有消费过,那么就开始从头读取.如果你是要导入原有数据,把这个设定改成 “true”, logstash 进程就从头开始读取.有点类似 cat
,但是读到最后一行不会终止,而是变成 tail -F
,继续监听相应数据。
- decorate_events
在输出消息的时候会输出自身的信息包括:消费消息的大小, topic 来源以及 consumer 的 group 信息。
- rebalance_max_retries
当有新的 consumer(logstash) 加入到同一 group 时,将会 reblance
,此后将会有 partitions
的消费端迁移到新的 consumer
上,如果一个 consumer
获得了某个 partition
的消费权限,那么它将会向 zookeeper
注册, Partition Owner registry
节点信息,但是有可能此时旧的 consumer
尚没有释放此节点,此值用于控制,注册节点的重试次数。
- consumer_timeout_ms
指定时间内没有消息到达就抛出异常,一般不需要改。
以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 input 默认参数。
注意
1.想要使用多个 logstash 端协同消费同一个 topic
的话,那么需要把两个或是多个 logstash 消费端配置成相同的 group_id
和 topic_id
, 但是前提是要把相应的 topic 分多个 partitions (区),多个消费者消费是无法保证消息的消费顺序性的。
这里解释下,为什么要分多个 partitions(区), kafka 的消息模型是对 topic 分区以达到分布式效果。每个
topic
下的不同的 partitions (区)只能有一个 Owner 去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由 server 端协调而成。不必使用者考虑太多。只是消息的消费则是无序的。
总结:保证消息的顺序,那就用一个 partition。 kafka 的每个 partition 只能同时被同一个 group 中的一个 consumer 消费。
Output 配置
以下配置可以实现对 kafka 写入端 (producer) 的基本使用。
生产端更多详细的配置请查看 http://kafka.apache.org/documentation.html kafka 官方文档的生产者部分配置文档。
output {
kafka {
bootstrap_servers => "localhost:9092"
topic_id => "test"
compression_codec => "snappy" # string (optional), one of ["none", "gzip", "snappy"], default: "none"
}
}
Output 解释
作为 Producer 端使用,以下仅为重要概念解释,请结合上述 kafka 基本概念进行设置:
- compression_codec
消息的压缩模式,默认是 none,可以有 gzip 和 snappy (暂时还未测试开启压缩与不开启的性能,数据传输大小等对比)。
- compressed_topics
可以针对特定的 topic 进行压缩,设置这个参数为 topic
,表示此 topic
进行压缩。
- request_required_acks
消息的确认模式:
可以设置为 0: 生产者不等待 broker 的回应,只管发送.会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失)
可以设置为 1: 生产者会收到 leader 的回应在 leader 写入之后.(在当前 leader 服务器为复制前失败可能会导致信息丢失)
可以设置为 -1: 生产者会收到 leader 的回应在全部拷贝完成之后。
- partitioner_class
分区的策略,默认是 hash 取模
- send_buffer_bytes
socket 的缓存大小设置,其实就是缓冲区的大小
消息模式相关
- serializer_class
消息体的系列化处理类,转化为字节流进行传输,请注意 encoder 必须和下面的 key_serializer_class
使用相同的类型。
- key_serializer_class
默认的是与 serializer_class
相同
- producer_type
生产者的类型 async
异步执行消息的发送 sync
同步执行消息的发送
- queue_buffering_max_ms
异步模式下,那么就会在设置的时间缓存消息,并一次性发送
- queue_buffering_max_messages
异步的模式下,最长等待的消息数
- queue_enqueue_timeout_ms
异步模式下,进入队列的等待时间,若是设置为0,那么要么进入队列,要么直接抛弃
- batch_num_messages
异步模式下,每次发送的最大消息数,前提是触发了 queue_buffering_max_messages
或是 queue_enqueue_timeout_ms
的限制
以上是相对重要参数的使用示例,更多参数可以选项可以跟据 https://github.com/joekiller/logstash-kafka/blob/master/README.md 查看 output 默认参数。
小贴士
logstash-kafka 插件输入和输出默认 codec
为 json 格式。在输入和输出的时候注意下编码格式。消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下),可以使用以下配置,例如:
output {
kafka {
codec => plain {
format => "%{message}"
}
}
}
作为 Consumer 从kafka中读数据,如果为非 json 格式的话需要进行相关解码,例如:
input {
kafka {
zk_connect => "xxx:xxx"
group_id => "test"
topic_id => "test-topic"
codec => "line"
...........
}
}
性能
队列监控
其实 logstash 的 kafka 插件性能并不是很突出,可以通过使用以下命令查看队列积压消费情况:
$/bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group test
队列积压严重,性能跟不上的情况下,结合实际服务器资源,可以适当增加 topic 的 partition 多实例化 Consumer 进行消费处理消息。
input-kafka 的 JSON 序列化性能
此外,跟 logstash-input-syslog 改在 filter 阶段 grok 的优化手段类似,也可以将 logstash-input-kafka 的默认 JSON 序列化操作从 codec 阶段后移到 filter 阶段。如下:
input {
kafka {
codec => plain
}
}
filter {
json {
source => "message"
}
}
然后通过 bin/logstash -w $num_cpus
运行,利用多核计算优势,可以获得大约一倍左右的性能提升。
其他方案
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论