在后台等待单声道完成的最简单方法是什么?

发布于 2025-01-27 16:44:02 字数 408 浏览 2 评论 0原文

我们有一个单声道,正在处理一些操作(例如数据库更新),并返回一个值。
我们想将单声道(转换)添加到一个特殊列表中,其中包含要完成的操作,例如在关闭过程中完成。
在添加到列表后,立即开始处理或.subscribe()可能不称为单声道可能会急切地订阅。

在关闭期间,我们可以通过以下方式在列表中迭代:

for (Mono mono : specialList) {
  Object value = mono.block(); // (do something with value)
}

如何转换原始单声道,以使关闭代码执行,并且以前订阅了单声道(),该操作将不会再次触发,但是它将等待它等待它才能等待它完整还是重播其存储的返回值?

We are given a Mono, that's handling some action(say a database update), and returns a value.
We want to add that Mono(transformed) to a special list that contains actions to be completed for example during shutdown.
That mono may be eagerly subscribed after adding to the list, to start processing now, or .subscribe() might not be called meaning it will be only subscribed during shutdown.

During shutdown we can iterate on the list in the following way:

for (Mono mono : specialList) {
  Object value = mono.block(); // (do something with value)
}

How to transform the original Mono such that when shutdown code executes, and Mono was previously subscribed(), the action will not be triggered again but instead it will either wait for it to complete or replay it's stored return value?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

梦魇绽荼蘼 2025-02-03 16:44:02

好的,看起来它就像调用mono.cache()一样简单,所以这就是我在实践中使用它的方式

    public Mono<Void> addShutdownMono(Mono<Void> mono) {
        mono = mono.cache();
        Mono<Void> newMono = mono.doFinally(signal -> shutdownMonos.remove(mono));
        shutdownMonos.add(mono);

        return newMono;
    }

    public Function<Mono<Void>,Mono<Void>> asShutdownAwaitable() {
        return mono -> addShutdownMono(mono);
    }
database.doSomeAction()
  .as(asShutdownAwaitable)
  .subscribe() // Or don't subscribe at all, deferring until shutdown

,这是实际的关闭代码。
对我来说,也很重要的是,如果用户选择不急切地订阅它们,则他们执行执行,这就是flux.concat而不是flux.merge的原因。

public void shutdown() {
        Flux.concat(Lists.transform(new ArrayList<>(shutdownMonos), mono -> mono.onErrorResume(err -> {
                    logger.error("Async exception during shutdown, ignoring", err);
                    return Mono.empty();
                }))
        ).blockLast();
    }

OK, looks like it is as simple as calling mono.cache(), so this is how I used it in practice

    public Mono<Void> addShutdownMono(Mono<Void> mono) {
        mono = mono.cache();
        Mono<Void> newMono = mono.doFinally(signal -> shutdownMonos.remove(mono));
        shutdownMonos.add(mono);

        return newMono;
    }

    public Function<Mono<Void>,Mono<Void>> asShutdownAwaitable() {
        return mono -> addShutdownMono(mono);
    }
database.doSomeAction()
  .as(asShutdownAwaitable)
  .subscribe() // Or don't subscribe at all, deferring until shutdown

Here is the actual shutdown code.
It was also important to me that they execute in order of being added, if user chose not to eagerly subscribe them, that's reason for Flux.concat instead of Flux.merge.

public void shutdown() {
        Flux.concat(Lists.transform(new ArrayList<>(shutdownMonos), mono -> mono.onErrorResume(err -> {
                    logger.error("Async exception during shutdown, ignoring", err);
                    return Mono.empty();
                }))
        ).blockLast();
    }

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文