Spring Boot 2.1.3: Parallel method invocations using @Async with CompletableFuture aren't invoked in different threads

Posted on 2025-02-09 05:32:10

Below is my code, in which I am trying to parallelize four method invocations, since the methods are independent of each other and each performs some memory-intensive statistical operations.

@EnableAsync
@Configuration
public class Config {
   
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        
        executor.initialize();
        return executor;
    }
}

class OrderStatsService {

    public CumulativeResult compute() {
        log.info("CumulativeResult compute started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        List<Order> orders = getOrders(); // API call to fetch a large set of orders; size could be around 100k

        CumulativeResult cumulativeResult = new CumulativeResult();

        // Self-invocation: these calls go through "this", not through the Spring proxy
        CompletableFuture<Long> stats1 = getStats1(orders);
        CompletableFuture<List<String>> stats2 = getStats2(orders);
        CompletableFuture<Double> stats3 = getStats3(orders);
        CompletableFuture<Map<String,String>> stats4 = getStats4(orders);

        cumulativeResult.setStats1(stats1);
        cumulativeResult.setStats2(stats2);
        cumulativeResult.setStats3(stats3);
        cumulativeResult.setStats4(stats4);
        return cumulativeResult;
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Long> getStats1(List<Order> orders) {
        log.info("getStats1 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<String>> getStats2(List<Order> orders) {
        log.info("getStats2 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Double> getStats3(List<Order> orders) {
        log.info("getStats3 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Map<String,String>> getStats4(List<Order> orders) {
        log.info("getStats4 started at " + System.currentTimeMillis() + ", Current Thread Name: " + Thread.currentThread().getName() + ", Current Thread ID: " + Thread.currentThread().getId());
        //computes some stats
    }

}

I'm getting the expected result, but I noticed that the thread invoking compute() is also executing the other four methods: getStats1, getStats2, getStats3, and getStats4.

CumulativeResult compute started at 1655783237437, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
getStats1 started at 1655783238022, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
getStats2 started at 1655783238024, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
getStats3 started at 1655783463062, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28
getStats4 started at 1655783238085, Current Thread Name: http-nio-8080-exec-1, Current Thread ID: 28

I thought that when CompletableFuture is used with @Async methods and an @EnableAsync config, each of those methods would be assigned a new thread for its execution. Can someone please explain whether this is the expected behavior for parallel method invocation? Is there anything wrong with my config? Or, if this is the expected behavior, how is parallelism achieved when the caller method and the async methods execute on the same thread?

Comments (3)

合约呢 2025-02-16 05:32:10

The required changes can be made in the code.

Step 1: The first step to running work on multiple threads is to set up the thread pool config properly.

Here is an example of how you can set the size of the thread pool:

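import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ThreadPoolConfiguration {

    @Bean(name = "taskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("thread-");
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(120);
        threadPoolTaskExecutor.setQueueCapacity(100000);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}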

Step 2: Using the @Async annotation

First, let's go over the rules.

  • @Async must be applied to public methods only.
  • Self-invocation does not work: Spring applies @Async through a proxy around the bean, so calling an async method from within the same class bypasses the proxy and runs the method on the caller's thread.
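
The self-invocation rule can be illustrated without Spring. The following is a minimal sketch that uses a JDK dynamic proxy as a simplified stand-in for the proxy Spring wraps around a bean; the names SelfInvocationDemo, StatsService, StatsServiceImpl, and wrap are hypothetical and exist only for this illustration. The proxy records every call it intercepts, which is where Spring would hand the call to the executor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class SelfInvocationDemo {

    // Hypothetical service interface; stands in for a bean's public API.
    interface StatsService {
        void compute();
        void getStats1();
    }

    // Names of the calls that actually passed through the proxy.
    static final List<String> intercepted = new ArrayList<>();

    static class StatsServiceImpl implements StatsService {
        @Override
        public void compute() {
            // "this" is the raw target object, not the proxy,
            // so this call never reaches the interceptor.
            getStats1();
        }

        @Override
        public void getStats1() {
            // computes some stats
        }
    }

    // Wraps the target in a proxy that records each intercepted call
    // before delegating to the target.
    static StatsService wrap(StatsService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            intercepted.add(method.getName());
            return method.invoke(target, args);
        };
        return (StatsService) Proxy.newProxyInstance(
                StatsService.class.getClassLoader(),
                new Class<?>[] {StatsService.class},
                handler);
    }

    public static void main(String[] args) {
        StatsService service = wrap(new StatsServiceImpl());

        service.compute();               // intercepted; the inner getStats1() is not
        System.out.println(intercepted); // prints [compute]

        service.getStats1();             // called from outside, so it is intercepted
        System.out.println(intercepted); // prints [compute, getStats1]
    }
}
```

Only calls that arrive from outside the object pass through the interceptor, which matches the logs in the question: compute() calls getStats1() to getStats4() on `this`, so they run on the http-nio-8080-exec-1 thread.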

For more information on @Async you can go through

Solutions:

  • Keep all the @Async methods in a different service and call them from OrderStatsService.
  • Alternatively, call the @Async methods from the controller and keep them all in the service class.
  • Qualify the annotation with the bean name, for example @Async("taskExecutor").

Either approach will work for you.
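
The shape of the first solution can be sketched with plain java.util.concurrent, without Spring on the classpath. This is not the @Async mechanism itself: CompletableFuture.supplyAsync with an explicit executor is swapped in for it, and the names ParallelStatsSketch and StatsWorker, the List<Integer> stand-in for the orders, and the two statistics are all assumptions made for illustration. StatsWorker plays the role of the separate service that holds the async methods.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelStatsSketch {

    // Hypothetical collaborator: stands in for the separate bean holding the async methods.
    static class StatsWorker {
        private final ExecutorService pool;

        StatsWorker(ExecutorService pool) {
            this.pool = pool;
        }

        CompletableFuture<Long> count(List<Integer> orders) {
            return CompletableFuture.supplyAsync(() -> (long) orders.size(), pool);
        }

        CompletableFuture<Double> average(List<Integer> orders) {
            return CompletableFuture.supplyAsync(
                    () -> orders.stream().mapToInt(Integer::intValue).average().orElse(0.0),
                    pool);
        }

        // Reports which thread the work actually ran on.
        CompletableFuture<String> workerThread() {
            return CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), pool);
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            StatsWorker worker = new StatsWorker(pool);
            List<Integer> orders = List.of(10, 20, 30, 40);

            // Start every task first, then wait; joining right after each
            // call would serialize the work again.
            CompletableFuture<Long> count = worker.count(orders);
            CompletableFuture<Double> average = worker.average(orders);
            CompletableFuture<String> thread = worker.workerThread();
            CompletableFuture.allOf(count, average, thread).join();

            System.out.println("count=" + count.join() + ", average=" + average.join());
            System.out.println("caller=" + Thread.currentThread().getName()
                    + ", worker=" + thread.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the Spring version, the caller would receive the worker bean by injection, so each call goes through the proxy and is dispatched to the configured executor.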

青朷 2025-02-16 05:32:10

You should set the thread pool size to control the thread count:

final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("CarThread-");

Multi-Threading in Spring Boot Using CompletableFuture
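
Pool sizing matters here for a second reason. As far as I know, ThreadPoolTaskExecutor defaults to a core pool size of 1 with an effectively unbounded queue, and the underlying java.util.concurrent.ThreadPoolExecutor only adds threads beyond the core size when the queue is full. With the question's unconfigured executor, the four tasks would therefore likely still run one after another, even once they are dispatched asynchronously. The sketch below shows that behavior with a plain ThreadPoolExecutor; the class PoolSizingSketch and the method peakConcurrency are hypothetical names for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PoolSizingSketch {

    // Runs `tasks` sleeping jobs on a pool backed by an unbounded queue and
    // returns the highest number of jobs observed running at the same time.
    static int peakConcurrency(int corePoolSize, int maxPoolSize, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()); // unbounded: never reports "full"
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(300); // simulate work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // A large max pool size does not help while the queue is unbounded.
        System.out.println("core=1, max=100: peak=" + peakConcurrency(1, 100, 4));
        // Raising the core size is what allows tasks to overlap.
        System.out.println("core=2, max=2:   peak=" + peakConcurrency(2, 2, 4));
    }
}
```

The first configuration never runs more than one task at a time, because the extra tasks wait in the queue instead of triggering new threads. Setting the core pool size to at least the number of tasks that should overlap avoids this.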

春花秋月 2025-02-16 05:32:10

I would recommend checking some facts:

  1. Is your OrderStatsService class static, and is it a bean managed by Spring?
  2. @Async methods of a bean have to be invoked directly by their caller. In your OrderStatsService (a bean, if you've already done that), you call the non-@Async method, which then invokes the @Async methods. In my experience, this won't give you the expected behaviour.

Feel free to correct me if I'm being a dummy dumb dumb.
