使用 logstash-input-jdbc 同步 mysql 数据 至 ES 最后一条数据未能保存
问题描述
我在使用 logstash-input-jdbc 插件 同步 MySQL 数据至 ES 的时候 SQL 查询满足的条件的数据有 13 条
但是保存至 ES 时 只有 12 条了 我在表里新增一条数据同步的是上次最后一条没有保存的数据,还有我在终端退出 (Ctrl + c) 到时候 它也会把最后一条记录先写入 ES 再退出进程
执行输出的日志:
当我 Ctrl + c 退出 logstash 时产生一条日志
[2018-10-31T11:40:07,422][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
{"name":"Kimi","id":14,"todos":[],"todo_list":[],"tags":["_aggregatefinalflush"]}
这条日志说明 logstash 又帮我把那条没有存入的记录写入ES才退出的 但是数据上多了一个 tags 字段 标注为 _aggregatefinalflush
问题出现的平台版本
ElasticSearch 版本 -> 5.5.2
Logstash 版本 -> 5.5.2
开发测试平台 MacOS Mojave 10.14
相关代码
logstash配置:
input {
stdin{
}
jdbc {
# Mysql数据库地址
jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
# 开启连接验证
jdbc_validate_connection => true
# 数据库用户
jdbc_user => "root"
# 数据库密码
jdbc_password => "123456"
# JDBC驱动库
jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.36.jar"
# JDBC 驱动类
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 声明要导入数据库 SQL 语句 如果要将 SQL 独立存放则使用 statement_filepath 参数
# statement => "select * from users"
# 存放导入数据 SQL 的 SQL文件
statement_filepath => "/Users/simon/logstash/mysql_users.sql"
# CronJob 自动定时执行默认一分钟执行一次 类似 Crontab 定时规则
# 定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 启用后将导致将 sql 语句分解为多个查询。每个查询将使用限制和偏移量来共同检索完整的结果集。限制大小是用 jdbc_page_size 设置的。
# jdbc_paging_enabled => true
# 每页处理条数 jdbc_paging_enabled = true 时生效 默认 100000
# jdbc_page_size => 1
# ElasticSearch Domcument type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
#type => "users"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => "true"
last_run_metadata_path => "/Users/simon/logstash/sync_last_id"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
# clean_run => "false"
# 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值
use_column_value => true
# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
tracking_column => "id"
# 是否将 字段(column) 名称转小写
#lowercase_column_names => "false"
}
}
filter {
aggregate {
task_id => "%{id}"
code => "
#代码注释
map['id'] = event.get('id')
map['name'] = event.get('name')
map['todo_list'] ||=[]
map['todos'] ||=[]
if (event.get('todo_id') != nil)
if !(map['todo_list'].include? event.get('todo_id'))
map['todo_list'] << event.get('todo_id')
map['todos'] << {
'todo_id' => event.get('todo_id'),
'title' => event.get('text'),
}
end
end
event.cancel()
"
push_previous_map_as_event => true
}
json {
source => "message"
remove_field => ["message"]
#remove_field => ["message", "type", "@timestamp", "@version"]
}
mutate {
#将不需要的JSON字段过滤,且不会被存入 ES 中
remove_field => ["@timestamp", "@version"]
}
}
# 从 MySQL导入数据到 ElasticSearch 保存
output {
elasticsearch {
# ES 服务URL地址
hosts => ["127.0.0.1:9200"]
# ES 索引名称
index => "mysql_users"
# document_type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
document_type => "users"
# 文档ID 对应数据库 ID
document_id => "%{id}"
codec => "json"
}
stdout {
codec => json_lines
}
}
执行的 SQL mysql_users.sql 内容:
SELECT
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users`
LEFT JOIN `todo` ON `users`.`id` = `todo`.`user_id`
WHERE `users`.`id` > :sql_last_value
ORDER BY `id` ASC
数据库建表脚本以及测试数据:
DROP TABLE IF EXISTS `todo`;
CREATE TABLE `todo` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`text` varchar(255) NOT NULL DEFAULT '' COMMENT '任务文本',
`is_done` tinyint(3) DEFAULT '0' COMMENT '是否完成',
`user_id` int(11) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
LOCK TABLES `todo` WRITE;
/*!40000 ALTER TABLE `todo` DISABLE KEYS */;
INSERT INTO `todo` (`id`, `text`, `is_done`, `user_id`)
VALUES
(3,'bbbbb',0,1),
(4,'cccccc',0,1),
(5,'人生自古谁无死,留取丹心照汗青',0,2),
(6,'来自Vue的问候',0,2),
(7,'Hello world',0,11),
(8,'举头望明月,低头思故乡',0,11),
(10,'我欲乘风归去,又恐琼楼玉宇',0,1),
(11,'朝辞白帝彩云间,千里江陵一日还',0,1),
(12,'在天愿作比翼鸟,在地愿做连理枝',0,9),
(13,'天长地久有时尽,此恨绵绵无绝期',0,9);
/*!40000 ALTER TABLE `todo` ENABLE KEYS */;
UNLOCK TABLES;
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT '',
`version` int(11) DEFAULT '0',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
LOCK TABLES `users` WRITE;
/*!40000 ALTER TABLE `users` DISABLE KEYS */;
INSERT INTO `users` (`id`, `name`, `version`)
VALUES
(1,'Simon',0),
(2,'Jerry',0),
(4,'Jim',0),
(5,'Mary',0),
(6,'Amy',0),
(7,'Kaiven',0),
(8,'Bell',0),
(9,'Sky',0),
(10,'Sam',0),
(11,'Lily',0),
(12,'Lucy',0),
(13,'David',0),
(14,'Kimi',0);
/*!40000 ALTER TABLE `users` ENABLE KEYS */;
UNLOCK TABLES;
你期待的结果是什么?实际看到的错误信息又是什么?
我希望最后那条数据也能试试同步进入 ES 而不是要等进程结束再写入
执行过程中正常,没有错误提示与警告
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
再次感谢 @博弈 大神的解答。
这里对该问题进行一个简要的总结,filter aggregate 创建中 event map 并不知道我这次事件是不是应该结束,也就是它也不知道我到那一条才是最后一条, 因此 设置一个 timeout 告诉它 这个时间执行多少秒就结束继续执行第二个 但这样并不是很严谨 因为你也不确定你的 event map 到底要执行多久 因此我返回去看了下官方的文档 最好的方式是 我们应该给定一个 task end 的条件 ES官网关于 aggregate 的说明