Caching parallel requests in Spring WebFlux Mono



We are using Spring WebFlux (Project Reactor). As part of a requirement, we need to call an API from our server.

For that API call we need to cache the response, so we are using the Mono.cache operator.

It caches the response Mono<ResponseDto>, and the next time the same API call happens, the result is served from the cache. The following is an example implementation:

public Mono<ResponseDto> getResponse() {
    // `res` is a Mono<ResponseDto> field, lazily initialized on first use
    if (res == null) {
        res = fetchResponse()
                .onErrorMap(Exception.class, (error) -> new CustomException())
                .cache(
                    r -> Duration.ofSeconds(r.expiresIn()), // cache a value until it expires
                    error -> Duration.ZERO,                 // do not cache errors
                    () -> Duration.ZERO);                   // do not cache empty completions
    }
    return res;
}

The problem is that if the server makes the same API call twice at the same time (for example via Mono.zip), the response is not cached yet and we actually call the API twice.

Is there an out-of-the-box solution to this problem? Instead of caching the response, can we cache the Mono itself, so that both requests subscribe to the same Mono and both complete after a single API call?

It should also work for sequential execution - I am afraid that if we cache the Mono, then once the request is completed the subscription is over and no other process can subscribe to it.



Comments (3)

绅刃 2025-01-18 06:04:02


Project Reactor provides a cache utility, CacheMono, that is non-blocking but can stampede (concurrent misses may each trigger the upstream call).

AsyncCache (from Caffeine) integrates better: the first lookup with key "K" results in a cache miss and returns a CompletableFuture of the API call, while a second lookup with the same key "K" gets the same CompletableFuture object.

The returned future can be converted to and from a Mono with Mono.fromFuture() and Mono#toFuture():

public Mono<ResponseData> lookupAndWrite(AsyncCache<String, ResponseData> cache, String key) {
    return Mono.defer(
        () -> Mono.fromFuture(
            cache.get(
                key,
                (searchKey, executor) -> {
                    // start the API call and expose it as a CompletableFuture;
                    // concurrent lookups for the same key share this future
                    CompletableFuture<ResponseData> future = callAPI(searchKey).toFuture();
                    // evict the entry on failure so the next lookup retries
                    return future.whenComplete(
                        (r, t) -> {
                            if (t != null) {
                                cache.synchronous().invalidate(key);
                            }
                        });
                })));
}
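
For illustration, a self-contained sketch of this approach (not from the original answer; it assumes Caffeine's AsyncCache, plain String values, and a placeholder callAPI method with a fixed 60-second TTL). Zipping two lookups for the same key shares a single CompletableFuture, so the API is invoked only once:

import java.time.Duration;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;

import reactor.core.publisher.Mono;

public class AsyncCacheDemo {

    // Stand-in for the real API call; prints so you can see how often it runs.
    static Mono<String> callAPI(String key) {
        return Mono.fromCallable(() -> {
            System.out.println("calling API for " + key);
            return "response-for-" + key;
        });
    }

    static Mono<String> lookup(AsyncCache<String, String> cache, String key) {
        return Mono.defer(() -> Mono.fromFuture(
                cache.get(key, (searchKey, executor) ->
                        callAPI(searchKey).toFuture().whenComplete((r, t) -> {
                            if (t != null) {
                                cache.synchronous().invalidate(key); // do not cache failures
                            }
                        }))));
    }

    public static void main(String[] args) {
        AsyncCache<String, String> cache = Caffeine.newBuilder()
                .expireAfterWrite(Duration.ofSeconds(60)) // fixed TTL; a per-response TTL needs an Expiry policy
                .buildAsync();

        // Both zipped lookups for key "K" share one CompletableFuture,
        // so "calling API" is printed only once.
        Mono.zip(lookup(cache, "K"), lookup(cache, "K")).block();

        // A later sequential call is served from the cache as well.
        lookup(cache, "K").block();
    }
}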
岁吢 2025-01-18 06:04:02


You can initialize the Mono in the constructor (assuming it doesn't depend on any request-time parameters). Using the cache operator will prevent multiple subscriptions to the source.

class MyService {
    // built once; every caller subscribes to this same cached Mono
    private final Mono<ResponseDto> response;

    public MyService() {
        response = fetchResponse()
            .onErrorMap(Exception.class, (error) -> new CustomException())
            .cache(
                r -> Duration.ofSeconds(r.expiresIn()), // cache a value until it expires
                error -> Duration.ZERO,                 // do not cache errors
                () -> Duration.ZERO);                   // do not cache empty completions
    }

    public Mono<ResponseDto> getResponse() {
        return response;
    }
}

If there is a dependency on request-time parameters, you should consider a custom caching solution, for example the keyed-cache sketch below.
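
A minimal sketch of one such custom solution, assuming the only request-time parameter is a String key (fetchResponse and the 60-second TTL are placeholders): the cached Monos themselves are kept in a ConcurrentHashMap, so concurrent callers for the same key share one upstream subscription, while cache() keeps the value for later sequential callers:

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Mono;

class KeyedResponseCache {

    private final Map<String, Mono<String>> cache = new ConcurrentHashMap<>();

    // Placeholder for the real parameterized API call.
    private Mono<String> fetchResponse(String key) {
        return Mono.just("response-for-" + key);
    }

    Mono<String> getResponse(String key) {
        // computeIfAbsent is atomic, so concurrent callers for the same key
        // receive the same cached Mono and the upstream is subscribed only once.
        return cache.computeIfAbsent(key, k ->
                fetchResponse(k)
                        .cache(
                            value -> Duration.ofSeconds(60), // cache successful values for 60 seconds
                            error -> Duration.ZERO,          // do not cache errors
                            () -> Duration.ZERO));           // do not cache empty completions
    }
}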

诗笺 2025-01-18 06:04:02


You could use CacheMono from io.projectreactor.addons:reactor-extra to wrap a non-reactive cache implementation such as Guava Cache or a simple ConcurrentHashMap. It doesn't provide an "exactly-once" guarantee, and parallel requests can still result in cache misses, but in many scenarios this should not be an issue.

Here is an example with Guava Cache:

public class GlobalSettingsCache {
    private final GlobalSettingsClient globalSettingsClient;
    private final Cache<String, GlobalSettings> cache;

    public GlobalSettingsCache(GlobalSettingsClient globalSettingsClient, Duration cacheTtl) {
        this.globalSettingsClient = globalSettingsClient;
        this.cache = CacheBuilder.newBuilder()
                .expireAfterWrite(cacheTtl)
                .build();
    }

    public Mono<GlobalSettings> get(String tenant) {
        // look up the Guava cache first, fetch on a miss, then write the result back
        return CacheMono.lookup(key -> Mono.justOrEmpty(cache.getIfPresent(key)).map(Signal::next), tenant)
                .onCacheMissResume(() -> fetchGlobalSettings(tenant))
                .andWriteWith((key, signal) -> Mono.fromRunnable(() ->
                        Optional.ofNullable(signal.get())
                                .ifPresent(value -> cache.put(key, value))));
    }

    private Mono<GlobalSettings> fetchGlobalSettings(String tenant) {
        return globalSettingsClient.getGlobalSettings(tenant);
    }
}
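
As a hypothetical usage sketch of that caveat (it assumes an existing GlobalSettingsClient instance and an arbitrary TTL): two concurrent subscriptions for the same tenant may both miss the Guava cache and hit the client twice, while later sequential calls are served from the cache:

// assumes an existing GlobalSettingsClient instance named globalSettingsClient
GlobalSettingsCache settingsCache = new GlobalSettingsCache(globalSettingsClient, Duration.ofMinutes(5));

// Concurrent lookups: both may miss and call the client (no exactly-once guarantee).
Mono.zip(settingsCache.get("tenant-a"), settingsCache.get("tenant-a")).block();

// A later sequential lookup is served from the Guava cache until the TTL expires.
settingsCache.get("tenant-a").block();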