如何在磁通中找到第一匹匹配并停止在反应堆中加工

发布于 2025-01-22 05:36:19 字数 666 浏览 1 评论 0原文

无法弄清楚如何在第一次比赛中停止处理通量。

这是我现在所拥有的:

findAll(): Flux<Object>
findStorageId(Relation r): Mono<Long> | Mono.empty()
isPassing(Relation r): boolean

findAll().flatMap(p -> {
  return Flux.fromStream(p.getRelations().stream()).flatMap(r -> {
    return isPassing(r) ? findStorageId(r) : Mono.empty();
  });
})
.handle((Long storageId, SynchronousSink<Long> sink) -> {
  if (storageId != null) {
    sink.next(storageId);
    sink.complete();
  }
})
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));

我正在尝试了解第一次存储ID时如何中断通量的处理。现在我看到,第一个flatmap在找到第一匹配后继续工作。

Can't figure out how to stop processing Flux on first match.

This what I have right now:

findAll(): Flux<Object>
findStorageId(Relation r): Mono<Long> | Mono.empty()
isPassing(Relation r): boolean

findAll().flatMap(p -> {
  return Flux.fromStream(p.getRelations().stream()).flatMap(r -> {
    return isPassing(r) ? findStorageId(r) : Mono.empty();
  });
})
.handle((Long storageId, SynchronousSink<Long> sink) -> {
  if (storageId != null) {
    sink.next(storageId);
    sink.complete();
  }
})
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));

I'm trying to understand how I can interrupt processing of flux when first storageId is found. Right now I see, that first flatMap continues to work after finding first match.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

み格子的夏天 2025-01-29 05:36:19

问题在于FlatMap使用并发,预取大于1。
在这种情况下,如果您不想多次调用数据库,但需要一个与prefatch 1一起使用ConcatMap。

  public static final String TO_BE_FOUND = "B";

  @Override
  public void run(String... args) throws Exception {
    Mono<String> storageId =
        Flux.just("A", "B", "C", "D", "A")
            .doOnNext(id -> System.out.printf("processing: %s\n", id))
            .concatMap(s -> findStorageId(s),1)
            .next()
            .switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
    storageId.subscribe();
  }

  private static Mono<String> findStorageId(String s) {
    return TO_BE_FOUND.equals(s)
        ? Mono.just(s + UUID.randomUUID()).delayElement(Duration.ofSeconds(1))
        : Mono.delay(Duration.ofSeconds(1)).flatMap(aLong -> Mono.empty());
  }

在这种情况下,与Prefetch 1 Concatmap 1将要求元素1逐一请求元素,并且它将等待响应。

The problem is that flatmap is using using concurrency and prefetch is more than 1.
In this case if you dont want to call the database many times but one by one you need to use concatmap with prefatch 1.

  public static final String TO_BE_FOUND = "B";

  @Override
  public void run(String... args) throws Exception {
    Mono<String> storageId =
        Flux.just("A", "B", "C", "D", "A")
            .doOnNext(id -> System.out.printf("processing: %s\n", id))
            .concatMap(s -> findStorageId(s),1)
            .next()
            .switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
    storageId.subscribe();
  }

  private static Mono<String> findStorageId(String s) {
    return TO_BE_FOUND.equals(s)
        ? Mono.just(s + UUID.randomUUID()).delayElement(Duration.ofSeconds(1))
        : Mono.delay(Duration.ofSeconds(1)).flatMap(aLong -> Mono.empty());
  }

in this case concatmap with prefetch 1 will request elements 1 by one and it will wait for the response.

悲念泪 2025-01-29 05:36:19

对我来说,它使用flatmap→Next→OnError,不需要handle

  • flatmap:该方法返回字符串empty
  • next的单声道:如果,请返回第一个或空的FlatMap始终返回空
  • OnError:根据您的示例进行错误处理,

这意味着您的示例应该像您发布的那样工作,甚至不需要调用handle

示例代码:

我们在将其传递给flatmap之前登录,这样我们就可以检查第一个非空映射mono

之后是否进一步处理流。

    public static final String TO_BE_FOUND = "B";

    public static void main(String[] args) {
        Mono<String> storageId = Flux.just("A", "B", "C", "D", "A")
                .doOnNext(id -> System.out.printf("processing: %s\n", id))
                .flatMap(s -> findStorageId(s))
                .next()
                .switchIfEmpty(
                        Mono.error(new RuntimeException("Can't find storageId."))
                );

        storageId.subscribe(id -> System.out.printf("storageId found: %s\n", id));
    }

    private static Mono<String> findStorageId(String s) {
        return TO_BE_FOUND.equals(s) ? Mono.just(s + UUID.randomUUID()) : Mono.empty();
    }

输出to_be_found之后 是否进一步处理流。 =“ B”

找到 flux 在找到firs storegationId之后不会进一步处理。

processing: A
processing: B
storageId found: B85bcdbcb-2903-4962-96ab-b3a97b0c091f

输出当to_be_found =“ x”

processing: A
processing: B
processing: C
processing: D
processing: A
12:52:22.555 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Can't find storageId.
Caused by: java.lang.RuntimeException: Can't find storageId.

For me it worked out using flatMap → next → onError, the handle is not needed.

  • flatMap: the method returns a Mono of String or empty
  • next: returns the first or empty if flatMap always returned empty
  • onError: error handling according to your example

this means that your example should work like you posted it and you don't even need to call handle

Example code:

we log before we pass it to flatMap, that way we can check if the stream is processed further after the first non empty mapped Mono

    public static final String TO_BE_FOUND = "B";

    public static void main(String[] args) {
        Mono<String> storageId = Flux.just("A", "B", "C", "D", "A")
                .doOnNext(id -> System.out.printf("processing: %s\n", id))
                .flatMap(s -> findStorageId(s))
                .next()
                .switchIfEmpty(
                        Mono.error(new RuntimeException("Can't find storageId."))
                );

        storageId.subscribe(id -> System.out.printf("storageId found: %s\n", id));
    }

    private static Mono<String> findStorageId(String s) {
        return TO_BE_FOUND.equals(s) ? Mono.just(s + UUID.randomUUID()) : Mono.empty();
    }

Output when TO_BE_FOUND = "B":

The Flux will not be processed further after the firs storageId was found.

processing: A
processing: B
storageId found: B85bcdbcb-2903-4962-96ab-b3a97b0c091f

Output when TO_BE_FOUND = "X":

processing: A
processing: B
processing: C
processing: D
processing: A
12:52:22.555 [main] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Can't find storageId.
Caused by: java.lang.RuntimeException: Can't find storageId.
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文