WebFlux 中的 fromCallable 封装了多个阻塞调用
我在反应式 Java 中使用 Feign Client。 Feign 客户端有一个拦截器,它发送阻塞请求以获取身份验证令牌,并将其作为标头添加到 feign 请求中。
feign 请求通过 Schedulers.boundedElastic()
封装在 Mono.FromCallable
中。
我的问题是:获取身份验证令牌的内部调用是否被视为阻塞调用?
我知道这两个调用将位于与 Schedulers.boundedElastic() 不同的线程上,但不确定是否可以在同一线程上执行它们,或者我应该更改它,以便它们在不同的线程上运行。
Feign客户端:
@FeignClient(name = "remoteRestClient", url = "${remote.url}",
configuration = AuthConfiguration.class, decode404 = true)
@Profile({ "!test" })
public interface RemoteRestClient {
@GetMapping(value = "/getSomeData" )
Data getData();
}
拦截器:
public class ClientRequestInterceptor implements RequestInterceptor {
private IAPRequestBuilder iapRequestBuilder;
private String clientName;
public ClientRequestInterceptor(String clientName, String serviceAccount, String jwtClientId) {
this.iapRequestBuilder = new IAPRequestBuilder(serviceAccount, jwtClientId);
this.clientName = clientName;
}
@Override
public void apply(RequestTemplate template) {
try {
HttpRequest httpRequest = iapRequestBuilder.buildIapRequest(); <---- blocking call
template.header(HttpHeaders.AUTHORIZATION, httpRequest.getHeaders().getAuthorization());
} catch (IOException e) {
log.error("Building an IAP request has failed: {}", e.getMessage(), e);
throw new InterceptorException(String.format("failed to build IAP request for %s", clientName), e);
}
}
}
feign配置:
public class AuthConfiguration {
@Value("${serviceAccount}")
private String serviceAccount;
@Value("${jwtClientId}")
private String jwtClientId;
@Bean
public ClientRequestInterceptor getClientRequestInterceptor() {
return new ClientRequestInterceptor("Entitlement", serviceAccount, jwtClientId);
}
}
和feign客户端调用:
private Mono<Data> getData() {
return Mono.fromCallable(() -> RemoteRestClient.getData()
.publishOn(Schedulers.boundedElastic());
}
I'm using Feign Client in Reactive Java. The Feign client has an interceptor that sends a blocking request to get auth token and adds it as a header to the feign request.
the feign request is wrapped in Mono.FromCallable
with Schedulers.boundedElastic()
.
my question is: does the inner call to get the auth token considered as a blocking call?
I get that both calls will be on a different thread from Schedulers.boundedElastic()
but not sure is ok to execute them on the same thread or I should change it so they'll run on different threads.
Feign client:
@FeignClient(name = "remoteRestClient", url = "${remote.url}",
configuration = AuthConfiguration.class, decode404 = true)
@Profile({ "!test" })
public interface RemoteRestClient {
@GetMapping(value = "/getSomeData" )
Data getData();
}
interceptor:
public class ClientRequestInterceptor implements RequestInterceptor {
private IAPRequestBuilder iapRequestBuilder;
private String clientName;
public ClientRequestInterceptor(String clientName, String serviceAccount, String jwtClientId) {
this.iapRequestBuilder = new IAPRequestBuilder(serviceAccount, jwtClientId);
this.clientName = clientName;
}
@Override
public void apply(RequestTemplate template) {
try {
HttpRequest httpRequest = iapRequestBuilder.buildIapRequest(); <---- blocking call
template.header(HttpHeaders.AUTHORIZATION, httpRequest.getHeaders().getAuthorization());
} catch (IOException e) {
log.error("Building an IAP request has failed: {}", e.getMessage(), e);
throw new InterceptorException(String.format("failed to build IAP request for %s", clientName), e);
}
}
}
feign configuration:
public class AuthConfiguration {
@Value("${serviceAccount}")
private String serviceAccount;
@Value("${jwtClientId}")
private String jwtClientId;
@Bean
public ClientRequestInterceptor getClientRequestInterceptor() {
return new ClientRequestInterceptor("Entitlement", serviceAccount, jwtClientId);
}
}
and feign client call:
private Mono<Data> getData() {
return Mono.fromCallable(() -> RemoteRestClient.getData()
.publishOn(Schedulers.boundedElastic());
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您可以看出这是一个阻塞调用,因为它返回一个具体类而不是
Future
(Mono
或Flux
)。为了能够返回具体的类,线程需要等待,直到我们得到返回它的响应。所以是的,这很可能是一个阻塞调用。
Reactor 建议您使用
subscribeOn< /code> 运算符在执行阻塞调用时,这会将整个运算符链放置在其自己的线程池上。
您已选择使用
publishOn
,并且值得指出 文档:这实际上意味着在
publishOn
运算符之前,所有操作都将使用任何可用的匿名线程执行。但是之后的所有调用都将在定义的线程池上执行。
您已选择将其放置在之后,以便线程池切换将在调用
getData
之后完成。publishOn
在链中的放置很重要,而subscribeOn
会影响整个操作符链,这意味着它的放置并不重要。因此,再次回答您的问题,是的,这很可能是一个阻塞调用(我无法 100% 确认,因为我没有查看源代码)以及您希望如何使用
publishOn
subscribeOn
取决于您。或者看看是否有一个反应式替代库可供使用。
You can sort of tell that it is a blocking call since it returns a concrete class and not a
Future
(Mono
orFlux
). To be able to return a concrete class, the thread needs to wait until we have the response to return it.So yes it is most likely a blocking call.
Reactor recommends that you use the
subscribeOn
operator when doing blocking calls, this will place that entire chain of operators on its own thread pool.You have chosen to use the
publishOn
and it is worth pointing out the following from the docs:This in practice means that up until the
publishOn
operator all actions will be executed using any available anonymous thread.But all calls after will be executed on the defined thread pool.
You have chosen to place it after so the thread pool switch will be done after the call to
getData
.publishOn
s placing in the chain matters whilesubscribeOn
affects the entire chain of operator which means it's placing does not matter.So to answer your question again, yes it is most likely a blocking call (i can't confirm by 100% since i have not looked into the source code) and how you wish to solve it with either
publishOn
onsubscribeOn
is up to you.Or look into if there is an reactive alternative library to use.