MongoDB real-time (or near real-time) streaming of inserted data
I have a number of MongoDB collections which take in large numbers of JSON documents from various streaming sources. In other words, there are a number of processes continually inserting data into a set of MongoDB collections.
I need a way to stream the data out of MongoDB into downstream applications. So I want a system that conceptually looks like this:
App Stream1 -->
App Stream2 --> MONGODB ---> Aggregated Stream
App Stream3 -->
OR this:
App Stream1 --> ---> MongoD Stream1
App Stream2 --> MONGODB ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3
The question is how do I stream data out of Mongo without having to continually poll/query the database?
The obvious answer would be "why don't you change those app streaming processes to send messages to a queue like Rabbit, Zero or ActiveMQ, which then delivers them to both your Mongo streaming processes and Mongo at once, like this":
MONGODB
/|\
|
App Stream1 --> | ---> MongoD Stream1
App Stream2 --> SomeMQqueue ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3
In an ideal world, yes, that would be good, but we need Mongo to ensure that messages are saved first, to avoid duplicates, to ensure that IDs are all generated, etc. Mongo has to sit in the middle as the persistence layer.
So how do I stream messages out of a Mongo collection (not using GridFS, etc.) into these downstream apps? The basic school of thought has been to poll for new documents and, for each document collected, update it by adding another field to the JSON document stored in the database, much like a processed flag in a SQL table that stores a processed timestamp. I.e. every 1 second, poll for documents where processed == null... add processed = now()... update the document.
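The poll-and-flag approach described above could be sketched roughly as follows, using the modern MongoDB Java sync driver. This is a hedged sketch, not a definitive implementation: the collection name ("events"), field name ("processed"), connection string, and the handle() method are all illustrative assumptions, and it requires a running mongod to execute. findOneAndUpdate claims and flags a document in one atomic step, which avoids two consumers picking up the same document.

```java
// Sketch of the poll-and-flag consumer; names "events"/"processed" are illustrative.
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.bson.Document;

import java.util.Date;

public class PollingConsumer {
    public static void main(String[] args) throws InterruptedException {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> events =
                    client.getDatabase("mydb").getCollection("events");
            while (true) {
                // Atomically claim one document that has no "processed" field yet,
                // stamping it with the processed timestamp in the same operation.
                Document doc = events.findOneAndUpdate(
                        Filters.exists("processed", false),
                        Updates.set("processed", new Date()));
                if (doc == null) {
                    Thread.sleep(1000); // nothing new; poll again in 1 second
                } else {
                    handle(doc);        // hand off to the downstream stream
                }
            }
        }
    }

    private static void handle(Document doc) {
        System.out.println("processed: " + doc.toJson());
    }
}
```

The cost of this design is one query per poll interval per consumer, plus an update per document, which is exactly the overhead the question is asking whether one can avoid.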
Is there a neater/more computationally efficient method?
FYI - These are all Java processes.
Cheers!
1 Answer
If you are writing to a capped collection (or collections), you can use a tailable cursor to push new data onto a stream, or onto a message queue from where it can be streamed out. However, this will not work for a non-capped collection.
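A tailable cursor on a capped collection might look something like this with the modern MongoDB Java sync driver. Again a sketch under assumptions: the collection name "eventStream", the database name, and the 64 MB cap are made up for illustration, and it needs a running mongod. With CursorType.TailableAwait the server blocks waiting for new inserts, so no client-side polling loop or processed flag is needed.

```java
// Sketch of a tailable-cursor consumer on a capped collection.
import com.mongodb.CursorType;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.CreateCollectionOptions;
import org.bson.Document;

public class TailableConsumer {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            // Tailable cursors only work on capped collections; create one if it
            // does not exist yet (createCollection throws if it already does).
            client.getDatabase("mydb").createCollection("eventStream",
                    new CreateCollectionOptions().capped(true).sizeInBytes(64 * 1024 * 1024));
            MongoCollection<Document> stream =
                    client.getDatabase("mydb").getCollection("eventStream");
            // TailableAwait keeps the cursor open and blocks server-side until
            // new documents are inserted, pushing them to us as they arrive.
            try (MongoCursor<Document> cursor =
                         stream.find().cursorType(CursorType.TailableAwait).iterator()) {
                while (cursor.hasNext()) {
                    Document doc = cursor.next();
                    System.out.println("new doc: " + doc.toJson()); // hand off downstream
                }
            }
        }
    }
}
```

The trade-off is that capped collections have a fixed size and old documents are overwritten, so this fits a transient event stream rather than the durable store of record the question describes; one option is to insert into both the main collection and a small capped "feed" collection.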