反应堆Netty中的线性化(Spring Boot WebFlux)
我如何保证反应堆Netty请求的线性化性?
理论:
给定:
请求A想要写x = 2,y = 0
请求B想要阅读X,Y和写X = X+2,Y = Y+1
请求C想要阅读X并写Y = X
所有请求均不同步处理,并立即返回客户,并接受状态。
示例:
按顺序发送请求A,B,C。
示例日志输出:(请求,线程名称,x,y)
请求A,NioEventloopGroup-2-0,x = 2,y = 0
请求C,NioEventloopgroup-2-2,x = 2,y = 2
请求B,NioEventloopGroup-2-1,X = 4,Y = 3
业务逻辑需要A后所有读取来查看x = 2和y = 0。
并请求b查看x = 2,y = 0并设置y = 1。
并请求C查看x = 4并设置y = 4
。否则操作不是可逆的。
示例代码
文档:
@Document
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {
@Id
private String id;
private int data;
public Event withNewId() {
setId(UUID.randomUUID().toString());
return this;
}
}
repo:
public interface EventRepository extends ReactiveMongoRepository<Event, String> {}
控制器:
@RestController
@RequestMapping(value = "/api/event")
@RequiredArgsConstructor
public class EventHandler {
private final EventRepository repo;
@PostMapping
public Mono<String> create(Event event) {
return Mono.just(event.withNewId().getId())
.doOnNext(id ->
// do query based on some logic depending on event data
Mono.just(someQuery)
.flatMap(query ->
repo.find(query)
.map(e -> event.setData(event.getData() + e.getData())))
.switchIfEmpty(Mono.just(event))
.flatMap(e -> repo.save(e))
.subscribeOn(Schedulers.single())
.subscribe());
}
}
它不起作用,但是使用 sisscribeon
我试图保证可线度性能。这意味着并发请求A和B始终按服务器接收的顺序写入DB。因此,如果另一个并发请求c是第一个读取的化合物,而不是写入,则它将读取DB的更改,该更改反映了请求B,而不是A的更改,并写下了基于B的更改
。执行者没有FIFO队列,以便我可以不同步但按顺序处理请求?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
我认为这不是特别针对Netty或反应堆,而是针对一个更广泛的主题 - 如何处理顺序的消息传递和超过一通用的消息传递。一些问题:
我会尝试以单个请求的方式重新设计操作,以便以所需的顺序执行后端的操作,并在必要时使用并发来加快该过程。
例如,如果不可能,例如,您将无法控制客户端,或更一般而言事件(请求)到达的顺序,则必须使用每封语义语义来实现应用程序级逻辑的订购来进行订购。您可以,例如存储或缓冲消息,等待所有内容到达,然后才能使用正确的顺序使用消息中的数据触发业务逻辑。这需要某种键(身份),可以将消息归因于同一实体和分类键,您知道如何以正确的顺序对消息进行排序。
编辑:
获得答案后,您绝对可以“反应堆方式”实现它。
参见:
有一些警告:
考虑到所有这些,我将使用持续的存储空间 - 例如,在数据库中保存传入事件,并在后台进行处理 - 为此,您不需要使用反应堆。在大多数情况下,基于简单的Servlet弹簧应用程序将更容易维护和开发,尤其是如果您以前没有功能反应性编程的经验。
I don't think that this is specific to Netty or Reactor in particular, but to a more broad topic - how to handle out-of-order message delivery and more-than-once message delivery. A few questions:
I'd try to redesign the operation in such a way that there's a single request executing the operations on the backend in the required order and using concurrency here if necessary to speed-up the process.
If it's not possible, for example, you don't control the client, or more generally the order in which the events (requests) arrive, you have to implement ordering on application-level logic using per-message semantics to do the ordering. You can, for example store or buffer the messages, waiting for all to arrive, and when they do, only then trigger the business logic using the data from the messages in the correct order. This requires some kind of a key (identity) which can attribute messages to the same entity, and a sorting-key, that you know how to sort the messages in the correct order.
EDIT:
After getting the answers, you can definitely implement it "the Reactor way".
See Gist: https://gist.github.com/tarczynskitomek/d9442ea679e3eed64e5a8470217ad96a
There are a few caveats:
Having all this in mind, I would go with a persistent storage - say saving the incoming events in the database, and doing the processing in background - for this you don't need to use Reactor. Most of the time a simple Servlets based Spring app will be far easier to maintain and develop, especially if you have no previous experience with Functional Reactive Programming.
查看提供的代码,我不会尝试在反应堆Netty级别上处理它。
最初,关于控制器实施的几条评论是因为它有多个违反反应性原则的问题。我建议花一些时间学习反应性API,但这里有一些提示
在您订阅之前,没有发生任何反应性。同时,请致电
subscribe
明确是一种反模式,应避免在创建类似于WebFlux的框架之前。。
。
并行
调度程序应用于运行非阻止逻辑,直到您有一些阻止代码为止。doon ...
是所谓的副作用操作员,不应用于构造反应流。现在,由于网络故障,可能的重试等,预定义序列中的处理请求可能很棘手。如果您从未得到请求B或请求C,该怎么办?您还应该坚持要求吗?
正如@ttarczynski在他的评论中提到的那样,最好的选择是重新设计API并发送单个请求。
如果不是一个选项,您需要将某些状态引入“推迟”请求处理,然后根据一致性语义,将它们作为“批次”处理时,或者在收到最后一个请求时将其处理为“批处理”,或者只是推迟请求C,直到您获得请求&amp; B
Looking at the provided code I would not try to handle it on Reactor Netty level.
At first, several comments regarding controller implementation because it has multiple issues that violate reactive principles. I would recommend to spend some time learning reactive API but here are some hints
In reactive nothing happens until you subscribe. At the same time calling
subscribe
explicitly is an anti-pattern and should be avoided until you are creating framework similar to WebFlux.parallel
scheduler should be used to run non-blocking logic until you have some blocking code.doOn...
are so-called side-effect operators and should not be used for constructing reactive flows.Now, processing requests in the predefined sequence could be tricky because of network failures, possible retries, etc. What if you never get Request B or Request C? Should you still persist Request A?
As @ttarczynski mentioned in his comment the best option is to redesign API and send single request.
In case it's not an option you would need to introduce some state to "postpone" request processing and then, depending on consistency semantic, process them as a "batch" when the last request is received or just defer Request C until you get Request A & B.