如何使用Spring Flux并行对同一服务的平行呼叫

发布于 2025-01-21 18:58:41 字数 1587 浏览 0 评论 0原文

我正在使用弹簧反应式工作,需要使用WebClient将多个呼叫依次致电其他REST API。 问题是我能够致电其他REST API的多个呼叫,但是在没有订阅或阻止的情况下,响应无法阅读。 由于非反应性编程,我无法使用订阅或阻止。有什么办法,我可以在阅读响应时合并并将其作为通量发送。 以下是我被卡住的代码。

public Mono<DownloadDataLog> getDownload(Token dto, Mono<DataLogRequest> request) {
    Mono<GraphQlCustomerResponse> profileResponse = customerProfileHandler.getMyUsageHomeMethods(dto, null);
    DownloadDataLog responseObj = new DownloadDataLog();
    ArrayList<Mono<List<dataUsageLogs>>> al = new ArrayList<>();
    return Mono.zip(profileResponse, request).flatMap(tuple2 -> {
        Flux<List<Mono<DataLogGqlRequest>>> userequest = prepareUserRequest(getListOfMdns(tuple2.getT1()),
                tuple2.getT2());              
        Flux.from(userequest).flatMap(req -> {
            for (Mono<DataLogGqlRequest> logReq : req) {
                al.add(service.execute(logReq, dto));
            }
            responseObj.setAl(al);
            return Mono.empty();
        }).subscribe();
          return Mono.just(responseObj);

    });

}

private Mono<DataLogGqlRequest> prepareInnerRequest(Mono<DataLogGqlRequest> itemRequest, int v1,int v2){
    return itemRequest.flatMap(req -> {
        DataLogGqlRequest userRequest = new DataLogGqlRequest();
        userRequest.setBillDate(req.getBillDate()); 
        userRequest.setMdnNumber(req.getMdnNumber());
        userRequest.setCposition(v1+"");    
        userRequest.setPposition(v2+"");
        return Mono.just(userRequest);
    });
    
    
}

I am working on spring reactive and need to call multiple calls sequentially to other REST API using webclient.
The issue is I am able to call multiple calls to other Rest API but response am not able to read without subscribe or block.
I can't use subscribe or block due to non reactive programming. Is there any way, i can merge while reading the response and send it as flux.
Below is the piece of code where I am stuck.

public Mono<DownloadDataLog> getDownload(Token dto, Mono<DataLogRequest> request) {
    Mono<GraphQlCustomerResponse> profileResponse = customerProfileHandler.getMyUsageHomeMethods(dto, null);
    DownloadDataLog responseObj = new DownloadDataLog();
    ArrayList<Mono<List<dataUsageLogs>>> al = new ArrayList<>();
    return Mono.zip(profileResponse, request).flatMap(tuple2 -> {
        Flux<List<Mono<DataLogGqlRequest>>> userequest = prepareUserRequest(getListOfMdns(tuple2.getT1()),
                tuple2.getT2());              
        Flux.from(userequest).flatMap(req -> {
            for (Mono<DataLogGqlRequest> logReq : req) {
                al.add(service.execute(logReq, dto));
            }
            responseObj.setAl(al);
            return Mono.empty();
        }).subscribe();
          return Mono.just(responseObj);

    });

}

private Mono<DataLogGqlRequest> prepareInnerRequest(Mono<DataLogGqlRequest> itemRequest, int v1,int v2){
    return itemRequest.flatMap(req -> {
        DataLogGqlRequest userRequest = new DataLogGqlRequest();
        userRequest.setBillDate(req.getBillDate()); 
        userRequest.setMdnNumber(req.getMdnNumber());
        userRequest.setCposition(v1+"");    
        userRequest.setPposition(v2+"");
        return Mono.just(userRequest);
    });
    
    
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文