过滤带有从另一个通量的分机单元的通量

发布于 2025-01-28 09:54:52 字数 957 浏览 2 评论 0原文

我有两个来自ReactivemongoDB API的通量: 第一个将获取 date1 的数据 第二个获取 date2 的数据 到最后,我想排除从date2中仍出现的元素,以使那些消失的元素并将其保存在另一个集合中。 为了解决该问题,我使用标签来收集第二个日期的元素(Date2),因此我可以通过排除标签中的元素来过滤第一个日期(date2)的通量。

Flux<Report> reportOfCurrentDate = reportRepository.get(LocalDate.now());
Flux<Report> reportOfPrevDate = reportRepository.get(LocalDate.now().minusDays(1));

reportOfCurrentDate.log("insert In Set :").collect(Collectors.toSet());

// filtering 
reportOfPrevDate
.filterWhen(
                report -> resourceIds
                        .map(installedResId -> !installedResId.contains(report.getInstalledResInstalledResourceId()))
)
.transform(rereports -> resolvedRepository.insertAll(reports.collectList())
.blockLast();

请注意,通量正在使数据形成非常大的集合 问题是我只能在一个磁通量中(插入一组插入的磁通)似乎没有反应性的行为是预期的是,这两个通量将并行运行,过滤数据并最终插入新集合中。 有人可以帮助我,还是建议另一种做到这一点?

I have two flux coming from ReactiveMongoDB API:
the first will gets the data of date1
the second one gets the data of date2
by the end i want to exclude elements that still appearing from the date1 in date2, In order to get those that are disappeared and save them in another collection.
To deal with That I used a HashSet to collect the element of the second date (date2), so I can filter the flux of the first date (date2) by excluding the element in the hashset.

Flux<Report> reportOfCurrentDate = reportRepository.get(LocalDate.now());
Flux<Report> reportOfPrevDate = reportRepository.get(LocalDate.now().minusDays(1));

reportOfCurrentDate.log("insert In Set :").collect(Collectors.toSet());

// filtering 
reportOfPrevDate
.filterWhen(
                report -> resourceIds
                        .map(installedResId -> !installedResId.contains(report.getInstalledResInstalledResourceId()))
)
.transform(rereports -> resolvedRepository.insertAll(reports.collectList())
.blockLast();

Note that the flux is getting the data form a very large collection
The problem is i get just one flux running (the one that insert in a set) it doesn't seem reactive the behavior expected is that the two fluxs will run in parallel, filtering data, and insert in the end in a new collection.
any one can help me with that, or suggest another way of doing it ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文