Linearizability in Reactor Netty (Spring Boot WebFlux)

Posted on 2025-02-04 10:50:26

How can I guarantee linearizability of requests in Reactor Netty?

Theory:

Given:
Request A wants to write x=2, y=0
Request B wants to read x, y and write x=x+2, y=y+1
Request C wants to read x and write y=x
All Requests are processed asynchronously and return to the client immediately with status ACCEPTED.

Example:
Send requests A, B, C in order.

Example Log Output: (request, thread name, x, y)
Request A, nioEventLoopGroup-2-0, x=2, y=0
Request C, nioEventLoopGroup-2-2, x=2, y=2
Request B, nioEventLoopGroup-2-1, x=4, y=3

Business logic requires all reads after A to see x=2 and y=0, request B to see x=2, y=0 and set y=1, and request C to see x=4 and set y=4.

In short: the business logic makes every write operation depend on the previous write operation having completed. Otherwise the operations are not reversible.

Example Code

Document:

@Document
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Event {

    @Id
    private String id;

    private int data;

    public Event withNewId() {
        setId(UUID.randomUUID().toString());
        return this;
    }

}

Repo:

public interface EventRepository extends ReactiveMongoRepository<Event, String> {}

Controller:

@RestController
@RequestMapping(value = "/api/event")
@RequiredArgsConstructor
public class EventHandler {
    private final EventRepository repo;

    @PostMapping
    public Mono<String> create(Event event) {
        return Mono.just(event.withNewId().getId())
                   .doOnNext(id -> 
                       // do query based on some logic depending on event data
                       Mono.just(someQuery)
                           .flatMap(query ->                        
                               repo.find(query)
                                   .map(e -> event.setData(event.getData() + e.getData())))
                           .switchIfEmpty(Mono.just(event))
                           .flatMap(e -> repo.save(e))
                           .subscribeOn(Schedulers.single())
                           .subscribe());
    }

}

It does not work, but with subscribeOn I am trying to guarantee linearizability, meaning that concurrent requests A and B always write their payload to the DB in the order in which the server received them. Therefore, if another concurrent request C is a compound of first read, then write, it will read changes from the DB that reflect those of request B, not A, and write its own changes based on B.

Is there a way in Reactor Netty to schedule executors with an unbounded FIFO queue, so that I can process the requests asynchronously but in order?

Comments (2)

掐死时间 2025-02-11 10:50:26

I don't think that this is specific to Netty or Reactor in particular, but belongs to a broader topic: how to handle out-of-order message delivery and more-than-once message delivery. A few questions:

  1. Does the client always send the same number of requests in the same order? There's always a chance that, due to networking issues, the requests may arrive out of order, or one or more may be lost.
  2. Does the client make retries? What happens if the same request reaches the server twice?
  3. If the order matters, why doesn't the client wait for the result of the (n-1)th request before issuing the nth request? In other words, why are there many concurrent requests?

I'd try to redesign the operation so that a single request executes the operations on the backend in the required order, using concurrency there if necessary to speed up the process.
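As a rough illustration of the single-request idea, here is a minimal sketch in plain Java (16+ for records). `BatchRequest`, `State`, and `applyInOrder` are hypothetical names, not part of the question's code, and the state is held in memory rather than in MongoDB. The three operations are the question's requests A, B, and C, applied strictly in list order.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch: one request carries all operations, the backend applies them in order.
final class BatchRequest {

    // Immutable snapshot of the two values from the question.
    record State(int x, int y) {}

    // Applies every operation to the result of the previous one, in list order.
    static State applyInOrder(State initial, List<UnaryOperator<State>> operations) {
        State current = initial;
        for (UnaryOperator<State> operation : operations) {
            current = operation.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<State>> operations = List.of(
                s -> new State(2, 0),                 // request A: write x=2, y=0
                s -> new State(s.x() + 2, s.y() + 1), // request B: x=x+2, y=y+1
                s -> new State(s.x(), s.x()));        // request C: y=x
        State result = applyInOrder(new State(0, 0), operations);
        System.out.println(result.x() + ", " + result.y());
    }
}
```

Because the order is fixed by the list inside one request, network reordering between A, B, and C can no longer change the result.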

If that's not possible, for example because you don't control the client or, more generally, the order in which the events (requests) arrive, you have to implement the ordering in application-level logic using per-message semantics. You can, for example, store or buffer the messages, wait for all of them to arrive, and only then trigger the business logic using the data from the messages in the correct order. This requires some kind of key (identity) that attributes messages to the same entity, and a sorting key that tells you how to put the messages in the correct order.
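The buffering idea can be sketched without any framework. The following is a minimal in-memory example, assuming each message carries an entity key and a sequence number starting at 0 as its sorting key; `ReorderBuffer` and `accept` are hypothetical names. It releases payloads only when every earlier sequence number for that key has arrived, and it ignores duplicates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: per-key reordering by sequence number, with duplicate suppression.
final class ReorderBuffer<T> {

    // Next sequence number that may be released, per key (assumed to start at 0).
    private final Map<String, Long> nextExpected = new HashMap<>();
    // Messages that arrived early, per key, sorted by sequence number.
    private final Map<String, TreeMap<Long, T>> pending = new HashMap<>();

    // Returns the payloads that became releasable, in sequence order (possibly empty).
    synchronized List<T> accept(String key, long sequence, T payload) {
        long next = nextExpected.getOrDefault(key, 0L);
        List<T> ready = new ArrayList<>();
        if (sequence < next) {
            return ready; // already released earlier: a retry or duplicate
        }
        TreeMap<Long, T> waiting = pending.computeIfAbsent(key, k -> new TreeMap<>());
        waiting.putIfAbsent(sequence, payload); // keep the first copy of a buffered duplicate
        while (!waiting.isEmpty() && waiting.firstKey() == next) {
            ready.add(waiting.pollFirstEntry().getValue());
            next++;
        }
        nextExpected.put(key, next);
        if (waiting.isEmpty()) {
            pending.remove(key);
        }
        return ready;
    }

    public static void main(String[] args) {
        ReorderBuffer<String> buffer = new ReorderBuffer<>();
        System.out.println(buffer.accept("entity-1", 0, "A")); // A is first, released at once
        System.out.println(buffer.accept("entity-1", 2, "C")); // C waits for B
        System.out.println(buffer.accept("entity-1", 1, "B")); // B arrives, B and C are released
    }
}
```

A missing message blocks its key forever in this sketch, so a real implementation would also need a timeout or eviction policy.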

EDIT:
After getting the answers, you can definitely implement it "the Reactor way".


    // Event is assumed here to be a record: record Event(String key, String type, String value) {}
    Sinks.Many<Event> sink = Sinks.many() // create a 'sink' where the events will go
            .multicast()                  // broadcasts all messages to all subscribers of the stream
            .directBestEffort();          // additional semantics - publishing fails if there are no subscribers - doesn't really matter here

    Flux<Event> eventFlux = sink.asFlux(); // the 'view' of the sink as a Flux you can subscribe to

    public void run() {
        subscribeAndProcess();
        sink.tryEmitNext(new Event("A", "A", "A"));
        sink.tryEmitNext(new Event("A", "C", "C"));
        sink.tryEmitNext(new Event("A", "B", "B"));

        sink.tryEmitNext(new Event("B", "A", "A"));
        sink.tryEmitNext(new Event("B", "C", "C"));
        sink.tryEmitNext(new Event("B", "B", "B"));
    }


    void subscribeAndProcess() {
        eventFlux.groupBy(Event::key)
                .flatMap(
                        groupedEvents -> groupedEvents.distinct(Event::type) // distinct to avoid duplicates
                                .buffer(3)                                   // there are three event types, so we buffer and wait for all to arrive
                                .flatMap(events ->                           // once all the events are there we can do the processing the way we need
                                        Mono.just(events.stream()
                                                .sorted(Comparator.comparing(Event::type))
                                                .map(e -> e.key() + e.value())
                                                .reduce(String::concat)
                                                .orElse(""))
                                )
                )
                .subscribe(System.out::println);
    }
    // prints values concatenated in order per key:
    // - AAABAC
    // - BABBBC

See Gist: https://gist.github.com/tarczynskitomek/d9442ea679e3eed64e5a8470217ad96a

There are a few caveats:

  • If not all of the expected events for a given key arrive, you waste memory on buffering, unless you set a timeout
  • How will you ensure that all the events for a given key go to the same application instance?
  • How will you recover from failures encountered mid-processing?

With all this in mind, I would go with persistent storage: say, saving the incoming events in the database and doing the processing in the background. For this you don't need to use Reactor. Most of the time a simple Servlet-based Spring app will be far easier to maintain and develop, especially if you have no previous experience with Functional Reactive Programming.

私藏温柔 2025-02-11 10:50:26

Looking at the provided code, I would not try to handle it at the Reactor Netty level.

First, several comments regarding the controller implementation, because it has multiple issues that violate reactive principles. I would recommend spending some time learning the reactive API, but here are some hints:

  1. In reactive nothing happens until you subscribe. At the same time, calling subscribe explicitly is an anti-pattern and should be avoided unless you are building a framework similar to WebFlux.

  2. The parallel scheduler should be used to run non-blocking logic; switch schedulers only if you have some blocking code.

  3. The doOn... operators are so-called side-effect operators and should not be used for constructing reactive flows.

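@PostMapping
public Mono<String> create(Event event) {
    // do query based on some logic depending on event data
    return repo.find(query) // `query` and `find` are the question's placeholders for the real lookup
            .map(e -> {
                // Lombok's @Data setters return void, so return the event explicitly
                event.setData(event.getData() + e.getData());
                return event;
            })
            .switchIfEmpty(Mono.just(event))
            .flatMap(e -> repo.save(e))
            .map(Event::getId); // the method returns the id, as in the question
}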

Now, processing requests in a predefined sequence can be tricky because of network failures, possible retries, etc. What if you never get Request B or Request C? Should you still persist Request A?

As @ttarczynski mentioned in his comment, the best option is to redesign the API and send a single request.

If that's not an option, you would need to introduce some state to "postpone" request processing and then, depending on the consistency semantics, either process the requests as a "batch" when the last one is received or just defer Request C until you get Requests A & B.
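To make the "defer Request C until A & B" option concrete, here is a minimal sketch using only `java.util.concurrent`. `DeferredRequests` and `submit` are hypothetical names, the prerequisite lists are an assumption about how the dependencies would be declared, and the state lives in memory, so this covers neither multiple application instances nor restarts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a request's action runs only after all its prerequisites have finished.
final class DeferredRequests {

    // One completion signal per request name, created on first mention.
    private final Map<String, CompletableFuture<Void>> done = new ConcurrentHashMap<>();

    private CompletableFuture<Void> slot(String name) {
        return done.computeIfAbsent(name, n -> new CompletableFuture<>());
    }

    // Registers the action; it runs once every prerequisite has completed.
    CompletableFuture<Void> submit(String name, List<String> prerequisites, Runnable action) {
        CompletableFuture<?>[] dependencies = prerequisites.stream()
                .map(this::slot)
                .toArray(CompletableFuture[]::new);
        CompletableFuture<Void> own = slot(name);
        CompletableFuture.allOf(dependencies)
                .thenRun(action)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        own.completeExceptionally(error); // a failure propagates to dependents
                    } else {
                        own.complete(null);
                    }
                });
        return own;
    }

    public static void main(String[] args) {
        DeferredRequests requests = new DeferredRequests();
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        // Arrival order is C, B, A; processing order follows the declared prerequisites.
        requests.submit("C", List.of("A", "B"), () -> processed.add("C"));
        requests.submit("B", List.of("A"), () -> processed.add("B"));
        requests.submit("A", List.of(), () -> processed.add("A"));
        System.out.println(processed);
    }
}
```

The open questions above still apply: if Request A never arrives, B and C stay deferred indefinitely unless a timeout is added.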
