How to buffer and group elements in a Reactor Flux in Java

Published 2025-01-26 11:51:16


Given an infinite flux of objects, where each object has an ID, how can I use Flux to create a buffered list of updates grouped by the ID property, keeping the last emitted value per ID?
Thanks

Example

    Obj(ID=A, V=1)
    Obj(ID=A, V=2)
    Obj(ID=B, V=3) 
    --- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
    Obj(ID=A, V=1)
    Obj(ID=B, V=4)
    Obj(ID=B, V=6)
    Obj(ID=A, V=2)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
    Obj(ID=B, V=1)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]

Something like the following would be perfect, but in my tests it seems to wait for the end of the flux instead of buffering.

flux
    .buffer(Duration.ofMillis(2000))
    .groupBy(Obj::getId)
    .flatMap(GroupedFlux::last)
    .collectList()
    .subscribe(this::printList);

It does work with buffer and custom, non-reactive logic for the grouping:

    public static void main(String[] args) {
        flux.buffer(Duration.ofMillis(2000)).subscribe(Example::groupList);
    }

    private static void groupList(List<Obj> objs) {
        Collection<Obj> values = objs.stream()
                // the merge function (first, second) -> second keeps the last value per ID
                .collect(Collectors.toMap(Obj::getId, Function.identity(), (first, second) -> second))
                .values();
        System.out.println(values);
    }
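As a sanity check, the merge function passed to Collectors.toMap is what keeps the last emitted value per ID. A minimal, self-contained sketch of that non-reactive grouping step, with a hypothetical `Obj` record standing in for the question's type:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BufferGrouping {
    // Hypothetical stand-in for the Obj(ID=..., V=...) type in the question.
    record Obj(String id, int v) {}

    // Keep the last emitted value per ID; LinkedHashMap preserves the
    // order in which each ID first appeared within the buffer.
    static Map<String, Integer> lastPerId(List<Obj> buffer) {
        return buffer.stream().collect(Collectors.toMap(
                Obj::id,
                Obj::v,
                (first, second) -> second,   // merge: the later value wins
                LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<Obj> buffer1 = List.of(new Obj("A", 1), new Obj("A", 2), new Obj("B", 3));
        System.out.println(lastPerId(buffer1)); // {A=2, B=3}

        List<Obj> buffer2 = List.of(new Obj("A", 1), new Obj("B", 4),
                                    new Obj("B", 6), new Obj("A", 2));
        System.out.println(lastPerId(buffer2)); // {A=2, B=6}
    }
}
```

This reproduces the per-buffer results from the example above (treating each list between the `--- buffer ->` markers as one buffer).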


3 Answers

伊面 2025-02-02 11:51:16


buffer will emit a List<T>, so you can just use non-reactive Java to do the grouping, e.g. Java streams as in your example. Assuming your process function is reactive, you can then continue the flow:

flux
    .buffer(Duration.ofMillis(2000))
    .map(list -> list.stream().collect(Collectors.toMap(Obj::getId, Function.identity(), (first, second) -> second)))
    .flatMapIterable(Map::values)
    .flatMap(obj -> process(obj));

那伤。 2025-02-02 11:51:16


I have a somewhat similar use case: simulation data on AWS S3, stored as JSON records in chunks of 512. I need to download a variable number of these chunks and transform them into a single JSON array. The transformer signals that it needs no more records by returning the number of transformed records, which is zero when no further records are needed. Below are two snippets of code. The first uses Flux.window, the second Flux.buffer. The latter executes in roughly one tenth the time of the former, but downloads more data than is strictly necessary.

Flux.window code

    Flux<Flux<S3Object>> s3Objects = Flux.fromIterable(filteredList).window(windowSize);
    Integer cumulativeSize = s3Objects.concatMap(innerS3Objects -> {
        Flux<File> downloadedFiles = innerS3Objects.publishOn(Schedulers.boundedElastic())
                .concatMap(s3Object -> downloadS3Object(BUCKET_NAME, s3Object.key()));
        return downloadedFiles.publishOn(Schedulers.single())
                .concatMap(downloadedFile -> transformMockarooData(downloadedFile, generator, targetHourCount))
                .takeUntil(transformedRecordCount -> transformedRecordCount == 0)
                .reduce((i, j) -> i + j);
    }).takeUntil(transformedRecordCount -> transformedRecordCount == 0)
      .reduce((i, j) -> i + j)
      .block();

Flux.buffer code

    Flux<List<S3Object>> s3Objects = Flux.fromIterable(filteredList).buffer(windowSize);
    Integer cumulativeSize = s3Objects.concatMap(innerS3Objects -> {
        List<Mono<File>> toDownloadFiles = innerS3Objects.stream()
                .map(s3Object -> downloadS3Object(BUCKET_NAME, s3Object.key()))
                .collect(Collectors.toList());
        Flux<File> downloadedFiles = Flux.merge(toDownloadFiles);
        return downloadedFiles.publishOn(Schedulers.single())
                .concatMap(downloadedFile -> transformMockarooData(downloadedFile, generator, targetHourCount))
                .takeUntil(transformedRecordCount -> transformedRecordCount == 0)
                .reduce((i, j) -> i + j);
    }).takeUntil(transformedRecordCount -> transformedRecordCount == 0)
      .reduce((i, j) -> i + j)
      .block();

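A note on the takeUntil/reduce pattern used above: Reactor's takeUntil is inclusive, i.e. the element matching the predicate (the zero count) is still emitted before the sequence completes, and the reduce sums everything up to and including it. A small non-reactive sketch of the same accumulation logic (hypothetical names, not the actual S3 code):

```java
import java.util.List;

public class TakeUntilSum {
    // Mimics takeUntil(count -> count == 0).reduce(Integer::sum):
    // the matching zero is still consumed (inclusive take), and the
    // accumulation stops after it, skipping any remaining chunks.
    static int cumulativeSize(List<Integer> transformedCounts) {
        int total = 0;
        for (int count : transformedCounts) {
            total += count;          // include the matching element
            if (count == 0) break;   // stop requesting further chunks
        }
        return total;
    }

    public static void main(String[] args) {
        // Chunks of transformed records, then a zero signalling "no more needed";
        // the trailing 512 is never processed.
        System.out.println(cumulativeSize(List.of(512, 512, 300, 0, 512))); // 1324
    }
}
```

This is why the window variant can stop downloading early, while the buffer variant's eager Flux.merge has already started downloads for the whole batch.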
回忆凄美了谁 2025-02-02 11:51:16


I was able to achieve it with reactive grouping:

flux.window(Duration.ofMillis(2000))
    .flatMap(window -> window.groupBy(Obj::getId)
        .flatMap(GroupedFlux::last)
        .collectList()
    )
    .subscribe(this::printList);