如何在磁通中找到第一匹匹配并停止在反应堆中加工
无法弄清楚如何在第一次比赛中停止处理通量。
这是我现在所拥有的:
findAll(): Flux<Object>
findStorageId(Relation r): Mono<Long> | Mono.empty()
isPassing(Relation r): boolean
findAll().flatMap(p -> {
return Flux.fromStream(p.getRelations().stream()).flatMap(r -> {
return isPassing(r) ? findStorageId(r) : Mono.empty();
});
})
.handle((Long storageId, SynchronousSink<Long> sink) -> {
if (storageId != null) {
sink.next(storageId);
sink.complete();
}
})
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
我正在尝试了解第一次存储ID时如何中断通量的处理。现在我看到,第一个flatmap
在找到第一匹配后继续工作。
Can't figure out how to stop processing Flux on first match.
This what I have right now:
findAll(): Flux<Object>
findStorageId(Relation r): Mono<Long> | Mono.empty()
isPassing(Relation r): boolean
findAll().flatMap(p -> {
return Flux.fromStream(p.getRelations().stream()).flatMap(r -> {
return isPassing(r) ? findStorageId(r) : Mono.empty();
});
})
.handle((Long storageId, SynchronousSink<Long> sink) -> {
if (storageId != null) {
sink.next(storageId);
sink.complete();
}
})
.next()
.switchIfEmpty(Mono.error(new RuntimeException("Can't find storageId.")));
I'm trying to understand how I can interrupt processing of flux when first storageId is found. Right now I see, that first flatMap
continues to work after finding first match.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
问题在于FlatMap使用并发,预取大于1。
在这种情况下,如果您不想多次调用数据库,但需要一个与prefatch 1一起使用ConcatMap。
在这种情况下,与Prefetch 1 Concatmap 1将要求元素1逐一请求元素,并且它将等待响应。
The problem is that flatmap is using using concurrency and prefetch is more than 1.
In this case if you dont want to call the database many times but one by one you need to use concatmap with prefatch 1.
in this case concatmap with prefetch 1 will request elements 1 by one and it will wait for the response.
对我来说,它使用
flatmap→Next→OnError
,不需要handle
。flatmap
:该方法返回字符串
或empty
next
的单声道:如果,请返回第一个或空的FlatMap
始终返回空OnError
:根据您的示例进行错误处理,这意味着您的示例应该像您发布的那样工作,甚至不需要调用
handle
示例代码:
输出
to_be_found之后 是否进一步处理流。 =“ B”
:输出当
to_be_found =“ x”
:For me it worked out using
flatMap → next → onError
, thehandle
is not needed.flatMap
: the method returns a Mono ofString
orempty
next
: returns the first or empty ifflatMap
always returned emptyonError
: error handling according to your examplethis means that your example should work like you posted it and you don't even need to call
handle
Example code:
Output when
TO_BE_FOUND = "B"
:Output when
TO_BE_FOUND = "X"
: