ElasticSearch 关于使用 Logstash-input-jdbc 导入MySQL 一对多关系JSON结构
问题描述
用logstash-input-jdbc 将 mysql 一对多结构 数据导入 ElasticSearch 时出现连表查询的数据丢失没有正确存入数组的的现象,且偶尔正常;
问题出现的平台版本及自己尝试过哪些方法
系统环境:MacOS Mojave 10.14
Mysql 版本: 5.6.23
ElasticSearch 版本:5.5.2
logstash 版本:6.4.2
相关代码
logstash.conf 配置
input {
stdin{
}
jdbc {
# Mysql数据库地址
jdbc_connection_string => "jdbc:mysql://localhost:3306/yii2basic"
# 开启连接验证
jdbc_validate_connection => true
# 数据库用户
jdbc_user => "root"
# 数据库密码
jdbc_password => "******"
# JDBC驱动库
jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.46.jar"
# JDBC 驱动类
jdbc_driver_class => "com.mysql.jdbc.Driver"
# 声明要导入数据库 SQL 语句 如果要将 SQL 独立存放则使用 statement_filepath 参数
# statement => "select * from users"
# 存放导入数据 SQL 的 SQL文件
statement_filepath => "/Users/simon/logstash/mysql_users.sql"
# CronJob 自动定时执行默认一分钟执行一次 类似 Crontab 定时规则
# 定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# ElasticSearch Domcument type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
#type => "users"
# 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => "true"
last_run_metadata_path => "/Users/simon/logstash/sync_last_id"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
# clean_run => "false"
# 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值
use_column_value => true
# 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
tracking_column => "id"
# 是否将 字段(column) 名称转小写
#lowercase_column_names => "false"
}
}
filter {
aggregate {
task_id => "%{id}"
code => "
#代码注释
map['id'] = event.get('id')
map['name'] = event.get('name')
map['todo_list'] ||=[]
map['todos'] ||=[]
if (event.get('todo_id') != nil)
if !(map['todo_list'].include? event.get('todo_id'))
map['todo_list'] << event.get('todo_id')
map['todos'] << {
'todo_id' => event.get('todo_id'),
'title' => event.get('text'),
}
end
end
event.cancel()
"
push_previous_map_as_event => true
}
json {
source => "message"
remove_field => ["message"]
#remove_field => ["message", "type", "@timestamp", "@version"]
}
mutate {
#将不需要的JSON字段过滤,且不会被存入 ES 中
remove_field => ["tags", "@timestamp", "@version"]
}
}
# 从 MySQL导入数据到 ElasticSearch 保存
output {
elasticsearch {
# ES 服务URL地址
hosts => "127.0.0.1:9200"
# ES 索引名称
index => "mysql_users"
# document_type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
document_type => "users"
# 文档ID 对应数据库 ID
document_id => "%{id}"
}
stdout {
#codec => json_lines
}
}
mysql_users.sql
SELECT
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users`
LEFT OUTER JOIN `todo` ON `todo`.`user_id` = `users`.`id`
WHERE `todo`.id > 0
-- WHERE `users`.`id` > :sql_last_value
ORDER BY `id`
从数据库查询的结果
导入 ElasticSearch 后的内容
请求参数
GET mysql_users/users/_search
{
"size": 200,
"sort" : [
{"id" : "desc"}
]
}
响应结果:
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "mysql_users",
"_type": "users",
"_id": "11",
"_score": null,
"_source": {
"id": 11,
"name": "Lily",
"todos": [
{
"todo_id": 8,
"title": "举头望明月,低头思故乡"
}
],
"todo_list": [
8
]
},
"sort": [
11
]
},
{
"_index": "mysql_users",
"_type": "users",
"_id": "2",
"_score": null,
"_source": {
"id": 2,
"name": "Jerry",
"todos": [
{
"todo_id": 5,
"title": "dddddd"
}
],
"todo_list": [
5
]
},
"sort": [
2
]
},
{
"_index": "mysql_users",
"_type": "users",
"_id": "1",
"_score": null,
"_source": {
"id": 1,
"name": "Simon",
"todos": [
{
"todo_id": 3,
"title": "bbbbb"
},
{
"todo_id": 4,
"title": "cccccc"
}
],
"todo_list": [
3,
4
]
},
"sort": [
1
]
}
]
}
}
你期待的结果是什么?实际看到的错误信息又是什么?
希望 ES 中存储的 每个 DOM 内容 todos 和 todo_list 都能对应数据库查询的结果条数 并与之保持一致 现在只有 id = 1 的数据是正常 2 和 11 都缺少了一条
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
参照你的方式,我写的一对多的子节点只会有一条数据(事实上应该有多条)
兄弟 如果一对多的关系中,多的那一方还存在一对多的关系 怎么写啊
第一,官方申明logstash和elasticsearch版本要一致;
第二,logstash是否有warn或error日志?