反应堆项目中的条件平行性
在某些流中,必须基于特定字段并平行解析消息。
使用 GroupBy
不是一个选项,因为它会收集大量结果然后对它们进行分组(它引入了延迟)。
另外,如果使用 subscribeon(Schedulers.Single())
,则结果还可以,但它引入了饥饿问题。
作为示例订单,应顺序执行具有相同 batchid
的batchid 。因此,订单1和2应顺序处理,并且可以平行执行订单3。
record Order(Integer id,Integer batchId){}
void testParallel() {
Flux.just(new Order(1,1),new Order(2,1),new Order(3,2));
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
您应该能够创建2个单独的
flux
实例。一个处理每个发射的项目顺序的,另一个处理它们并并行处理它们。然后,您可以合并
将它们纳入单个flux
。按照上面示例的行
,元素
1、2、3
将被顺序处理,而元素3、4、5
将并行处理。Note
您尚未清楚需要应用哪种条件逻辑,因此我现在只应用了一些虚拟逻辑。
另外,
collectList()
仅应与有限流一起使用。-
此操作员等待一个内部完成,然后再生成下一个并订阅它。
merge docs -
Unlike concat, sources are subscribed to eagerly< /代码>
You should be able to create 2 separate
Flux
instances. One that process each emitted item sequential and another that process them in parallel. You can thenmerge
them into a singleFlux
.Something along the lines of
In the example above, elements
1, 2, 3
will be processed sequentially whereas elements3, 4, 5
will processed in parallel.Note
You did not make it clear what conditional logic needs to be applied so I have just apply some dummy logic for now.
Also,
collectList()
should only be used with finite streams.concatMap docs -
this operator waits for one inner to complete before generating the next one and subscribing to it.
merge docs -
Unlike concat, sources are subscribed to eagerly
需求有点模糊,但似乎
BufferuntilChanged
可以解决问题。在您的示例中,它将收集一批的连续元素,然后发出它们。这将返回A
flux&lt; list&lt; order&gt;&gt;
。您可以并行处理列表。WindownuntilChanged
也可以是一个选项。它将返回flux&lt; lux&lt; order&gt;&gt;
。The requirements are a bit vague but it seems like
bufferUntilChanged
could do the trick. In your example, it would collect successive elements of one batch and then emit them.This returns a
Flux<List<Order>>
. You can process the lists in parallel.windowUntilChanged
could also be an option. It would return aFlux<Flux<Order>>
.