WebFlux 中的 fromCallable 封装了多个阻塞调用

发布于 2025-01-15 20:09:49 字数 2167 浏览 1 评论 0原文

我在反应式 Java 中使用 Feign Client。 Feign 客户端有一个拦截器,它发送阻塞请求以获取身份验证令牌,并将其作为标头添加到 feign 请求中。

feign 请求通过 Schedulers.boundedElastic() 封装在 Mono.FromCallable 中。

我的问题是:获取身份验证令牌的内部调用是否被视为阻塞调用?

我知道这两个调用将位于与 Schedulers.boundedElastic() 不同的线程上,但不确定是否可以在同一线程上执行它们,或者我应该更改它,以便它们在不同的线程上运行。

Feign客户端:

@FeignClient(name = "remoteRestClient", url = "${remote.url}",
        configuration = AuthConfiguration.class, decode404 = true)
@Profile({ "!test" })
public interface RemoteRestClient {
    @GetMapping(value = "/getSomeData" )
    Data getData();
}

拦截器:

public class ClientRequestInterceptor implements RequestInterceptor {

    private IAPRequestBuilder iapRequestBuilder;
    private String clientName;

    public ClientRequestInterceptor(String clientName, String serviceAccount, String jwtClientId) {
        this.iapRequestBuilder = new IAPRequestBuilder(serviceAccount, jwtClientId);
        this.clientName = clientName;
    }

    @Override
    public void apply(RequestTemplate template) {
        try {
            HttpRequest httpRequest = iapRequestBuilder.buildIapRequest();   <---- blocking call
            template.header(HttpHeaders.AUTHORIZATION, httpRequest.getHeaders().getAuthorization());
        } catch (IOException e) {
            log.error("Building an IAP request has failed: {}", e.getMessage(), e);
            throw new InterceptorException(String.format("failed to build IAP request for %s", clientName), e);
        }
    }
}

feign配置:

public class AuthConfiguration {
    @Value("${serviceAccount}")
    private String serviceAccount;

    @Value("${jwtClientId}")
    private String jwtClientId;

    @Bean
    public ClientRequestInterceptor getClientRequestInterceptor() {
        return new ClientRequestInterceptor("Entitlement", serviceAccount, jwtClientId);
    }
}

和feign客户端调用:

  private Mono<Data> getData() {
return Mono.fromCallable(() -> RemoteRestClient.getData()
                                .publishOn(Schedulers.boundedElastic());
    }

I'm using Feign Client in Reactive Java. The Feign client has an interceptor that sends a blocking request to get auth token and adds it as a header to the feign request.

the feign request is wrapped in Mono.FromCallable with Schedulers.boundedElastic().

my question is: does the inner call to get the auth token considered as a blocking call?

I get that both calls will be on a different thread from Schedulers.boundedElastic() but not sure is ok to execute them on the same thread or I should change it so they'll run on different threads.

Feign client:

@FeignClient(name = "remoteRestClient", url = "${remote.url}",
        configuration = AuthConfiguration.class, decode404 = true)
@Profile({ "!test" })
public interface RemoteRestClient {
    @GetMapping(value = "/getSomeData" )
    Data getData();
}

interceptor:

public class ClientRequestInterceptor implements RequestInterceptor {

    private IAPRequestBuilder iapRequestBuilder;
    private String clientName;

    public ClientRequestInterceptor(String clientName, String serviceAccount, String jwtClientId) {
        this.iapRequestBuilder = new IAPRequestBuilder(serviceAccount, jwtClientId);
        this.clientName = clientName;
    }

    @Override
    public void apply(RequestTemplate template) {
        try {
            HttpRequest httpRequest = iapRequestBuilder.buildIapRequest();   <---- blocking call
            template.header(HttpHeaders.AUTHORIZATION, httpRequest.getHeaders().getAuthorization());
        } catch (IOException e) {
            log.error("Building an IAP request has failed: {}", e.getMessage(), e);
            throw new InterceptorException(String.format("failed to build IAP request for %s", clientName), e);
        }
    }
}

feign configuration:

public class AuthConfiguration {
    @Value("${serviceAccount}")
    private String serviceAccount;

    @Value("${jwtClientId}")
    private String jwtClientId;

    @Bean
    public ClientRequestInterceptor getClientRequestInterceptor() {
        return new ClientRequestInterceptor("Entitlement", serviceAccount, jwtClientId);
    }
}

and feign client call:

  private Mono<Data> getData() {
return Mono.fromCallable(() -> RemoteRestClient.getData()
                                .publishOn(Schedulers.boundedElastic());
    }

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

苏辞 2025-01-22 20:09:49

您可以看出这是一个阻塞调用,因为它返回一个具体类而不是 FutureMonoFlux)。为了能够返回具体的类,线程需要等待,直到我们得到返回它的响应。

所以是的,这很可能是一个阻塞调用。

Reactor 建议您使用 subscribeOn< /code> 运算符在执行阻塞调用时,这会将整个运算符链放置在其自己的线程池上。

您已选择使用 publishOn ,并且值得指出 文档

影响后续运算符的执行位置

这实际上意味着在 publishOn 运算符之前,所有操作都将使用任何可用的匿名线程执行。

但是之后的所有调用都将在定义的线程池上执行。

private Mono<Data> getData() {
    return Mono.fromCallable(() -> RemoteRestClient.getData()
                                .publishOn(Schedulers.boundedElastic());
}

您已选择将其放置在之后,以便线程池切换将在调用getData之后完成。

publishOn 在链中的放置很重要,而 subscribeOn 会影响整个操作符链,这意味着它的放置并不重要。

因此,再次回答您的问题,是的,这很可能是一个阻塞调用(我无法 100% 确认,因为我没有查看源代码)以及您希望如何使用 publishOn subscribeOn 取决于您。

或者看看是否有一个反应式替代库可供使用。

You can sort of tell that it is a blocking call since it returns a concrete class and not a Future (Mono or Flux). To be able to return a concrete class, the thread needs to wait until we have the response to return it.

So yes it is most likely a blocking call.

Reactor recommends that you use the subscribeOn operator when doing blocking calls, this will place that entire chain of operators on its own thread pool.

You have chosen to use the publishOn and it is worth pointing out the following from the docs:

affects where the subsequent operators execute

This in practice means that up until the publishOn operator all actions will be executed using any available anonymous thread.

But all calls after will be executed on the defined thread pool.

private Mono<Data> getData() {
    return Mono.fromCallable(() -> RemoteRestClient.getData()
                                .publishOn(Schedulers.boundedElastic());
}

You have chosen to place it after so the thread pool switch will be done after the call to getData.

publishOns placing in the chain matters while subscribeOn affects the entire chain of operator which means it's placing does not matter.

So to answer your question again, yes it is most likely a blocking call (i can't confirm by 100% since i have not looked into the source code) and how you wish to solve it with either publishOn on subscribeOn is up to you.

Or look into if there is an reactive alternative library to use.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文