ElasticSearch 关于使用 Logstash-input-jdbc 导入MySQL 一对多关系JSON结构

发布于 2022-09-11 14:28:30 字数 5832 浏览 18 评论 0

问题描述

用logstash-input-jdbc 将 mysql 一对多结构 数据导入 ElasticSearch 时出现连表查询的数据丢失没有正确存入数组的的现象,且偶尔正常;

问题出现的平台版本及自己尝试过哪些方法

系统环境:MacOS Mojave 10.14
Mysql 版本: 5.6.23
ElasticSearch 版本:5.5.2
logstash 版本:6.4.2

相关代码

logstash.conf 配置

input {
    stdin{
    }
    jdbc {
        # Mysql数据库地址
        jdbc_connection_string => "jdbc:mysql://localhost:3306/yii2basic"
        
        # 开启连接验证
        jdbc_validate_connection => true
        
        # 数据库用户
        jdbc_user => "root"
        
        # 数据库密码
        jdbc_password => "******"
        
        # JDBC驱动库
        jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.46.jar"
        
        # JDBC 驱动类
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        
        # 声明要导入数据库 SQL 语句 如果要将 SQL 独立存放则使用 statement_filepath 参数
        # statement => "select * from users"
        
        # 存放导入数据 SQL 的 SQL文件
        statement_filepath => "/Users/simon/logstash/mysql_users.sql"
        
        # CronJob 自动定时执行默认一分钟执行一次 类似 Crontab 定时规则
        # 定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
        schedule => "* * * * *"
        
        # ElasticSearch Domcument type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
        #type => "users"  

        # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
        record_last_run => "true"
        last_run_metadata_path => "/Users/simon/logstash/sync_last_id"

        # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
        # clean_run => "false"

        # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值
        use_column_value => true

        # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
        tracking_column => "id"

        # 是否将 字段(column) 名称转小写
        #lowercase_column_names => "false"
    }
}

filter {
    aggregate {
        task_id => "%{id}"
        code => "
            #代码注释
            map['id'] = event.get('id')
            map['name'] = event.get('name')
            map['todo_list'] ||=[]
            map['todos'] ||=[]

            if (event.get('todo_id') != nil)
                if !(map['todo_list'].include? event.get('todo_id'))
                    map['todo_list'] << event.get('todo_id')        
                    map['todos'] << {
                        'todo_id' => event.get('todo_id'),
                        'title' => event.get('text'),
                    }
                end
            end

            event.cancel()
        "
        push_previous_map_as_event => true
    }
    json {
        source => "message"
        remove_field => ["message"]
        #remove_field => ["message", "type", "@timestamp", "@version"]
    }
    mutate  {
        #将不需要的JSON字段过滤,且不会被存入 ES 中
        remove_field => ["tags", "@timestamp", "@version"]
    }
}

# 从 MySQL导入数据到 ElasticSearch 保存
output {
    elasticsearch {
        # ES 服务URL地址
        hosts => "127.0.0.1:9200"
        
        # ES 索引名称
        index => "mysql_users"

        # document_type 自 ES 6.x 版本开始 output 中不再支持 document_type 配置 7.x版本将彻底废弃
        document_type => "users"
        
        # 文档ID 对应数据库 ID
        document_id => "%{id}"
    }
    stdout {
        #codec => json_lines
    }
}

mysql_users.sql

SELECT 
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users` 
LEFT OUTER JOIN `todo` ON `todo`.`user_id` = `users`.`id` 
WHERE `todo`.id > 0
-- WHERE `users`.`id` > :sql_last_value
ORDER BY `id` 

从数据库查询的结果

clipboard.png

导入 ElasticSearch 后的内容

请求参数
GET mysql_users/users/_search
{
  "size": 200,
  "sort" : [
    {"id" : "desc"}  
  ]
}

响应结果:

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 3,
    "max_score": null,
    "hits": [
      {
        "_index": "mysql_users",
        "_type": "users",
        "_id": "11",
        "_score": null,
        "_source": {
          "id": 11,
          "name": "Lily",
          "todos": [
            {
              "todo_id": 8,
              "title": "举头望明月,低头思故乡"
            }
          ],
          "todo_list": [
            8
          ]
        },
        "sort": [
          11
        ]
      },
      {
        "_index": "mysql_users",
        "_type": "users",
        "_id": "2",
        "_score": null,
        "_source": {
          "id": 2,
          "name": "Jerry",
          "todos": [
            {
              "todo_id": 5,
              "title": "dddddd"
            }
          ],
          "todo_list": [
            5
          ]
        },
        "sort": [
          2
        ]
      },
      {
        "_index": "mysql_users",
        "_type": "users",
        "_id": "1",
        "_score": null,
        "_source": {
          "id": 1,
          "name": "Simon",
          "todos": [
            {
              "todo_id": 3,
              "title": "bbbbb"
            },
            {
              "todo_id": 4,
              "title": "cccccc"
            }
          ],
          "todo_list": [
            3,
            4
          ]
        },
        "sort": [
          1
        ]
      }
    ]
  }
}

你期待的结果是什么?实际看到的错误信息又是什么?

希望 ES 中存储的 每个 DOM 内容 todos 和 todo_list 都能对应数据库查询的结果条数 并与之保持一致 现在只有 id = 1 的数据是正常 2 和 11 都缺少了一条

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

夕色琉璃 2022-09-18 14:28:30

参照你的方式,我写的一对多的子节点只会有一条数据(事实上应该有多条)

与往事干杯 2022-09-18 14:28:30

兄弟 如果一对多的关系中,多的那一方还存在一对多的关系 怎么写啊

沉溺在你眼里的海 2022-09-18 14:28:30

第一,官方申明logstash和elasticsearch版本要一致;
第二,logstash是否有warn或error日志?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文