Caching parallel requests in Spring WebFlux Mono
We are using Spring WebFlux (Project Reactor). As part of a requirement, we need to call one API from our server.
For the API call, we need to cache the response, so we are using the Mono.cache operator.
It caches the response Mono<ResponseDto>, and the next time the same API call happens, it is served from the cache. The following is an example implementation:
private Mono<ResponseDto> res; // field assumed by the snippet: holds the cached Mono

public Mono<ResponseDto> getResponse() {
    if (res == null) {
        res =
            fetchResponse()
                .onErrorMap(Exception.class, (error) -> new CustomException())
                .cache(
                    r -> Duration.ofSeconds(r.expiresIn()),
                    error -> Duration.ZERO,
                    () -> Duration.ZERO);
    }
    return res;
}
The problem is that if the server makes the same API call twice at the same time (for example via Mono.zip), the response is not cached yet and we actually call the API twice.
Is there any out-of-the-box solution to this problem? Instead of caching the response, can we cache the Mono itself, so that both requests subscribe to the same Mono and both continue after a single API call completes?
It should also work for sequential execution - I am afraid that if we cache the Mono, then once the request is completed the subscription is over and no other process can subscribe to it.
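That idea can be expressed directly: cache the assembled Mono rather than the response. A minimal sketch, assuming fetchResponse() is cold (it does nothing until subscribed) and using an AtomicReference guard that is not in the original code:

private final AtomicReference<Mono<ResponseDto>> cachedMono = new AtomicReference<>();

public Mono<ResponseDto> getResponse() {
    // Build the cached Mono at most once; concurrent and sequential callers
    // all subscribe to the same instance. cache() replays the stored signal,
    // so subscribers arriving after completion still receive the value.
    return cachedMono.updateAndGet(existing -> existing != null
            ? existing
            : fetchResponse()
                .onErrorMap(Exception.class, (error) -> new CustomException())
                .cache(
                    r -> Duration.ofSeconds(r.expiresIn()),
                    error -> Duration.ZERO,
                    () -> Duration.ZERO));
}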
Comments (3)
Project Reactor provides a cache utility, CacheMono, that is non-blocking but can stampede.
AsyncCache integrates better: the first lookup with a key "K" results in a cache miss and returns a CompletableFuture of the API call, while a second lookup with the same key "K" gets the same CompletableFuture object.
The returned future can be converted to and from a Mono with Mono.fromFuture().
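A minimal sketch of that idea, assuming Caffeine's AsyncCache and the fetchResponse()/ResponseDto from the question (the cache key and TTL are illustrative):

// Uses com.github.benmanes.caffeine.cache.AsyncCache / Caffeine
private final AsyncCache<String, ResponseDto> cache = Caffeine.newBuilder()
        .expireAfterWrite(60, TimeUnit.SECONDS) // illustrative TTL
        .buildAsync();

public Mono<ResponseDto> getResponse() {
    // Concurrent lookups for the same key share one CompletableFuture,
    // so the underlying API is called at most once per cache miss.
    return Mono.fromFuture(
            cache.get("response", (key, executor) -> fetchResponse().toFuture()));
}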
You can initialize the Mono in the constructor (assuming it doesn't depend on any request-time parameters). The cache operator will then prevent multiple subscriptions to the source. If there is a dependency on request-time parameters, you should consider a custom caching solution.
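A minimal sketch of that approach, reusing the pipeline from the question (the class name is hypothetical):

private final Mono<ResponseDto> res;

public ResponseService() { // hypothetical constructor
    // Assembled once; cache() ensures the source is subscribed at most once
    // per TTL window, no matter how many callers subscribe to 'res'.
    this.res =
        fetchResponse()
            .onErrorMap(Exception.class, (error) -> new CustomException())
            .cache(
                r -> Duration.ofSeconds(r.expiresIn()),
                error -> Duration.ZERO,
                () -> Duration.ZERO);
}

public Mono<ResponseDto> getResponse() {
    return res;
}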
You could use CacheMono from io.projectreactor.addons:reactor-extra to wrap a non-reactive cache implementation such as Guava Cache or a simple ConcurrentHashMap. It doesn't provide an "exactly-once" guarantee, and parallel requests could result in cache misses, but in many scenarios that should not be an issue.
Here is an example with Guava Cache:
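A minimal sketch of such a wrapper, assuming the question's fetchResponse() and ResponseDto, a single illustrative string key, and Guava's com.google.common.cache.Cache:

private final Cache<String, ResponseDto> cache = CacheBuilder.newBuilder()
        .expireAfterWrite(60, TimeUnit.SECONDS) // illustrative TTL
        .build();

public Mono<ResponseDto> getCachedResponse() {
    return CacheMono
            // Cache hit: wrap the stored value in a Signal; an empty Mono means a miss.
            .lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), "response")
            // Cache miss: fall back to the real API call.
            .onCacheMissResume(this::fetchResponse)
            // Write-through: store only successful values back into the Guava cache.
            .andWriteWith((key, signal) -> Mono.fromRunnable(() -> {
                if (signal.hasValue()) {
                    cache.put(key, signal.get());
                }
            }));
}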