CQRS:同步写入和读取数据库

发布于 2024-09-08 21:16:29 字数 82 浏览 2 评论 0原文

任何人都可以给我一些关于各种方法的指导吗 同步写入和读取数据库?

有哪些不同的技术,以及您如何评估每种技术 可靠性、性能、实施成本等。

Can anyone please give me some direction in regards to various ways to
synchronize the Write and Read databases?

What are different technologies out there, and how do you evaluate each, in
terms of realiability, performance, cost to implement, etc.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

花开半夏魅人心 2024-09-15 21:16:29

通常在 CQRS 中,写入 DB 用于存储长时间运行的进程 (saga) 的过渡数据。如果您正在同步读取和写入数据库(我假设您指的是两种方式),那么您可能做错了什么。

对于服务需要多条消息的长时间运行的进程,它需要一种在所有消息到达之前临时存储数据的方法。一个例子是客户注册,需要经理的批准,需要一周的时间来处理。该服务需要一种在批准到达之前临时存储客户信息的方法。这里就是写DB用来存储这段临时数据的地方。请注意,在客户获得批准之前,尚未将任何内容写入读取数据库。

当批准最终到达时,服务将从写入数据库中获取客户信息,完成注册过程并将其写入读取数据库。此时,写DB中的临时客户信息已经完成其工作,可以从写DB中删除。请注意,不涉及任何双向同步。

对于更简单的过程,例如更改客户名字,可以立即将更改写入读取数据库。不需要写入写入 DB,因为在这种情况下没有临时数据。

Typically in CQRS, the write DB is used to store transitional data for long running processes (sagas). If you are synchronizing the read and write DB (I'm assuming you mean both ways), you might be doing something wrong.

For a long running process where a service expects multiple messages, it needs a way to temporary store data before the all the messages arrives. An example of this is customer registration where an approval from manager, which takes a week to process, is required. The service needs a way to temporarily store the customer information before the approval arrives. This is where the write DB is used to store this piece of temporary data. Note that before the customer is approved, nothing is written to the read DB yet.

When the approval finally arrives, the service will take the customer information from the write DB, complete the registration process and write it to the read DB. At this time, the temporary customer information in the write DB has done its job and can be removed from the write DB. Notice that there isn't any two-way sync'ing involved.

For simpler process such as change customer first name, the change can be written to the read DB right away. Writing to the write DB is not required because there is no temporary data in this case.

也只是曾经 2024-09-15 21:16:29

查询模型不需要一致。它需要最终一致。查询模型也是视图模型,即表已经根据用户界面的要求连接。所以你甚至可以使用内存缓存,或者像 Redis。
命令端就像命令对象,其中包含更新数据库的所有相关信息。这些对象可能会填满消息队列。命令对象由命令处理器处理,该命令处理器以事务方式更新查询缓存和写入数据库。写入数据库可以是 RDBMS。但很明显,应该像 MongoDB 一样进行写入优化。
您也可以通过消息系统更新读取数据库。
用于此目的的一些很好的消息系统是 RabbitMQ 和 0MQ。

Query model need not be consistent.. it needs to be eventually consistent. Query model is also the view model, i.e. tables are already joined as per requirement of user interface. So you can use even an in memory cache, or like Redis.
Command side is like command objects which contain all relevant information to update database. These objects may fill up a messaging queue. The command objects are processed by a command processor which transactionally updates the query cache and the write database. The write database can be an RDBMS.. but as is apparent, should be write optimized like MongoDB.
You can update read database via a messaging system too.
Some good messaging systems for this purpose are RabbitMQ and 0MQ.

关于从前 2024-09-15 21:16:29

如果您像我一样将读取存储视为查询服务使用的数据库(及其非规范化)
并将写入数据库作为存储域事件的数据库,那么如果您需要将它们同步到特定时刻,那么您可以做的就是重播您存储的事件。
如果您希望尽可能保持最新,那么您不需要按版本进行限制

如果您使用的是 CQRS,那么您可能会有一个看起来有点像这样的存储库

    public interface IRepository<T> where T : AggregateRoot, new()
    {
        void Save(AggregateRoot aggregate, int expectedVersion);
        T GetById(Guid id);
        T GetById(Guid id, int version);
    }

希望这会有所帮助
干杯

If you, like me see the read store as the db that the Query service use (and its denormalized)
and the write db as the database where the Domain events are stored , then if you need to Synch them to a particular moment then what you can do is just replay the events that you have stored.
In the case you want to be as up to date as possible then you need not to restrict by version

If you are using CQRS, then probably you will have a repository that looks somewhat like this

    public interface IRepository<T> where T : AggregateRoot, new()
    {
        void Save(AggregateRoot aggregate, int expectedVersion);
        T GetById(Guid id);
        T GetById(Guid id, int version);
    }

Hope this helps
Cheers

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文