带有Spring Boot的数千个休息电话

发布于 2025-01-22 03:49:48 字数 339 浏览 3 评论 0原文

假设我们有以下实体:project版本,这是一对一关系。

在从SQS队列中进行的事件消耗后,将发布ID作为事件的一部分发送,我们可能必须在DB中创建数千个版本,在每个版本中,我们必须将休息调用到一个第三方服务以获取每个版本的一些信息。

这意味着我们可能必须拨打数千个呼叫,在某些情况下,只有20k的电话才能检索不同版本的信息并将其存储在DB中。

显然,这是不可扩展的,因此我不确定在这种情况下要做什么。

我知道我可能会使用完整的未来,但是我不确定如何在春季使用它。

我正在使用的HTTP客户端是网络电视机。

有什么想法吗?

Let's say that we have the following entities: Project and Release, which is a one to many relationship.

Upon an event consumption from an SQS queue where a release id is sent as part of the event, there might be scenarios where we might have to create thousands of releases in our DB, where for each release we have to make a rest call to a 3rd party service in order to get some information for each release.

That means that we might have to make thousands of calls, in some cases more than 20k calls just to retrieve the information for the different releases and store it in the DB.

Obviously this is not scalable, so I'm not really sure what's the way to go in this scenario.

I know I might use a CompletableFuture, but I'm not sure how to use that with spring.

The http client that I am using is WebClient.

Any ideas?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

软糯酥胸 2025-01-29 03:49:48

您可以通过在方法签名上方添加注释>@transactional 来在方法中进行保存查询。该方法也应公开,否则该注释将被忽略。

至于在弹簧中使用完整的future;您可以通过将@ASYNC注释在其签名上方添加@ASYNC通过让其返回postalableFuture作为返回类型来使HTTP调用方法异步。您应该返回一个完整的将来,以持有HTTP呼叫的响应值。您可以通过方法postimablefuture.completedfuture(yourvalue)轻松地完成未来。 Spring只会在执行所有代码块的所有操作执行所有操作后,才能返回完整的未来。为了@ASYNC要工作,您还必须将@enableasync注释添加到您的一个配置类中。最重要的是,@async注释方法必须为public,并且不能通过同一类中的方法调用。如果该方法为private是从同一类中调用,则@async注释将被忽略,而该方法将在执行与调用方法相同的线程。

@Async注释方法旁边,您还可以使用ParallerStream并行执行所有20K HTTP调用。例如:

List<Long> releaseIds = new ArrayList<>();
Map<Long,ReleaseInfo> releaseInfo = releaseIds.parallelStream().map(releaseId -> new AbstractMap.SimpleEntry<>(releaseId, webClient.getReleaseInfo(releaseId)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

最后,您还可以使用threadpoolexecutor并行执行HTTP调用。一个示例:

List<Long> releaseIds = new ArrayList<>();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //I've made the amount of threads in the pool equal to the amount of available CPU processors on the machine.

//Submit tasks to the executor
List<Future<ReleaseInfo>> releaseInfoFutures = releaseIds.stream().map(releaseId -> executor.submit(() -> webClient.getReleaseInfo(releaseId)).collect(Collectors.toList());

//Wait for all futures to complete and map all non-null values to ReleaseInfo list.
List<ReleaseInfo> releaseInfo = releaseInfoFutures.stream().map(this::getValueAfterFutureCompletion).filter(releaseInfo -> releaseInfo != null).collect(Collectors.toList());

    private ReleaseInfo getValueAfterFutureCompletion(Future<ReleaseInfo> future){
        ReleaseInfo releaseInfo = null;
        try {
            releaseInfo = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            return releaseInfo;
        }
    }

确保在thread> threadpoolexecutor上调用shutdownnow(),以避免内存泄漏。

You can make the save queries in a method transactional by adding the annotation @Transactional above the method signature. The method should also be public, or else this annotation is ignored.

As for using CompletableFuture in spring; You could make a http call method asynchronous by adding the @Async annotation above its signature and by letting it return a CompletableFuture as a return type. You should return a completed future holding the response value from the http call. You can easily make a completed future with the method CompletableFuture.completedFuture(yourValue). Spring will only return the completed future once the asynchronous method is done executing everything int its code block. For @Async to work you must also add the @EnableAsync annotation to one of your configuration classes. On top of that the @Async annotated method must be public and cannot be called by a method from within the same class. If the method is private or is called from within the same class then the @Async annotation will be ignored and instead the method will be executed in the same thread as the calling method is executed.

Next to an @Async annotated method you could also use a parallelStream to execute all 20K http calls in parallel. For example:

List<Long> releaseIds = new ArrayList<>();
Map<Long,ReleaseInfo> releaseInfo = releaseIds.parallelStream().map(releaseId -> new AbstractMap.SimpleEntry<>(releaseId, webClient.getReleaseInfo(releaseId)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Lastly you could also use a ThreadPoolExecutor to execute the http calls in parallel. An example:

List<Long> releaseIds = new ArrayList<>();
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //I've made the amount of threads in the pool equal to the amount of available CPU processors on the machine.

//Submit tasks to the executor
List<Future<ReleaseInfo>> releaseInfoFutures = releaseIds.stream().map(releaseId -> executor.submit(() -> webClient.getReleaseInfo(releaseId)).collect(Collectors.toList());

//Wait for all futures to complete and map all non-null values to ReleaseInfo list.
List<ReleaseInfo> releaseInfo = releaseInfoFutures.stream().map(this::getValueAfterFutureCompletion).filter(releaseInfo -> releaseInfo != null).collect(Collectors.toList());

    private ReleaseInfo getValueAfterFutureCompletion(Future<ReleaseInfo> future){
        ReleaseInfo releaseInfo = null;
        try {
            releaseInfo = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            return releaseInfo;
        }
    }

Make sure to call shutdownNow() on ThreadPoolExecutor after you're done with it to avoid memory leaks.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文