如何有条件地从 Kafka 主题轮询消息
我在 MongoDB 数据库中有一些任务通知。每个任务都有一个 due_date 和提醒标志。我正在将这些任务推送到 Kafka 主题。有一个 Node JS 应用程序从该主题进行轮询,并根据 due_date 和提醒标志将通知推送到前端应用程序。 due_date 可能已过或即将到来。
每当这些基于时间的条件发生时,我们需要从 Kafka 向正在侦听的节点应用程序发送通知:
- 提醒 = true 并且距离到期日还有 X 时间
- 到期日 = 现在
- 任务仍然存在并且已逾期
这怎么可能卡夫卡就完了吗?
I have a few task notifications in a MongoDB database. Each task has a due_date and reminder flag. I am pushing these tasks to a Kafka Topic. There is a Node JS app that polls from this topic and pushes notifications to a frontend app based on the due_date and reminder flag. The due_date could be past dated or upcoming.
From Kafka we need to send notifications to the Node App that is listening whenever those conditions time-based conditions occur:
- Reminder = true and it is X time before the Due Date
- Due Date = now
- The Task still exists and is Past Due
How can this be done with Kafka?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
DB 到 Kafka 的交互应该通过源连接器进行。每当基础表发生变化时,DB Connector 就可以将事件发布到 Kafka。因此,如果创建新行或更新任何列。
因此,理想的解决方案是在表中引入更多列或一个新的实用程序表,其中包含用于识别上述条件的列。可能是像“IsDueDate”这样的列,它可以是布尔类型。在数据库中创建一个调度程序(不确定 Mongo,但大多数数据库都有此选项)或任何批处理系统(如 Spring 批处理/启动应用程序)来验证数据并填充这些列。
一旦这些列被更新,它将通过连接器向 Kafka 触发一条消息,并且您的前端应用程序会轮询 Kafka 以获取新消息,并最终可以在有效负载中使用这些标志来识别触发此消息的条件,并且您可以在前端执行这些操作。
DB to Kafka interaction should be via source connector. DB Connectors can publish events to Kafka whenever there is a change in underlying table. So if new rows are created or any column is updated.
So ideal solution would be to introduce some more columns in table OR a new utility table with columns to identify the conditions you mentioned above. May be a column like "IsDueDate" which can be a boolean type. Create a scheduler in DB (not sure of Mongo but most DBs have option for this) Or any batch system (like Spring batch/boot app) to validate your data and populate these columns.
Once these columns are updated, it will trigger a message to Kafka via connector and your front end apps polls Kafka for new messages and ultimately can use these flags in payload to identify which condition triggered this and you can do the stuffs in front end.