Logstash JDBC plugin fills Elasticsearch with more data than actually exists and keeps running


Logstash is running in an infinite loop and I have to stop the process manually; it just keeps filling the Elasticsearch index with values. I need exactly the same number of documents as there are rows in my DB table.

Here's my Logstash config:

input {
  jdbc {
    jdbc_driver_library => "/correct_path/java/mysql-connector-java-8.0.27.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
    jdbc_user => "user" 
    jdbc_password => "password" 
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    statement => 'select * from my_table'
  }
}

output {
    elasticsearch {
      user => "test"
      password => "test"
      hosts => ["localhost:9200"] 
      index => "my_index"
    }
    stdout { codec => "rubydebug" }
}


Comments (1)

漫雪独思 2025-02-16 13:18:35


This is happening because the query fetches all the data every time the cron job executes. Also, you have not provided a custom id in the elasticsearch output, so it creates a dynamic id for each document, and because of that the index ends up with more data (duplicates with different unique ids).
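
For the duplicate-id part, you can set document_id on the elasticsearch output. This is a minimal sketch assuming your table has a unique primary-key column named id (a hypothetical name; substitute your actual key column):

output {
    elasticsearch {
      user => "test"
      password => "test"
      hosts => ["localhost:9200"]
      index => "my_index"
      # Reuse the row's primary key as the Elasticsearch _id so a re-indexed
      # row overwrites its existing document instead of creating a duplicate.
      document_id => "%{id}"
    }
    stdout { codec => "rubydebug" }
}

With this, re-indexing the same rows is idempotent: each table row maps to exactly one document.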

You can use the sql_last_value parameter, which stores the last crawl time, and update your query with a where condition on created_date or updated_date. The first run will fetch all the data from the DB; from the second run onward, only rows that were newly created or updated are fetched.

input {
  jdbc {
    jdbc_driver_library => "/correct_path/java/mysql-connector-java-8.0.27.jar" 
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
    jdbc_user => "user" 
    jdbc_password => "password" 
    jdbc_paging_enabled => true
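    # Six-field cron expression: the extra leading field is seconds, so this
    # runs every 5 seconds (a five-field "*/5 * * * *" would be every 5 minutes).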
    schedule => "*/5 * * * * *"
    statement => 'select * from my_table where created_date > :sql_last_value or updated_date > :sql_last_value'
  }
}

output {
    elasticsearch {
      user => "test"
      password => "test"
      hosts => ["localhost:9200"] 
      index => "my_index"
    }
    stdout { codec => "rubydebug" }
}
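
The statement above relies on sql_last_value defaulting to the time of the previous run. As an alternative sketch, the jdbc input can track the highest value of a column instead, via its use_column_value / tracking_column options (the updated_date column name is an assumption carried over from the query above):

input {
  jdbc {
    jdbc_driver_library => "/correct_path/java/mysql-connector-java-8.0.27.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"
    jdbc_user => "user"
    jdbc_password => "password"
    jdbc_paging_enabled => true
    schedule => "*/5 * * * * *"
    # Persist the largest updated_date seen so far and expose it as
    # :sql_last_value on the next run, instead of using the last run time.
    use_column_value => true
    tracking_column => "updated_date"
    tracking_column_type => "timestamp"
    statement => 'select * from my_table where updated_date > :sql_last_value order by updated_date asc'
  }
}

Ordering by the tracking column keeps the saved value consistent when paging is enabled.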

PS: I am not a pro at SQL, so my query might have issues, but I hope you get the idea.
