嵌入式 Debezium 配置用于动态表和过滤器添加,无需重新启动连接器

发布于 2025-01-12 00:43:49 字数 1557 浏览 0 评论 0原文

我在 Spring Boot 应用程序中使用嵌入式 Debezium 引擎(v1.8.1)。我们的应用程序允许用户为其业务创建工作流程,并且他们可以创建用户定义的触发器来启动工作流程。用户可以根据数据库中的某些数据变化定义触发器来启动业务工作流程。在我们的应用程序中,用户可以监视数据库中的任何表,并在表中的任何列上创建任何过滤条件。我们使用嵌入式 Debezium 来捕获该表的所有数据更改事件 (CRUD),当通过 CRUD 操作列条件与任何行匹配时,就会启动触发器来启动工作流程。例如,在订单和交付系统的情况下,将监视订单表中的特定列“状态”,当它与值“完成”匹配时,将为订单处于已完成状态的所有客户启动交付工作流程。

为了实现上述功能,我有以下疑问:

  1. 哪种快照模式/设置对于上述功能有效/最佳?

  2. 如何进行动态添加表? - 用户可以添加任何新表以随时观看。我阅读了有关信号表以及实现相同目标所需的以下配置的信息。

"signal.data.collection":"schemaname.debezium_signal" “table.include.list”,“schemaname.tb1,schemaname.tb2,schemaname.debezium_signal”

我计划创建信号表,并在连接器启动之前将其条目添加到 table.include.list 中第一次。从那时起,每当用户添加要监视的新表(例如 schemaname.tb3)时,我们可能需要从应用程序在信号表中添加一个条目(插入 debezium_signal 值 (1,'execute-snapshot','{ "data-collections": ["schemaname.tb3"]}')) 用于选取该表进行增量快照并监视其中的任何数据更改事件。但似乎配置 table.include.list 也应该更新以包含新表 (schemaname.tb3) 以捕获需要重新启动连接器的更改事件。有没有办法动态添加表而不重新启动连接器?

  1. 如何为每个表实现不同的转换/过滤器​​ - 我希望为每个表匹配不同的过滤器或标准(用户定义)以选择行。例如,对于 tb1,schemaname.tb1.STATUS == 'COMPLETED',schemaname.tb2.delivered_date > tb2 的 some_date。目前,我们在配置中定义的过滤器适用于所有表,我们没有选项为特定表配置它们。此外,只要用户在应用程序中定义这些条件或过滤器,就应该动态添加它们。我们如何在不重新启动连接器的情况下实现这一目标?此外,文档指出搜索条件列应该是索引列(为了提高性能)。

  2. Debezium 服务有休息端点来更新连接器配置。我们是否有类似的方法可以实现相同的更新连接器配置?或者我们是否需要拥有自己的 REST API 来实现相同的目的?

  3. 由于我们将使用嵌入式 debezium 引擎,因此要配置哪些最佳堆内存和其他 JVM 选项来处理具有数百万条记录的数据库中的更改事件,而不会使连接器崩溃。

  4. 如果有多个行满足用户定义的条件,则计划使用handleBatch() 处理程序来处理批处理事件。使用通常的handleEvent()中的批处理程序有什么缺点吗?

请指教。

I'm using an embedded debezium engine (v1.8.1) within my spring boot application. Our application allows the user to create workflows for their businesses and they can create user-defined triggers to start the workflow. The user can define triggers based on some data change in the database to start business workflows. In our application, users can watch any table from DB and create any filter criteria on any of the columns in the table. We are using Embedded Debezium to capture all data change events (CRUD) for that table and when the column condition matches for any of the rows through CRUD operations, a trigger is set off to start the workflow. For e.g., In the case of an Order and Delivery system, the orders table is watched for the specific column 'STATUS' and when it matches the value 'COMPLETED', the delivery workflow will be started for all customers whose orders are in completed status.

To achieve the above functionality, I'm having the following queries,

  1. Which snapshot mode/settings are efficient/optimal for the above functionality?

  2. How to do dynamic table addition? - Users can add any new table to be watched at any point in time. I read about signal tables and the following configurations required to achieve the same.

"signal.data.collection":"schemaname. debezium_signal"
"table.include.list", "schemaname.tb1,schemaname.tb2,schemaname.debezium_signal"

I planned to have the signal table created, and to add its entry in the table.include.list before the connector is brought up for the first time. From thereon, whenever a user adds a new table (say schemaname.tb3) to be watched, we may need to add an entry in the signal table from our application (insert into debezium_signal values (1,'execute-snapshot','{"data-collections": ["schemaname.tb3"]}')) for that table to be picked up for incremental snapshot and watched for any data change events from thereon. But it seems like the configuration table.include.list should also be updated to include the new table (schemaname.tb3) to capture its change events which require a connector restart. Is there a way to add tables dynamically without restarting connectors?

  1. How to achieve different transforms/filters for each table - I want to have different filters or criteria (user-defined) to be matched for each table for selecting rows. For eg, schemaname.tb1.STATUS == 'COMPLETED' for tb1, schemaname.tb2.delivered_date > some_date for tb2. Currently, the filters that we define in the configuration are applied for all the tables and we don't have the option to configure them for specific tables. Also, these criteria or filters should be added dynamically whenever the user defines them in the application. How do we achieve this without connector restart? Moreover, the documentation says that the search criteria column should be an indexed column (for efficient performance).

  2. Debezium services have rest end-points to update connector configuration. Do we have anything similar to achieve the same for updating connector configuration? Or do we need to have our own rest APIs to achieve the same?

  3. Since we are going to use an embedded debezium engine, what are the optimal heap memory and other JVM options to configure to process change events from a database having millions of records without crashing the connectors.

  4. Planning to use handleBatch() handler to handle batch events if there is more than one row satisfying the user-defined criteria. Is there any drawback using the batch handlers from the usual handleEvent().

Kindly advise.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文