- Logstash
- Logstash - 入门示例
- 入门示例 - 下载安装
- 入门示例 - hello world
- 入门示例 - 配置语法
- 入门示例 - plugin的安装
- 入门示例 - 长期运行
- Logstash - 插件配置
- 插件配置 - input配置
- input配置 - file
- input配置 - stdin
- input配置 - syslog
- input配置 - tcp
- 插件配置 - codec配置
- codec配置 - json
- codec配置 - multiline
- codec配置 - collectd
- codec配置 - netflow
- 插件配置 - filter配置
- filter配置 - date
- filter配置 - grok
- filter配置 - dissect
- filter配置 - geoip
- filter配置 - json
- filter配置 - kv
- filter配置 - metrics
- filter配置 - mutate
- filter配置 - ruby
- filter配置 - split
- filter配置 - elapsed
- 插件配置 - output配置
- output配置 - elasticsearch
- output配置 - email
- output配置 - exec
- output配置 - file
- output配置 - nagios
- output配置 - statsd
- output配置 - stdout
- output配置 - tcp
- output配置 - hdfs
- Logstash - 场景示例
- 场景示例 - nginx访问日志
- 场景示例 - nginx错误日志
- 场景示例 - postfix日志
- 场景示例 - ossec日志
- 场景示例 - windows系统日志
- 场景示例 - Java日志
- 场景示例 - MySQL慢查询日志
- Logstash - 性能与测试
- 性能与测试 - generator方式
- 性能与测试 - 监控方案
- 监控方案 - logstash-input-heartbeat方式
- 监控方案 - jmx启动参数方式
- 监控方案 - API方式
- Logstash - 扩展方案
- 扩展方案 - 通过redis传输
- 扩展方案 - 通过kafka传输
- 扩展方案 - AIX 平台上的logstash-forwarder-java
- 扩展方案 - rsyslog
- 扩展方案 - nxlog
- 扩展方案 - heka
- 扩展方案 - fluent
- 扩展方案 - Message::Passing
- Logstash - 源码解析
- 源码解析 - pipeline流程
- 源码解析 - Event的生成
- Logstash - 插件开发
- 插件开发 - utmp插件示例
- Beats
- Beats - filebeat
- Beats - packetbeat网络流量分析
- Beats - metricbeat
- Beats - winlogbeat
- ElasticSearch
- ElasticSearch - 架构原理
- 架构原理 - segment、buffer和translog对实时性的影响
- 架构原理 - segment merge对写入性能的影响
- 架构原理 - routing和replica的读写过程
- 架构原理 - shard的allocate控制
- 架构原理 - 自动发现的配置
- ElasticSearch - 接口使用示例
- 接口使用示例 - 增删改查操作
- 接口使用示例 - 搜索请求
- 接口使用示例 - Painless脚本
- 接口使用示例 - reindex接口
- ElasticSearch - 性能优化
- 性能优化 - bulk提交
- 性能优化 - gateway配置
- 性能优化 - 集群状态维护
- 性能优化 - 缓存
- 性能优化 - fielddata
- 性能优化 - curator工具
- 性能优化 - profile接口
- ElasticSearch - rally测试方案
- ElasticSearch - 多集群互联
- ElasticSearch - 别名的应用
- ElasticSearch - 映射与模板的定制
- ElasticSearch - puppet-elasticsearch模块的使用
- ElasticSearch - 计划内停机升级的操作流程
- ElasticSearch - 镜像备份
- ElasticSearch - rollover和shrink
- ElasticSearch - Ingest节点
- ElasticSearch - Hadoop 集成
- Hadoop 集成 - spark streaming交互
- ElasticSearch - 权限管理
- 权限管理 - Shield
- 权限管理 - Search-Guard 在 Elasticsearch 2.x 上的运用
- ElasticSearch - 监控方案
- 监控方案 - 监控相关接口
- 监控相关接口 - 集群健康状态
- 监控相关接口 - 节点状态
- 监控相关接口 - 索引状态
- 监控相关接口 - 任务管理
- 监控相关接口 - cat 接口的命令行使用
- 监控方案 - 日志记录
- 监控方案 - 实时bigdesk方案
- 监控方案 - cerebro
- 监控方案 - zabbix trapper方案
- ElasticSearch - ES在运维监控领域的其他玩法
- ES在运维监控领域的其他玩法 - percolator接口
- ES在运维监控领域的其他玩法 - watcher报警
- ES在运维监控领域的其他玩法 - ElastAlert
- ES在运维监控领域的其他玩法 - 时序数据库
- ES在运维监控领域的其他玩法 - Grafana
- ES在运维监控领域的其他玩法 - juttle
- ES在运维监控领域的其他玩法 - Etsy的Kale异常检测
- Kibana 5
- Kibana 5 - 安装、配置和运行
- Kibana 5 - 生产环境部署
- Kibana 5 - discover功能
- Kibana 5 - 各visualize功能
- 各visualize功能 - area
- 各visualize功能 - table
- 各visualize功能 - line
- 各visualize功能 - markdown
- 各visualize功能 - metric
- 各visualize功能 - pie
- 各visualize功能 - tile map
- 各visualize功能 - vertical bar
- Kibana 5 - dashboard功能
- Kibana 5 - timelion 介绍
- Kibana 5 - console 介绍
- Kibana 5 - setting功能
- Kibana 5 - 常用sub agg示例
- 常用sub agg示例 - 函数堆栈链分析
- 常用sub agg示例 - 分图统计
- 常用sub agg示例 - TopN的时序趋势图
- 常用sub agg示例 - 响应时间的百分占比趋势图
- 常用sub agg示例 - 响应时间的概率分布在不同时段的相似度对比
- Kibana 5 - 源码解析
- 源码解析 - .kibana索引的数据结构
- 源码解析 - 主页入口
- 源码解析 - discover解析
- 源码解析 - visualize解析
- 源码解析 - dashboard解析
- Kibana 5 - 插件
- 插件 - 可视化开发示例
- 插件 - 后端开发示例
- 插件 - 完整app开发示例
- Kibana 5 - Kibana报表
- 竞品对比
监控相关接口 - 任务管理
任务是 Elasticsearch 中早就有的一个概念。不过最新的 5.0 版对此重构之前,我们只能看到对于 master 来说等待执行的集群级别的任务。这个是一个非常狭隘的概念。重构以后,和数据相关的一些操作,也可以以任务形态存在,从而也就有了针对性的管理操作。目前,还只有 recovery、snapshot、reindex 等操作是基于任务式的。未来的 6.0 版,可能把整个 query 检索都改为基于任务式的。困扰用户多年的资源隔离问题,可能就可以得到大大缓解。
等待执行的任务列表
首先我们还是先了解一下狭义的任务,即过去就有的 master 节点的等待执行任务。之前章节已经讲过,master 节点负责处理的任务其实很少,只有集群状态的数据维护。所以绝大多数情况下,这个任务列表应该都是空的。
# curl -XGET http://127.0.0.1:9200/_cluster/pending_tasks
{
"tasks": []
}
但是如果你碰上集群有异常,比如频繁有索引映射更新,数据恢复啊,分片分配或者初始化的时候反复出错啊这种时候,就会看到一些任务列表了:
{
"tasks" : [ {
"insert_order": 767003,
"priority": "URGENT",
"source": "create-index [logstash-2015.06.01], cause [api]",
"time_in_queue_millis": 86,
"time_in_queue": "86ms"
}, {
"insert_order" : 767004,
"priority" : "HIGH",
"source" : "shard-failed ([logstash-2015.05.31][50], node[F3EGSRWGSvWGFlcZ6K9pRA], [P], s[INITIALIZING]), reason [shard failure [failed recovery][IndexShardGatewayRecoveryException[[logstash-2015.05.31][50] failed to fetch index version after copying it over]; nested: CorruptIndexException[[logstash-2015.05.31][50] Preexisting corrupted index [corrupted_nC9t_ramRHqsbQqZO78KVg] caused by: CorruptIndexException[did not read all bytes from file: read 269 vs size 307 (resource: BufferedChecksumIndexInput(NIOFSIndexInput(path="/data1/elasticsearch/data/es1003/nodes/0/indices/logstash-2015.05.31/50/index/_16c.si")))]norg.apache.lucene.index.CorruptIndexException: did not read all bytes from file: read 269 vs size 307 (resource: BufferedChecksumIndexInput(NIOFSIndexInput(path="/data1/elasticsearch/data/es1003/nodes/0/indices/logstash-2015.05.31/50/index/_16c.si")))ntat org.apache.lucene.codecs.CodecUtil.checkFooter(CodecUtil.java:216)ntat org.apache.lucene.codecs.lucene46.Lucene46SegmentInfoReader.read(Lucene46SegmentInfoReader.java:73)ntat org.apache.lucene.index.SegmentInfos.read(SegmentInfos.java:359)ntat org.apache.lucene.index.SegmentInfos$1.doBody(SegmentInfos.java:462)ntat org.apache.lucene.index.SegmentInfos$FindSegmentsFile.run(SegmentInfos.java:923)ntat org.apache.lucene.index.SegmentInfos$FindSegmentsFile.run(SegmentInfos.java:769)ntat org.apache.lucene.index.SegmentInfos.read(SegmentInfos.java:458)ntat org.elasticsearch.common.lucene.Lucene.readSegmentInfos(Lucene.java:89)ntat org.elasticsearch.index.store.Store.readSegmentsInfo(Store.java:158)ntat org.elasticsearch.index.store.Store.readLastCommittedSegmentsInfo(Store.java:148)ntat org.elasticsearch.index.engine.InternalEngine.flush(InternalEngine.java:675)ntat org.elasticsearch.index.engine.InternalEngine.flush(InternalEngine.java:593)ntat org.elasticsearch.index.shard.IndexShard.flush(IndexShard.java:675)ntat org.elasticsearch.index.translog.TranslogService$TranslogBasedFlush$1.run(TranslogService.java:203)ntat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)ntat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)ntat java.lang.Thread.run(Thread.java:745)n]; ]]",
"executing" : true,
"time_in_queue_millis" : 2813,
"time_in_queue" : "2.8s"
}, {
"insert_order" : 767005,
"priority" : "HIGH",
"source" : "refresh-mapping [logstash-2015.06.03][[curl_debuginfo]]",
"executing" : false,
"time_in_queue_millis" : 2787,
"time_in_queue" : "2.7s"
}, {
"insert_order" : 767006,
"priority" : "HIGH",
"source" : "refresh-mapping [logstash-2015.05.29][[curl_debuginfo]]",
"executing" : false,
"time_in_queue_millis" : 448,
"time_in_queue" : "448ms"
} ]
}
可以看到列表中的任务都有各自的优先级,URGENT 优先于 HIGH。然后是任务在队列中的排队时间,任务的具体内容等。
在上例中,由于磁盘文件损坏,一个分片中某个 segment 的实际内容和长度对不上,导致分片数据恢复无法正常完成,堵塞了后续的索引映射更新操作。这个错误一般来说不太常见,也只能是关闭索引,或者放弃这部分数据。更常见的可能,是集群存储长期数据导致索引映射数据确实大到了 master 节点内存不足以快速处理的地步。
这时候,根据实际情况,可以有以下几种选择:
- 索引就是特别多:给 master 加内存。
- 索引里字段太多:改用 nested object 方式节省字段数量。
- 索引多到内存就是不够了:把一部分数据拆出来另一个集群。
新版任务管理
新版本的任务并没有独立的创建接口,你发起的具体某次 snapshot、reindex 等操作,自动就成为了一个任务。而任务的列表可以通过 /_tasks
或者 /_cat/tasks
接口来获取。和其他接口一样,手工操作选用 cat ,写程序的时候选用 JSON 接口。
curl -XGET 'localhost:9200/_cat/tasks?v'
action task_id parent_task_id type start_time timestamp running_time ip node
cluster:monitor/tasks/lists -ANcpn_JTI-Zs93fGAfhjw:779 - transport 1477891751674 13:29:11 170.2micros 127.0.0.1 -ANcpn_
cluster:monitor/tasks/lists[n] -ANcpn_JTI-Zs93fGAfhjw:780 -ANcpn_JTI-Zs93fGAfhjw:779 direct 1477891751674 13:29:11 60.6micros 127.0.0.1 -ANcpn_
indices:data/write/reindex r1A2WoRbTwKZ516z6NEs5A:916 - transport 1477891751674 13:29:11 212.5micros 127.0.0.1 r1A2WoR
上面是一个正常运行中的集群的任务列表。除了一个 reindex 任务,没有什么 recovery 的麻烦事儿,很好。
如果想要取消某个任务,比如上面的 reindex,可以像这样运行:
curl -XPOST 'localhost:9200/_tasks/task_id:916/_cancel'
目前来说,能做的只有这些了。Elasticsearch 还不支持诸如挂起、暂停之类更复杂的任务操作。让我们期待未来的发展吧。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论