Auto-delete rows using Kafka?

Posted 2025-02-06 06:18:49

I have an in-memory database and I'm using Kafka + JdbcSinkConnector to sync a downstream Postgres database with the in-memory database. The in-memory database is for efficient computations and Postgres is for querying. In development, I frequently destroy and recreate the in-memory database. Each time, I also recreate the Kafka sink connectors.

If new rows were added or existing rows were changed in the in-memory database, I think JdbcSinkConnector is able to sync Postgres with the new data. However, if rows were deleted, JdbcSinkConnector doesn't delete the rows in Postgres.

Is it possible for JdbcSinkConnector to check which of the rows in the downstream database are no longer in the upstream database, then delete them? If not, I'd have to destroy the downstream database every time I update the upstream database.

Config:

{
  'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'dialect.name': 'PostgreSqlDatabaseDialect',
  'key.converter': 'io.confluent.connect.avro.AvroConverter',
  'key.converter.schema.registry.url': `http://schema-registry:${process.env.SCHEMA_REGISTRY_PORT}`,
  'value.converter': 'io.confluent.connect.avro.AvroConverter',
  'value.converter.schema.registry.url': `http://schema-registry:${process.env.SCHEMA_REGISTRY_PORT}`,
  'insert.mode': 'upsert',
  'delete.enabled': 'true',
  'auto.create': 'true',
  'auto.evolve': 'false',
  'errors.retry.timeout': -1,
  'connection.url': `jdbc:postgresql://${process.env.INTERNAL_DOCKER_HOST}:${process.env.PG_PORT}/${process.env.PG_DB}`,
  'connection.user': process.env.PG_USER,
  'connection.password': process.env.PG_PASS,
  'pk.mode': 'record_key',
}
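
For reference, a minimal sketch of how a config object like this can be (re)created as a connector through the Kafka Connect REST API (PUT /connectors/{name}/config creates or updates the connector). The Connect URL and the connector name 'pg-sink' are assumptions for illustration, not taken from the question:

// Sketch: create or update the sink connector via Kafka Connect's REST API.
// CONNECT_URL and the connector name 'pg-sink' are illustrative assumptions.
const CONNECT_URL = process.env.CONNECT_URL ?? 'http://localhost:8083';

const config = {
  'connector.class': 'io.confluent.connect.jdbc.JdbcSinkConnector',
  // ... same key/value pairs as the config above ...
  'pk.mode': 'record_key',
};

async function upsertConnector(name, connectorConfig) {
  // PUT /connectors/{name}/config creates the connector if it doesn't exist,
  // or updates its configuration if it does.
  const res = await fetch(`${CONNECT_URL}/connectors/${name}/config`, {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(connectorConfig),
  });
  if (!res.ok) throw new Error(`Connect API returned ${res.status}: ${await res.text()}`);
  return res.json();
}

upsertConnector('pg-sink', config).then(console.log).catch(console.error);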

Comments (1)

沩ん囻菔务 2025-02-13 06:18:49

"...possible for JdbcSinkConnector to check which of the rows in the downstream database are no longer in the upstream database"

It's fully decoupled from any knowledge of the upstream system, so no.

"I'd have to destroy the downstream database every time I update the upstream database."

Truncate, don't destroy/drop, unless your upstream database schema changes. You'd need some external notification event to trigger that; it could be a REST API call, not necessarily a Kafka event.
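
As a rough illustration of that suggestion, here is a sketch that truncates the downstream tables with the node-postgres ('pg') client when such a reset notification arrives. The table names and the trigger mechanism are assumptions, not part of the question:

// Sketch: truncate the downstream tables instead of dropping the database.
// Assumes the 'pg' (node-postgres) client; table names are hypothetical.
const { Client } = require('pg');

async function truncateDownstream(tables) {
  const client = new Client({
    host: process.env.INTERNAL_DOCKER_HOST,
    port: process.env.PG_PORT,
    database: process.env.PG_DB,
    user: process.env.PG_USER,
    password: process.env.PG_PASS,
  });
  await client.connect();
  try {
    // TRUNCATE clears the data but keeps the schema that auto.create built.
    await client.query(`TRUNCATE TABLE ${tables.join(', ')}`);
  } finally {
    await client.end();
  }
}

// Call this from whatever signals the reset, e.g. a small REST endpoint:
// truncateDownstream(['my_table_a', 'my_table_b']);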


It's unclear why you need to use Materialize when Kafka Streams KTables would also give you fast storage backed by topics, and they support writing the tombstone events that the JDBC sink needs in order to delete data.
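
To make the tombstone point concrete, here is a minimal sketch of producing one (an Avro-encoded key with a null value) so that, with delete.enabled=true and pk.mode=record_key, the sink deletes the matching Postgres row. It assumes the kafkajs and @kafkajs/confluent-schema-registry clients; the topic name, key shape, and broker address are made up for illustration:

// Sketch: emit a tombstone (key + null value) so the JDBC sink deletes the row.
// kafkajs and @kafkajs/confluent-schema-registry are assumed; the topic and
// key shape below are illustrative, not taken from the question.
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');

const kafka = new Kafka({ brokers: [process.env.KAFKA_BROKER ?? 'localhost:9092'] });
const registry = new SchemaRegistry({
  host: `http://schema-registry:${process.env.SCHEMA_REGISTRY_PORT}`,
});

async function deleteRow(topic, key) {
  const producer = kafka.producer();
  await producer.connect();
  // The key must be Avro-encoded with the same key schema the source uses,
  // otherwise the sink can't match it against pk.mode=record_key.
  const keySchemaId = await registry.getLatestSchemaId(`${topic}-key`);
  const encodedKey = await registry.encode(keySchemaId, key);
  // A null value is the tombstone that delete.enabled=true turns into a DELETE.
  await producer.send({ topic, messages: [{ key: encodedKey, value: null }] });
  await producer.disconnect();
}

// deleteRow('my_table', { id: 42 });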
